
[v11,2/4] block: add I/O throttling algorithm

Message ID 1320213705-8097-3-git-send-email-wuzhy@linux.vnet.ibm.com
State: New

Commit Message

Zhi Yong Wu Nov. 2, 2011, 6:01 a.m. UTC
Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 block.c               |  233 +++++++++++++++++++++++++++++++++++++++++++++++--
 block.h               |    1 +
 block_int.h           |    1 +
 qemu-coroutine-lock.c |    8 ++
 qemu-coroutine.h      |    6 ++
 5 files changed, 242 insertions(+), 7 deletions(-)

Comments

Kevin Wolf Nov. 2, 2011, 11:51 a.m. UTC | #1
On 02.11.2011 07:01, Zhi Yong Wu wrote:
> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  block.c               |  233 +++++++++++++++++++++++++++++++++++++++++++++++--
>  block.h               |    1 +
>  block_int.h           |    1 +
>  qemu-coroutine-lock.c |    8 ++
>  qemu-coroutine.h      |    6 ++
>  5 files changed, 242 insertions(+), 7 deletions(-)
> 
> diff --git a/block.c b/block.c
> index c70f86d..440e889 100644
> --- a/block.c
> +++ b/block.c
> @@ -74,6 +74,13 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
>                                                 bool is_write);
>  static void coroutine_fn bdrv_co_do_rw(void *opaque);
>  
> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
> +        bool is_write, double elapsed_time, uint64_t *wait);
> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
> +        double elapsed_time, uint64_t *wait);
> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
> +        bool is_write, int64_t *wait);
> +
>  static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
>      QTAILQ_HEAD_INITIALIZER(bdrv_states);
>  
> @@ -107,6 +114,28 @@ int is_windows_drive(const char *filename)
>  #endif
>  
>  /* throttling disk I/O limits */
> +void bdrv_io_limits_disable(BlockDriverState *bs)
> +{
> +    bs->io_limits_enabled = false;
> +
> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {

This if is unnecessary, you can just always run the while loop.
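
That is, the drain can rely on the loop's own termination; a minimal
sketch, based on the return value documented for qemu_co_queue_next() in
qemu-coroutine.h:

    /* qemu_co_queue_next() returns false once the queue is empty, so no
     * separate emptiness check is needed before draining. */
    while (qemu_co_queue_next(&bs->throttled_reqs)) {
        /* each iteration restarts one throttled request */
    }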

> +        while (qemu_co_queue_next(&bs->throttled_reqs));
> +    }
> +
> +    qemu_co_queue_init(&bs->throttled_reqs);

Why?

> +
> +    if (bs->block_timer) {
> +        qemu_del_timer(bs->block_timer);
> +        qemu_free_timer(bs->block_timer);
> +        bs->block_timer = NULL;
> +    }
> +
> +    bs->slice_start = 0;
> +    bs->slice_end   = 0;
> +    bs->slice_time  = 0;
> +    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
> +}
> +
>  static void bdrv_block_timer(void *opaque)
>  {
>      BlockDriverState *bs = opaque;
> @@ -116,14 +145,13 @@ static void bdrv_block_timer(void *opaque)
>  
>  void bdrv_io_limits_enable(BlockDriverState *bs)
>  {
> -    bs->io_limits_enabled = true;
>      qemu_co_queue_init(&bs->throttled_reqs);
> -
> -    bs->block_timer   = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
> -    bs->slice_time    = 5 * BLOCK_IO_SLICE_TIME;
> -    bs->slice_start   = qemu_get_clock_ns(vm_clock);
> -    bs->slice_end     = bs->slice_start + bs->slice_time;
> +    bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
> +    bs->slice_time  = 5 * BLOCK_IO_SLICE_TIME;
> +    bs->slice_start = qemu_get_clock_ns(vm_clock);
> +    bs->slice_end   = bs->slice_start + bs->slice_time;

You're only changing whitespace here, right? If so, can you please use
the final version in patch 1?

>      memset(&bs->io_disps, 0, sizeof(bs->io_disps));
> +    bs->io_limits_enabled = true;
>  }
>  
>  bool bdrv_io_limits_enabled(BlockDriverState *bs)
> @@ -137,6 +165,30 @@ bool bdrv_io_limits_enabled(BlockDriverState *bs)
>           || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
>  }
>  
> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
> +                                     bool is_write, int nb_sectors)
> +{
> +    int64_t wait_time = -1;
> +
> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
> +        qemu_co_queue_wait(&bs->throttled_reqs);
> +        goto resume;
> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
> +        qemu_mod_timer(bs->block_timer,
> +                       wait_time + qemu_get_clock_ns(vm_clock));
> +        qemu_co_queue_wait(&bs->throttled_reqs);
> +
> +resume:

This goto construct isn't very nice. You could have avoided it with an
"else return;"

But anyway, why do you even need the else if? Can't you directly use the
while loop? The difference would be that qemu_co_queue_next() is run
even if a request is allowed without going through the queue, but would
that hurt?
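
For illustration, a minimal sketch of the plain-while variant suggested
here, built only from the functions this patch already uses (the FIFO
concern raised further down is what later refines it):

    static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                         bool is_write, int nb_sectors)
    {
        int64_t wait_time = -1;

        /* Re-check the limits after every wakeup; re-arm the timer so
         * the queue is restarted once enough budget has accumulated. */
        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
            qemu_mod_timer(bs->block_timer,
                           wait_time + qemu_get_clock_ns(vm_clock));
            qemu_co_queue_wait(&bs->throttled_reqs);
        }

        /* Now runs even for requests that never queued, which is the
         * difference being asked about. */
        qemu_co_queue_next(&bs->throttled_reqs);
    }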


> +        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
> +            qemu_mod_timer(bs->block_timer,
> +                           wait_time + qemu_get_clock_ns(vm_clock));
> +            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);

Why do you want to insert at the head? Wouldn't a queue be more
appropriate than a stack?

> +        }
> +
> +        qemu_co_queue_next(&bs->throttled_reqs);
> +    }
> +}
> +
>  /* check if the path starts with "<protocol>:" */
>  static int path_has_protocol(const char *path)
>  {

Kevin
Zhiyong Wu Nov. 3, 2011, 3:18 a.m. UTC | #2
On Wed, Nov 2, 2011 at 7:51 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> On 02.11.2011 07:01, Zhi Yong Wu wrote:
>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>> ---
>>  block.c               |  233 +++++++++++++++++++++++++++++++++++++++++++++++--
>>  block.h               |    1 +
>>  block_int.h           |    1 +
>>  qemu-coroutine-lock.c |    8 ++
>>  qemu-coroutine.h      |    6 ++
>>  5 files changed, 242 insertions(+), 7 deletions(-)
>>
>> diff --git a/block.c b/block.c
>> index c70f86d..440e889 100644
>> --- a/block.c
>> +++ b/block.c
>> @@ -74,6 +74,13 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
>>                                                 bool is_write);
>>  static void coroutine_fn bdrv_co_do_rw(void *opaque);
>>
>> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> +        bool is_write, double elapsed_time, uint64_t *wait);
>> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
>> +        double elapsed_time, uint64_t *wait);
>> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
>> +        bool is_write, int64_t *wait);
>> +
>>  static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
>>      QTAILQ_HEAD_INITIALIZER(bdrv_states);
>>
>> @@ -107,6 +114,28 @@ int is_windows_drive(const char *filename)
>>  #endif
>>
>>  /* throttling disk I/O limits */
>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>> +{
>> +    bs->io_limits_enabled = false;
>> +
>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>
> This if is unnecessary, you can just always run the while loop.
Good catch. Removed.
>
>> +        while (qemu_co_queue_next(&bs->throttled_reqs));
>> +    }
>> +
>> +    qemu_co_queue_init(&bs->throttled_reqs);
>
> Why?
Removed.
>
>> +
>> +    if (bs->block_timer) {
>> +        qemu_del_timer(bs->block_timer);
>> +        qemu_free_timer(bs->block_timer);
>> +        bs->block_timer = NULL;
>> +    }
>> +
>> +    bs->slice_start = 0;
>> +    bs->slice_end   = 0;
>> +    bs->slice_time  = 0;
>> +    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>> +}
>> +
>>  static void bdrv_block_timer(void *opaque)
>>  {
>>      BlockDriverState *bs = opaque;
>> @@ -116,14 +145,13 @@ static void bdrv_block_timer(void *opaque)
>>
>>  void bdrv_io_limits_enable(BlockDriverState *bs)
>>  {
>> -    bs->io_limits_enabled = true;
>>      qemu_co_queue_init(&bs->throttled_reqs);
>> -
>> -    bs->block_timer   = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> -    bs->slice_time    = 5 * BLOCK_IO_SLICE_TIME;
>> -    bs->slice_start   = qemu_get_clock_ns(vm_clock);
>> -    bs->slice_end     = bs->slice_start + bs->slice_time;
>> +    bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> +    bs->slice_time  = 5 * BLOCK_IO_SLICE_TIME;
>> +    bs->slice_start = qemu_get_clock_ns(vm_clock);
>> +    bs->slice_end   = bs->slice_start + bs->slice_time;
>
> You're only changing whitespace here, right? If so, can you please use
> the final version in patch 1?
OK
>
>>      memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>> +    bs->io_limits_enabled = true;
>>  }
>>
>>  bool bdrv_io_limits_enabled(BlockDriverState *bs)
>> @@ -137,6 +165,30 @@ bool bdrv_io_limits_enabled(BlockDriverState *bs)
>>           || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
>>  }
>>
>> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
>> +                                     bool is_write, int nb_sectors)
>> +{
>> +    int64_t wait_time = -1;
>> +
>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>> +        goto resume;
>> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>> +        qemu_mod_timer(bs->block_timer,
>> +                       wait_time + qemu_get_clock_ns(vm_clock));
>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>> +
>> +resume:
>
> This goto construct isn't very nice. You could have avoided it with an
> "else return;"
else return? No, I don't think it can simply return at that point.
>
> But anyway, why do you even need the else if? Can't you directly use the
> while loop? The difference would be that qemu_co_queue_next() is run
> even if a request is allowed without going through the queue, but would
> that hurt?
Great point, thanks.
>
>
>> +        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>> +            qemu_mod_timer(bs->block_timer,
>> +                           wait_time + qemu_get_clock_ns(vm_clock));
>> +            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
>
> Why do you want to insert at the head? Wouldn't a queue be more
In fact, we want to preserve each request's timing, i.e. FIFO order. The
next throttled request will not be dequeued until the current request is
allowed to be serviced. So if the current request still exceeds the
limits, it is re-inserted at the head, and all requests behind it stay
in the throttled_reqs queue.

> appropriate than a stack?
>
>> +        }
>> +
>> +        qemu_co_queue_next(&bs->throttled_reqs);
>> +    }
>> +}
>> +
>>  /* check if the path starts with "<protocol>:" */
>>  static int path_has_protocol(const char *path)
>>  {
>
> Kevin
>
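
The ordering argument can be made concrete with a short sketch (the
requests A, B and C are hypothetical, not from the patch):

    /* Throttled FIFO after A, B, C arrive in that order: head -> A, B, C.
     *
     * The timer fires and qemu_co_queue_next() restarts A.  If A still
     * exceeds the limits, it must go back to sleep *in front of* B, so it
     * calls qemu_co_queue_wait_insert_head(): head -> A, B, C again.
     *
     * A plain qemu_co_queue_wait() would append to the tail instead,
     * giving head -> B, C, A and letting B and C overtake A, breaking
     * the arrival order the throttling wants to preserve.
     */
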
Zhiyong Wu Nov. 3, 2011, 8:03 a.m. UTC | #3
On Thu, Nov 3, 2011 at 4:03 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> On 03.11.2011 04:18, Zhi Yong Wu wrote:
>> On Wed, Nov 2, 2011 at 7:51 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>>> On 02.11.2011 07:01, Zhi Yong Wu wrote:
>>>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>>>> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>>>> ---
>>>>  block.c               |  233 +++++++++++++++++++++++++++++++++++++++++++++++--
>>>>  block.h               |    1 +
>>>>  block_int.h           |    1 +
>>>>  qemu-coroutine-lock.c |    8 ++
>>>>  qemu-coroutine.h      |    6 ++
>>>>  5 files changed, 242 insertions(+), 7 deletions(-)
>>>>
>>>> diff --git a/block.c b/block.c
>>>> index c70f86d..440e889 100644
>>>> --- a/block.c
>>>> +++ b/block.c
>>>> @@ -74,6 +74,13 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
>>>>                                                 bool is_write);
>>>>  static void coroutine_fn bdrv_co_do_rw(void *opaque);
>>>>
>>>> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>>>> +        bool is_write, double elapsed_time, uint64_t *wait);
>>>> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
>>>> +        double elapsed_time, uint64_t *wait);
>>>> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
>>>> +        bool is_write, int64_t *wait);
>>>> +
>>>>  static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
>>>>      QTAILQ_HEAD_INITIALIZER(bdrv_states);
>>>>
>>>> @@ -107,6 +114,28 @@ int is_windows_drive(const char *filename)
>>>>  #endif
>>>>
>>>>  /* throttling disk I/O limits */
>>>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>>>> +{
>>>> +    bs->io_limits_enabled = false;
>>>> +
>>>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>>>
>>> This if is unnecessary, you can just always run the while loop.
>> Good catch. Removed.
>>>
>>>> +        while (qemu_co_queue_next(&bs->throttled_reqs));
>>>> +    }
>>>> +
>>>> +    qemu_co_queue_init(&bs->throttled_reqs);
>>>
>>> Why?
>> Removed.
>>>
>>>> +
>>>> +    if (bs->block_timer) {
>>>> +        qemu_del_timer(bs->block_timer);
>>>> +        qemu_free_timer(bs->block_timer);
>>>> +        bs->block_timer = NULL;
>>>> +    }
>>>> +
>>>> +    bs->slice_start = 0;
>>>> +    bs->slice_end   = 0;
>>>> +    bs->slice_time  = 0;
>>>> +    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>>>> +}
>>>> +
>>>>  static void bdrv_block_timer(void *opaque)
>>>>  {
>>>>      BlockDriverState *bs = opaque;
>>>> @@ -116,14 +145,13 @@ static void bdrv_block_timer(void *opaque)
>>>>
>>>>  void bdrv_io_limits_enable(BlockDriverState *bs)
>>>>  {
>>>> -    bs->io_limits_enabled = true;
>>>>      qemu_co_queue_init(&bs->throttled_reqs);
>>>> -
>>>> -    bs->block_timer   = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>>>> -    bs->slice_time    = 5 * BLOCK_IO_SLICE_TIME;
>>>> -    bs->slice_start   = qemu_get_clock_ns(vm_clock);
>>>> -    bs->slice_end     = bs->slice_start + bs->slice_time;
>>>> +    bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>>>> +    bs->slice_time  = 5 * BLOCK_IO_SLICE_TIME;
>>>> +    bs->slice_start = qemu_get_clock_ns(vm_clock);
>>>> +    bs->slice_end   = bs->slice_start + bs->slice_time;
>>>
>>> You're only changing whitespace here, right? If so, can you please use
>>> the final version in patch 1?
>> OK
>>>
>>>>      memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>>>> +    bs->io_limits_enabled = true;
>>>>  }
>>>>
>>>>  bool bdrv_io_limits_enabled(BlockDriverState *bs)
>>>> @@ -137,6 +165,30 @@ bool bdrv_io_limits_enabled(BlockDriverState *bs)
>>>>           || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
>>>>  }
>>>>
>>>> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
>>>> +                                     bool is_write, int nb_sectors)
>>>> +{
>>>> +    int64_t wait_time = -1;
>>>> +
>>>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>>>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>>>> +        goto resume;
>>>> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>>>> +        qemu_mod_timer(bs->block_timer,
>>>> +                       wait_time + qemu_get_clock_ns(vm_clock));
>>>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>>>> +
>>>> +resume:
>>>
>>> This goto construct isn't very nice. You could have avoided it with an
>>> "else return;"
>> else return? No, I don't think it can simply return at that point.
>>>
>>> But anyway, why do you even need the else if? Can't you directly use the
>>> while loop? The difference would be that qemu_co_queue_next() is run
>>> even if a request is allowed without going through the queue, but would
>>> that hurt?
>> Great point, thanks.
>>>
>>>
>>>> +        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>>>> +            qemu_mod_timer(bs->block_timer,
>>>> +                           wait_time + qemu_get_clock_ns(vm_clock));
>>>> +            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
>>>
>>> Why do you want to insert at the head? Wouldn't a queue be more
>> In fact, we want to preserve each request's timing, i.e. FIFO order. The
>> next throttled request will not be dequeued until the current request is
>> allowed to be serviced. So if the current request still exceeds the
>> limits, it is re-inserted at the head, and all requests behind it stay
>> in the throttled_reqs queue.
>
> Ah, now this makes a lot more sense. Can you add a comment stating
> exactly this?
Sure.
>
> Of course this means that you still need the separate if because the
> first time you want to queue the coroutine at the end. But you can still
> get rid of the goto by making the part starting with the while loop
> unconditional.
Right, I have applied this approach in the next version.

>
> Kevin
>
Kevin Wolf Nov. 3, 2011, 8:03 a.m. UTC | #4
On 03.11.2011 04:18, Zhi Yong Wu wrote:
> On Wed, Nov 2, 2011 at 7:51 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> On 02.11.2011 07:01, Zhi Yong Wu wrote:
>>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>>> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>>> ---
>>>  block.c               |  233 +++++++++++++++++++++++++++++++++++++++++++++++--
>>>  block.h               |    1 +
>>>  block_int.h           |    1 +
>>>  qemu-coroutine-lock.c |    8 ++
>>>  qemu-coroutine.h      |    6 ++
>>>  5 files changed, 242 insertions(+), 7 deletions(-)
>>>
>>> diff --git a/block.c b/block.c
>>> index c70f86d..440e889 100644
>>> --- a/block.c
>>> +++ b/block.c
>>> @@ -74,6 +74,13 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
>>>                                                 bool is_write);
>>>  static void coroutine_fn bdrv_co_do_rw(void *opaque);
>>>
>>> +static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>>> +        bool is_write, double elapsed_time, uint64_t *wait);
>>> +static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
>>> +        double elapsed_time, uint64_t *wait);
>>> +static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
>>> +        bool is_write, int64_t *wait);
>>> +
>>>  static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
>>>      QTAILQ_HEAD_INITIALIZER(bdrv_states);
>>>
>>> @@ -107,6 +114,28 @@ int is_windows_drive(const char *filename)
>>>  #endif
>>>
>>>  /* throttling disk I/O limits */
>>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>>> +{
>>> +    bs->io_limits_enabled = false;
>>> +
>>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>>
>> This if is unnecessary, you can just always run the while loop.
> Good catch. Removed.
>>
>>> +        while (qemu_co_queue_next(&bs->throttled_reqs));
>>> +    }
>>> +
>>> +    qemu_co_queue_init(&bs->throttled_reqs);
>>
>> Why?
> Removed.
>>
>>> +
>>> +    if (bs->block_timer) {
>>> +        qemu_del_timer(bs->block_timer);
>>> +        qemu_free_timer(bs->block_timer);
>>> +        bs->block_timer = NULL;
>>> +    }
>>> +
>>> +    bs->slice_start = 0;
>>> +    bs->slice_end   = 0;
>>> +    bs->slice_time  = 0;
>>> +    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>>> +}
>>> +
>>>  static void bdrv_block_timer(void *opaque)
>>>  {
>>>      BlockDriverState *bs = opaque;
>>> @@ -116,14 +145,13 @@ static void bdrv_block_timer(void *opaque)
>>>
>>>  void bdrv_io_limits_enable(BlockDriverState *bs)
>>>  {
>>> -    bs->io_limits_enabled = true;
>>>      qemu_co_queue_init(&bs->throttled_reqs);
>>> -
>>> -    bs->block_timer   = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>>> -    bs->slice_time    = 5 * BLOCK_IO_SLICE_TIME;
>>> -    bs->slice_start   = qemu_get_clock_ns(vm_clock);
>>> -    bs->slice_end     = bs->slice_start + bs->slice_time;
>>> +    bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>>> +    bs->slice_time  = 5 * BLOCK_IO_SLICE_TIME;
>>> +    bs->slice_start = qemu_get_clock_ns(vm_clock);
>>> +    bs->slice_end   = bs->slice_start + bs->slice_time;
>>
>> You're only changing whitespace here, right? If so, can you please use
>> the final version in patch 1?
> OK
>>
>>>      memset(&bs->io_disps, 0, sizeof(bs->io_disps));
>>> +    bs->io_limits_enabled = true;
>>>  }
>>>
>>>  bool bdrv_io_limits_enabled(BlockDriverState *bs)
>>> @@ -137,6 +165,30 @@ bool bdrv_io_limits_enabled(BlockDriverState *bs)
>>>           || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
>>>  }
>>>
>>> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
>>> +                                     bool is_write, int nb_sectors)
>>> +{
>>> +    int64_t wait_time = -1;
>>> +
>>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>>> +        goto resume;
>>> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>>> +        qemu_mod_timer(bs->block_timer,
>>> +                       wait_time + qemu_get_clock_ns(vm_clock));
>>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>>> +
>>> +resume:
>>
>> This goto construct isn't very nice. You could have avoided it with an
>> "else return;"
> else return? No, I don't think it can simply return at that point.
>>
>> But anyway, why do you even need the else if? Can't you directly use the
>> while loop? The difference would be that qemu_co_queue_next() is run
>> even if a request is allowed without going through the queue, but would
>> that hurt?
> Great point, thanks.
>>
>>
>>> +        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
>>> +            qemu_mod_timer(bs->block_timer,
>>> +                           wait_time + qemu_get_clock_ns(vm_clock));
>>> +            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
>>
>> Why do you want to insert at the head? Wouldn't a queue be more
> In fact, we want to preserve each request's timing, i.e. FIFO order. The
> next throttled request will not be dequeued until the current request is
> allowed to be serviced. So if the current request still exceeds the
> limits, it is re-inserted at the head, and all requests behind it stay
> in the throttled_reqs queue.

Ah, now this makes a lot more sense. Can you add a comment stating
exactly this?

Of course this means that you still need the separate if because the
first time you want to queue the coroutine at the end. But you can still
get rid of the goto by making the part starting with the while loop
unconditional.

Kevin
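
Putting the two points together: keep the initial tail wait, make the
re-check loop unconditional, and drop the goto. A sketch of what the
reworked interception could look like, using only functions from this
series (the actual next revision may differ in detail):

    static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                         bool is_write, int nb_sectors)
    {
        int64_t wait_time = -1;

        /* First time through: join the FIFO at the tail, behind any
         * requests that were throttled earlier.  On an empty queue, the
         * head insertion below is equivalent, so no wait is needed here. */
        if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
            qemu_co_queue_wait(&bs->throttled_reqs);
        }

        /* Unconditional re-check; re-insert at the head so that requests
         * queued behind us cannot overtake (see the FIFO discussion above). */
        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
            qemu_mod_timer(bs->block_timer,
                           wait_time + qemu_get_clock_ns(vm_clock));
            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
        }

        /* Wake the next throttled request, if any. */
        qemu_co_queue_next(&bs->throttled_reqs);
    }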

Patch

diff --git a/block.c b/block.c
index c70f86d..440e889 100644
--- a/block.c
+++ b/block.c
@@ -74,6 +74,13 @@  static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
                                                bool is_write);
 static void coroutine_fn bdrv_co_do_rw(void *opaque);
 
+static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
+        bool is_write, double elapsed_time, uint64_t *wait);
+static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
+        double elapsed_time, uint64_t *wait);
+static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
+        bool is_write, int64_t *wait);
+
 static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
     QTAILQ_HEAD_INITIALIZER(bdrv_states);
 
@@ -107,6 +114,28 @@  int is_windows_drive(const char *filename)
 #endif
 
 /* throttling disk I/O limits */
+void bdrv_io_limits_disable(BlockDriverState *bs)
+{
+    bs->io_limits_enabled = false;
+
+    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
+        while (qemu_co_queue_next(&bs->throttled_reqs));
+    }
+
+    qemu_co_queue_init(&bs->throttled_reqs);
+
+    if (bs->block_timer) {
+        qemu_del_timer(bs->block_timer);
+        qemu_free_timer(bs->block_timer);
+        bs->block_timer = NULL;
+    }
+
+    bs->slice_start = 0;
+    bs->slice_end   = 0;
+    bs->slice_time  = 0;
+    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
+}
+
 static void bdrv_block_timer(void *opaque)
 {
     BlockDriverState *bs = opaque;
@@ -116,14 +145,13 @@  static void bdrv_block_timer(void *opaque)
 
 void bdrv_io_limits_enable(BlockDriverState *bs)
 {
-    bs->io_limits_enabled = true;
     qemu_co_queue_init(&bs->throttled_reqs);
-
-    bs->block_timer   = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
-    bs->slice_time    = 5 * BLOCK_IO_SLICE_TIME;
-    bs->slice_start   = qemu_get_clock_ns(vm_clock);
-    bs->slice_end     = bs->slice_start + bs->slice_time;
+    bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
+    bs->slice_time  = 5 * BLOCK_IO_SLICE_TIME;
+    bs->slice_start = qemu_get_clock_ns(vm_clock);
+    bs->slice_end   = bs->slice_start + bs->slice_time;
     memset(&bs->io_disps, 0, sizeof(bs->io_disps));
+    bs->io_limits_enabled = true;
 }
 
 bool bdrv_io_limits_enabled(BlockDriverState *bs)
@@ -137,6 +165,30 @@  bool bdrv_io_limits_enabled(BlockDriverState *bs)
          || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
 }
 
+static void bdrv_io_limits_intercept(BlockDriverState *bs,
+                                     bool is_write, int nb_sectors)
+{
+    int64_t wait_time = -1;
+
+    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
+        qemu_co_queue_wait(&bs->throttled_reqs);
+        goto resume;
+    } else if (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
+        qemu_mod_timer(bs->block_timer,
+                       wait_time + qemu_get_clock_ns(vm_clock));
+        qemu_co_queue_wait(&bs->throttled_reqs);
+
+resume:
+        while (bdrv_exceed_io_limits(bs, nb_sectors, is_write, &wait_time)) {
+            qemu_mod_timer(bs->block_timer,
+                           wait_time + qemu_get_clock_ns(vm_clock));
+            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
+        }
+
+        qemu_co_queue_next(&bs->throttled_reqs);
+    }
+}
+
 /* check if the path starts with "<protocol>:" */
 static int path_has_protocol(const char *path)
 {
@@ -719,6 +771,11 @@  int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
         bdrv_dev_change_media_cb(bs, true);
     }
 
+    /* throttling disk I/O limits */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_enable(bs);
+    }
+
     return 0;
 
 unlink_and_fail:
@@ -754,6 +811,9 @@  void bdrv_close(BlockDriverState *bs)
 
         bdrv_dev_change_media_cb(bs, false);
     }
+
+    /*throttling disk I/O limits*/
+    bdrv_io_limits_disable(bs);
 }
 
 void bdrv_close_all(void)
@@ -1292,6 +1352,11 @@  static int coroutine_fn bdrv_co_do_readv(BlockDriverState *bs,
         return -EIO;
     }
 
+    /* throttling disk read I/O */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_intercept(bs, false, nb_sectors);
+    }
+
     return drv->bdrv_co_readv(bs, sector_num, nb_sectors, qiov);
 }
 
@@ -1322,6 +1387,11 @@  static int coroutine_fn bdrv_co_do_writev(BlockDriverState *bs,
         return -EIO;
     }
 
+    /* throttling disk write I/O */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_intercept(bs, true, nb_sectors);
+    }
+
     ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
 
     if (bs->dirty_bitmap) {
@@ -2048,7 +2118,6 @@  void bdrv_debug_event(BlockDriverState *bs, BlkDebugEvent event)
     }
 
     return drv->bdrv_debug_event(bs, event);
-
 }
 
 /**************************************************************/
@@ -2513,6 +2582,156 @@  void bdrv_aio_cancel(BlockDriverAIOCB *acb)
     acb->pool->cancel(acb);
 }
 
+/* block I/O throttling */
+static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
+                 bool is_write, double elapsed_time, uint64_t *wait) {
+    uint64_t bps_limit = 0;
+    double   bytes_limit, bytes_disp, bytes_res;
+    double   slice_time, wait_time;
+
+    if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
+        bps_limit = bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL];
+    } else if (bs->io_limits.bps[is_write]) {
+        bps_limit = bs->io_limits.bps[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    slice_time = bs->slice_end - bs->slice_start;
+    slice_time /= (NANOSECONDS_PER_SECOND);
+    bytes_limit = bps_limit * slice_time;
+    bytes_disp  = bs->nr_bytes[is_write] - bs->io_disps.bytes[is_write];
+    if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
+        bytes_disp += bs->nr_bytes[!is_write] - bs->io_disps.bytes[!is_write];
+    }
+
+    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
+
+    if (bytes_disp + bytes_res <= bytes_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
+    wait_time = (bytes_disp + bytes_res) / bps_limit - elapsed_time;
+
+    bs->slice_time = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    bs->slice_end += bs->slice_time - 3 * BLOCK_IO_SLICE_TIME;
+    if (wait) {
+        *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    }
+
+    return true;
+}
+
+static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
+                             double elapsed_time, uint64_t *wait) {
+    uint64_t iops_limit = 0;
+    double   ios_limit, ios_disp;
+    double   slice_time, wait_time;
+
+    if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
+        iops_limit = bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL];
+    } else if (bs->io_limits.iops[is_write]) {
+        iops_limit = bs->io_limits.iops[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    slice_time = bs->slice_end - bs->slice_start;
+    slice_time /= (NANOSECONDS_PER_SECOND);
+    ios_limit  = iops_limit * slice_time;
+    ios_disp   = bs->nr_ops[is_write] - bs->io_disps.ios[is_write];
+    if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
+        ios_disp += bs->nr_ops[!is_write] - bs->io_disps.ios[!is_write];
+    }
+
+    if (ios_disp + 1 <= ios_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
+    wait_time = (ios_disp + 1) / iops_limit;
+    if (wait_time > elapsed_time) {
+        wait_time = wait_time - elapsed_time;
+    } else {
+        wait_time = 0;
+    }
+
+    bs->slice_time = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    bs->slice_end += bs->slice_time - 3 * BLOCK_IO_SLICE_TIME;
+    if (wait) {
+        *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    }
+
+    return true;
+}
+
+static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
+                           bool is_write, int64_t *wait) {
+    int64_t  now, max_wait;
+    uint64_t bps_wait = 0, iops_wait = 0;
+    double   elapsed_time;
+    int      bps_ret, iops_ret;
+
+    now = qemu_get_clock_ns(vm_clock);
+    if ((bs->slice_start < now)
+        && (bs->slice_end > now)) {
+        bs->slice_end = now + bs->slice_time;
+    } else {
+        bs->slice_time  =  5 * BLOCK_IO_SLICE_TIME;
+        bs->slice_start = now;
+        bs->slice_end   = now + bs->slice_time;
+
+        bs->io_disps.bytes[is_write]  = bs->nr_bytes[is_write];
+        bs->io_disps.bytes[!is_write] = bs->nr_bytes[!is_write];
+
+        bs->io_disps.ios[is_write]    = bs->nr_ops[is_write];
+        bs->io_disps.ios[!is_write]   = bs->nr_ops[!is_write];
+    }
+
+    elapsed_time  = now - bs->slice_start;
+    elapsed_time  /= (NANOSECONDS_PER_SECOND);
+
+    bps_ret  = bdrv_exceed_bps_limits(bs, nb_sectors,
+                                      is_write, elapsed_time, &bps_wait);
+    iops_ret = bdrv_exceed_iops_limits(bs, is_write,
+                                      elapsed_time, &iops_wait);
+    if (bps_ret || iops_ret) {
+        max_wait = bps_wait > iops_wait ? bps_wait : iops_wait;
+        if (wait) {
+            *wait = max_wait;
+        }
+
+        now = qemu_get_clock_ns(vm_clock);
+        if (bs->slice_end < now + max_wait) {
+            bs->slice_end = now + max_wait;
+        }
+
+        return true;
+    }
+
+    if (wait) {
+        *wait = 0;
+    }
+
+    return false;
+}
 
 /**************************************************************/
 /* async block device emulation */
diff --git a/block.h b/block.h
index bc8315d..9b5b35f 100644
--- a/block.h
+++ b/block.h
@@ -91,6 +91,7 @@  void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
 void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_disable(BlockDriverState *bs);
 bool bdrv_io_limits_enabled(BlockDriverState *bs);
 
 void bdrv_init(void);
diff --git a/block_int.h b/block_int.h
index 0abe843..1b1f7c1 100644
--- a/block_int.h
+++ b/block_int.h
@@ -39,6 +39,7 @@ 
 #define BLOCK_IO_LIMIT_TOTAL    2
 
 #define BLOCK_IO_SLICE_TIME     100000000
+#define NANOSECONDS_PER_SECOND  1000000000.0
 
 #define BLOCK_OPT_SIZE          "size"
 #define BLOCK_OPT_ENCRYPT       "encryption"
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 6b58160..9549c07 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -61,6 +61,14 @@  void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
     assert(qemu_in_coroutine());
 }
 
+void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
+{
+    Coroutine *self = qemu_coroutine_self();
+    QTAILQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
+    qemu_coroutine_yield();
+    assert(qemu_in_coroutine());
+}
+
 bool qemu_co_queue_next(CoQueue *queue)
 {
     Coroutine *next;
diff --git a/qemu-coroutine.h b/qemu-coroutine.h
index b8fc4f4..8a2e5d2 100644
--- a/qemu-coroutine.h
+++ b/qemu-coroutine.h
@@ -118,6 +118,12 @@  void qemu_co_queue_init(CoQueue *queue);
 void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
 
 /**
+ * Adds the current coroutine to the head of the CoQueue and transfers control to the
+ * caller of the coroutine.
+ */
+void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue);
+
+/**
  * Restarts the next coroutine in the CoQueue and removes it from the queue.
  *
  * Returns true if a coroutine was restarted, false if the queue is empty.