[v1,1/1] Submit the code for QEMU disk I/O limits.

Message ID 1311326454-7617-2-git-send-email-wuzhy@linux.vnet.ibm.com
State New

Commit Message

Zhi Yong Wu July 22, 2011, 9:20 a.m. UTC
Signed-off-by: Zhi Yong Wu <zwu.kernel@gmail.com>
---
 Makefile.objs     |    2 +-
 block.c           |  248 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block.h           |    1 -
 block/blk-queue.c |   99 +++++++++++++++++++++
 block/blk-queue.h |   73 ++++++++++++++++
 block_int.h       |   21 +++++
 blockdev.c        |   20 +++++
 qemu-config.c     |   24 +++++
 qemu-option.c     |   17 ++++
 qemu-option.h     |    1 +
 qemu-options.hx   |    1 +
 11 files changed, 505 insertions(+), 2 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h

Comments

Stefan Hajnoczi July 22, 2011, 10:54 a.m. UTC | #1
On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> +static void bdrv_block_timer(void *opaque)
> +{
> +    BlockDriverState *bs = opaque;
> +    BlockQueue *queue = bs->block_queue;
> +    uint64_t intval = 1;
> +
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        BlockIORequest *request;
> +        int ret;
> +
> +        request = QTAILQ_FIRST(&queue->requests);
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +
> +        ret = queue->handler(request);

Please remove the function pointer and call qemu_block_queue_handler()
directly.  This indirection is not needed and makes it harder to
follow the code.
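
For example (untested sketch), the loop body could simply be:

    ret = qemu_block_queue_handler(request);

and BlockQueue would no longer need a handler field, so
qemu_new_block_queue() could take no arguments.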

> +        if (ret == 0) {
> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
> +            break;
> +        }
> +
> +        qemu_free(request);
> +    }
> +
> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));

intval is always 1.  The timer should be set to the next earliest deadline.

> +}
> +
>  void bdrv_register(BlockDriver *bdrv)
>  {
>     if (!bdrv->bdrv_aio_readv) {
> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename,
>         goto free_and_fail;
>     }
>
> +    /* throttling disk I/O limits */
> +    if (bs->io_limits != NULL) {
> +       bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
> +    }
> +
>  #ifndef _WIN32
>     if (bs->is_temporary) {
>         unlink(filename);
> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>     }
>
> +    /* throttling disk I/O limits */
> +    if (bs->io_limits != NULL) {

block_queue is allocated in bdrv_open_common() but these variables are
initialized in bdrv_open().  Can you move them together?  I don't see a
reason to keep them apart.

> +       bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
> +       bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
> +       qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));

Why is the timer being scheduled immediately?  There are no queued requests.

> +
> +       bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
> +       bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
> +    }
> +
>     return 0;
>
>  unlink_and_fail:
> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>         if (bs->change_cb)
>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>     }
> +
> +    /* throttling disk I/O limits */
> +    if (bs->block_queue) {
> +       qemu_del_block_queue(bs->block_queue);

3 space indent, should be 4 spaces.

> +    }
> +
> +    if (bs->block_timer) {

qemu_free_timer() will only free() the timer memory but it will not
cancel the timer.  Use qemu_del_timer() first to ensure that the timer
is not pending:

qemu_del_timer(bs->block_timer);
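
i.e. the close path would become (sketch):

    if (bs->block_timer) {
        qemu_del_timer(bs->block_timer);    /* cancel if still pending */
        qemu_free_timer(bs->block_timer);   /* then free the memory */
    }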

> +        qemu_free_timer(bs->block_timer);
> +    }
>  }
>
>  void bdrv_close_all(void)
> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>     *psecs = bs->secs;
>  }
>
> +/* throttling disk io limits */
> +void bdrv_set_io_limits(BlockDriverState *bs,
> +                           BlockIOLimit *io_limits)
> +{
> +    bs->io_limits            = io_limits;

This function takes ownership of io_limits but never frees it.  I
suggest not taking ownership and just copying io_limits into
bs->io_limits so that the caller does not need to malloc() and the
lifecycle of bs->io_limits is completely under our control.

Easiest would be to turn BlockDriverState.io_limits into:

BlockIOLimit io_limits;

and just copy in bdrv_set_io_limits():

bs->io_limits = *io_limits;

bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
I wouldn't worry about optimizing it out yet.

> +}
> +
>  /* Recognize floppy formats */
>  typedef struct FDFormat {
>     FDriveType drive;
> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
>     return buf;
>  }
>
> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
> +                           bool is_write, uint64_t *wait) {
> +    int64_t  real_time;
> +    uint64_t bps_limit   = 0;
> +    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
> +    double   slice_time = 0.1, wait_time;
> +
> +    if (bs->io_limits->bps[2]) {
> +        bps_limit = bs->io_limits->bps[2];

Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.
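
For example (the names are only suggestions):

    enum {
        IO_LIMIT_READ  = 0,   /* matches is_write == false */
        IO_LIMIT_WRITE = 1,   /* matches is_write == true */
        IO_LIMIT_TOTAL = 2,
    };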

> +    } else if (bs->io_limits->bps[is_write]) {
> +        bps_limit = bs->io_limits->bps[is_write];
> +    } else {
> +        if (wait) {
> +            *wait = 0;
> +        }
> +
> +        return false;
> +    }
> +
> +    real_time = qemu_get_clock_ns(vm_clock);
> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {

Please define a constant for the 100 ms time slice instead of using
100000000 directly.
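
For example (the name is only a suggestion):

    #define BLOCK_IO_SLICE_TIME 100000000LL   /* 100 ms in nanoseconds */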

> +        bs->slice_start[is_write] = real_time;
> +
> +        bs->io_disps->bytes[is_write] = 0;
> +        if (bs->io_limits->bps[2]) {
> +            bs->io_disps->bytes[!is_write] = 0;
> +        }

All previous data should be discarded when a new time slice starts:
bs->io_disps->bytes[IO_LIMIT_READ] = 0;
bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;

> +    }

Please start the new slice in common code.  That way you can clear
bytes[] and ios[] at the same time and don't need to duplicate this
code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().
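
Something along these lines (untested sketch, assuming read/write share a
single slice; the helper name is illustrative):

    static void bdrv_start_new_slice(BlockDriverState *bs, int64_t now)
    {
        bs->slice_start = now;
        bs->io_disps->bytes[IO_LIMIT_READ]  = 0;
        bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
        bs->io_disps->ios[IO_LIMIT_READ]    = 0;
        bs->io_disps->ios[IO_LIMIT_WRITE]   = 0;
    }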

> +
> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
> +
> +    bytes_limit        = bps_limit * slice_time;
> +    bytes_disp  = bs->io_disps->bytes[is_write];
> +    if (bs->io_limits->bps[2]) {
> +        bytes_disp += bs->io_disps->bytes[!is_write];
> +    }
> +
> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
> +
> +    if (bytes_disp + bytes_res <= bytes_limit) {
> +        if (wait) {
> +            *wait = 0;
> +        }
> +
> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
> +        return false;
> +    }
> +
> +    /* Calc approx time to dispatch */
> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
> +    if (!wait_time) {
> +        wait_time = 1;
> +    }
> +
> +    wait_time = wait_time + (slice_time - elapsed_time);
> +    if (wait) {
> +       *wait = wait_time * 1000000000 + 1;
> +    }
> +
> +    return true;
> +}

After a slice expires all bytes/iops dispatched data is forgotten,
even if there are still requests queued.  This means that requests
issued by the guest immediately after a new 100 ms period will be
issued but existing queued requests will still be waiting.

And since the queued requests don't get their next chance until later,
it's possible that they will be requeued because the requests that the
guest snuck in have brought us to the limit again.

In order to solve this problem, we need to extend the current slice if
there are still requests pending.  To prevent extensions from growing
the slice forever (and keeping too much old data around), it should be
alright to cull data from 2 slices ago.  The simplest way of doing
that is to subtract the bps/iops limits from the bytes/iops
dispatched.
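
Roughly (untested sketch):

    /* Extend the slice instead of starting a fresh one, then cull
     * history older than two slices by subtracting one slice's budget. */
    if (bs->io_disps->bytes[is_write] > bps_limit * slice_time) {
        bs->io_disps->bytes[is_write] -= bps_limit * slice_time;
    }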

> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>         return NULL;
>
> +    /* throttling disk read I/O */
> +    if (bs->io_limits != NULL) {
> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
> +                               sector_num, qiov, nb_sectors, cb, opaque);

5 space indent, should be 4.

> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
> +           qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock));

Imagine 3 requests that need to be queued: A, B, and C.  Since the
timer is unconditionally set each time a request is queued, the timer
will be set to C's wait_time.  However A or B's wait_time might be
earlier and we will miss that deadline!

We really need a priority queue here.  QEMU's timers solve the same
problem with a sorted list, which might actually be faster for short
lists where a fancy data structure has too much overhead.
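
A sorted insert could look like this (untested sketch; the deadline field
would be new):

    static void block_queue_insert_sorted(BlockQueue *queue,
                                          BlockIORequest *request)
    {
        BlockIORequest *cur;

        QTAILQ_FOREACH(cur, &queue->requests, entry) {
            if (request->deadline < cur->deadline) {
                QTAILQ_INSERT_BEFORE(cur, request, entry);
                return;
            }
        }
        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
    }

The timer would then always be armed with the deadline of
QTAILQ_FIRST(&queue->requests).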

> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                       BlockDriverState *bs,
> +                       BlockRequestHandler *handler,
> +                       int64_t sector_num,
> +                       QEMUIOVector *qiov,
> +                       int nb_sectors,
> +                       BlockDriverCompletionFunc *cb,
> +                       void *opaque)
> +{
> +    BlockIORequest *request;
> +    BlockDriverAIOCB *acb;
> +
> +    request = qemu_malloc(sizeof(BlockIORequest));
> +    request->bs = bs;
> +    request->handler = handler;
> +    request->sector_num = sector_num;
> +    request->qiov = qiov;
> +    request->nb_sectors = nb_sectors;
> +    request->cb = cb;
> +    request->opaque = opaque;
> +
> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +
> +    acb = qemu_malloc(sizeof(*acb));
> +    acb->bs = bs;
> +    acb->cb = cb;
> +    acb->opaque = opaque;

AIOPool and qemu_aio_get() should be used instead of manually doing
this.  Otherwise bdrv_aio_cancel() does not work.

In order to free our acb when the I/O request completes we need a cb
function.  Right now acb is leaking.
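
For example (sketch, following the usual AIOPool pattern; the cancel
function is illustrative):

    static AIOPool block_queue_aio_pool = {
        .aiocb_size = sizeof(BlockDriverAIOCB),
        .cancel     = qemu_block_queue_cancel,  /* would dequeue the request */
    };

    acb = qemu_aio_get(&block_queue_aio_pool, bs, cb, opaque);

with qemu_aio_release(acb) called from the completion callback.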

> diff --git a/qemu-config.c b/qemu-config.c
> index efa892c..ad5f2d9 100644
> --- a/qemu-config.c
> +++ b/qemu-config.c
> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>             .name = "boot",
>             .type = QEMU_OPT_BOOL,
>             .help = "make this a boot drive",
> +        },{
> +            .name = "iops",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops for this drive",

Let's explain what "iops" means:

"limit total I/O operations per second"

> +        },{
> +            .name = "iops_rd",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_rd for this drive",

"limit read operations per second"

> +        },{
> +            .name = "iops_wr",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_wr for this drive",

"limit write operations per second"

> +        },{
> +            .name = "bps",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its bps for this drive",

"limit total bytes per second"

> +        },{
> +            .name = "bps_rd",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its bps_rd for this drive",

"limit read bytes per second"

> +        },{
> +            .name = "bps_wr",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "cap its iops_wr for this drive",

"limit write bytes per second"

Stefan
Zhiyong Wu July 25, 2011, 4:25 a.m. UTC | #2
On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +static void bdrv_block_timer(void *opaque)
>> +{
>> +    BlockDriverState *bs = opaque;
>> +    BlockQueue *queue = bs->block_queue;
>> +    uint64_t intval = 1;
>> +
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockIORequest *request;
>> +        int ret;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        ret = queue->handler(request);
>
> Please remove the function pointer and call qemu_block_queue_handler()
> directly.  This indirection is not needed and makes it harder to
> follow the code.
Should it keep the same style as other queue implementations, such as
the network queue? As you know, the network queue has one queue handler.

>
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            break;
>> +        }
>> +
>> +        qemu_free(request);
>> +    }
>> +
>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
>
> intval is always 1.  The timer should be set to the next earliest deadline.
Please see bdrv_aio_readv/writev:
+    /* throttling disk read I/O */
+    if (bs->io_limits != NULL) {
+       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
+                               sector_num, qiov, nb_sectors, cb, opaque);
+           qemu_mod_timer(bs->block_timer, wait_time +
qemu_get_clock_ns(vm_clock));

The timer is already modified when the block request is enqueued.

>
>> +}
>> +
>>  void bdrv_register(BlockDriver *bdrv)
>>  {
>>     if (!bdrv->bdrv_aio_readv) {
>> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename,
>>         goto free_and_fail;
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>> +       bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
>> +    }
>> +
>>  #ifndef _WIN32
>>     if (bs->is_temporary) {
>>         unlink(filename);
>> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>
> block_queue is allocated in bdrv_open_common() but these variables are
> initialized in bdrv_open().  Can you move them together?  I don't see a
> reason to keep them apart.
OK, they will be moved together.
>
>> +       bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
>> +       bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> +       qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));
>
> Why is the timer being scheduled immediately?  There are no queued requests.
OK, you mean that if no request is in the block queue, the timer should be stopped?

>
>> +
>> +       bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
>> +       bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
>> +    }
>> +
>>     return 0;
>>
>>  unlink_and_fail:
>> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>>         if (bs->change_cb)
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>> +
>> +    /* throttling disk I/O limits */
>> +    if (bs->block_queue) {
>> +       qemu_del_block_queue(bs->block_queue);
>
> 3 space indent, should be 4 spaces.
done
>
>> +    }
>> +
>> +    if (bs->block_timer) {
>
> qemu_free_timer() will only free() the timer memory but it will not
> cancel the timer.  Use qemu_del_timer() first to ensure that the timer
> is not pending:
>
> qemu_del_timer(bs->block_timer);
OK, thanks.

>
>> +        qemu_free_timer(bs->block_timer);
>> +    }
>>  }
>>
>>  void bdrv_close_all(void)
>> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>>     *psecs = bs->secs;
>>  }
>>
>> +/* throttling disk io limits */
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> +                           BlockIOLimit *io_limits)
>> +{
>> +    bs->io_limits            = io_limits;
>
> This function takes ownership of io_limits but never frees it.  I
> suggest not taking ownership and just copying io_limits into
> bs->io_limits so that the caller does not need to malloc() and the
> lifecycle of bs->io_limits is completely under our control.
>
> Easiest would be to turn BlockDriverState.io_limits into:
>
> BlockIOLimit io_limits;
>
> and just copy in bdrv_set_io_limits():
>
> bs->io_limits = *io_limits;
Good point.
>
> bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
> I wouldn't worry about optimizing it out yet.
>
>> +}
>> +
>>  /* Recognize floppy formats */
>>  typedef struct FDFormat {
>>     FDriveType drive;
>> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
>>     return buf;
>>  }
>>
>> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> +                           bool is_write, uint64_t *wait) {
>> +    int64_t  real_time;
>> +    uint64_t bps_limit   = 0;
>> +    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
>> +    double   slice_time = 0.1, wait_time;
>> +
>> +    if (bs->io_limits->bps[2]) {
>> +        bps_limit = bs->io_limits->bps[2];
>
> Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.
OK.

>
>> +    } else if (bs->io_limits->bps[is_write]) {
>> +        bps_limit = bs->io_limits->bps[is_write];
>> +    } else {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +        return false;
>> +    }
>> +
>> +    real_time = qemu_get_clock_ns(vm_clock);
>> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {
>
> Please define a constant for the 100 ms time slice instead of using
> 100000000 directly.
OK.
>
>> +        bs->slice_start[is_write] = real_time;
>> +
>> +        bs->io_disps->bytes[is_write] = 0;
>> +        if (bs->io_limits->bps[2]) {
>> +            bs->io_disps->bytes[!is_write] = 0;
>> +        }
>
> All previous data should be discarded when a new time slice starts:
> bs->io_disps->bytes[IO_LIMIT_READ] = 0;
> bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will
never be used; I think it should not be cleared here, right?

>
>> +    }
>
> Please start the new slice in common code.  That way you can clear
> bytes[] and ios[] at the same time and don't need to duplicate this
> code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().
>
>> +
>> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
>> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
>> +
>> +    bytes_limit        = bps_limit * slice_time;
>> +    bytes_disp  = bs->io_disps->bytes[is_write];
>> +    if (bs->io_limits->bps[2]) {
>> +        bytes_disp += bs->io_disps->bytes[!is_write];
>> +    }
>> +
>> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
>> +
>> +    if (bytes_disp + bytes_res <= bytes_limit) {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
>> +        return false;
>> +    }
>> +
>> +    /* Calc approx time to dispatch */
>> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
>> +    if (!wait_time) {
>> +        wait_time = 1;
>> +    }
>> +
>> +    wait_time = wait_time + (slice_time - elapsed_time);
>> +    if (wait) {
>> +       *wait = wait_time * 1000000000 + 1;
>> +    }
>> +
>> +    return true;
>> +}
>
> After a slice expires all bytes/iops dispatched data is forgotten,
> even if there are still requests queued.  This means that requests
> issued by the guest immediately after a new 100 ms period will be
> issued but existing queued requests will still be waiting.
>
> And since the queued requests don't get their next chance until later,
> it's possible that they will be requeued because the requests that the
> guest snuck in have brought us to the limit again.
>
> In order to solve this problem, we need to extend the current slice if
> there are still requests pending.  To prevent extensions from growing
> the slice forever (and keeping too much old data around), it should be
> alright to cull data from 2 slices ago.  The simplest way of doing
> that is to subtract the bps/iops limits from the bytes/iops
> dispatched.
>
>> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>>         return NULL;
>>
>> +    /* throttling disk read I/O */
>> +    if (bs->io_limits != NULL) {
>> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>> +                               sector_num, qiov, nb_sectors, cb, opaque);
>
> 5 space indent, should be 4.
>
>> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
>> +           qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock));
>
> Imagine 3 requests that need to be queued: A, B, and C.  Since the
> timer is unconditionally set each time a request is queued, the timer
> will be set to C's wait_time.  However A or B's wait_time might be
> earlier and we will miss that deadline!
>
> We really need a priority queue here.  QEMU's timers solve the same
> problem with a sorted list, which might actually be faster for short
> lists where a fancy data structure has too much overhead.
>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                       BlockDriverState *bs,
>> +                       BlockRequestHandler *handler,
>> +                       int64_t sector_num,
>> +                       QEMUIOVector *qiov,
>> +                       int nb_sectors,
>> +                       BlockDriverCompletionFunc *cb,
>> +                       void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +    acb = qemu_malloc(sizeof(*acb));
>> +    acb->bs = bs;
>> +    acb->cb = cb;
>> +    acb->opaque = opaque;
>
> AIOPool and qemu_aio_get() should be used instead of manually doing
> this.  Otherwise bdrv_aio_cancel() does not work.
>
> In order to free our acb when the I/O request completes we need a cb
> function.  Right now acb is leaking.
OK.
>
>> diff --git a/qemu-config.c b/qemu-config.c
>> index efa892c..ad5f2d9 100644
>> --- a/qemu-config.c
>> +++ b/qemu-config.c
>> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>>             .name = "boot",
>>             .type = QEMU_OPT_BOOL,
>>             .help = "make this a boot drive",
>> +        },{
>> +            .name = "iops",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops for this drive",
>
> Let's explain what "iops" means:
>
> "limit total I/O operations per second"
>
>> +        },{
>> +            .name = "iops_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_rd for this drive",
>
> "limit read operations per second"
>
>> +        },{
>> +            .name = "iops_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write operations per second"
>
>> +        },{
>> +            .name = "bps",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps for this drive",
>
> "limit total bytes per second"
>
>> +        },{
>> +            .name = "bps_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps_rd for this drive",
>
> "limit read bytes per second"
>
>> +        },{
>> +            .name = "bps_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write bytes per second"
Done
>
> Stefan
>
Stefan Hajnoczi July 25, 2011, 5:40 a.m. UTC | #3
On Mon, Jul 25, 2011 at 5:25 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
> On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>> +static void bdrv_block_timer(void *opaque)
>>> +{
>>> +    BlockDriverState *bs = opaque;
>>> +    BlockQueue *queue = bs->block_queue;
>>> +    uint64_t intval = 1;
>>> +
>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>> +        BlockIORequest *request;
>>> +        int ret;
>>> +
>>> +        request = QTAILQ_FIRST(&queue->requests);
>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>> +
>>> +        ret = queue->handler(request);
>>
>> Please remove the function pointer and call qemu_block_queue_handler()
>> directly.  This indirection is not needed and makes it harder to
>> follow the code.
> Should it keep the same style as other queue implementations, such as
> the network queue? As you know, the network queue has one queue handler.

You mean net/queue.c:queue->deliver?  There are two deliver functions,
qemu_deliver_packet() and qemu_vlan_deliver_packet(), which is why a
function pointer is necessary.

In this case there is only one handler function so the indirection is
not necessary.

>>
>>> +        if (ret == 0) {
>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>> +            break;
>>> +        }
>>> +
>>> +        qemu_free(request);
>>> +    }
>>> +
>>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
>>
>> intval is always 1.  The timer should be set to the next earliest deadline.
> Please see bdrv_aio_readv/writev:
> +    /* throttling disk read I/O */
> +    if (bs->io_limits != NULL) {
> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
> +                               sector_num, qiov, nb_sectors, cb, opaque);
> +           qemu_mod_timer(bs->block_timer, wait_time +
> qemu_get_clock_ns(vm_clock));
>
> The timer is already modified when the block request is enqueued.

The current algorithm seems to be:
1. Queue requests that exceed the limit and set the timer.
2. Dequeue all requests when the timer fires.
3. Set 1s periodic timer.

Why is the timer set up as a periodic 1 second timer in bdrv_block_timer()?

>>> +        bs->slice_start[is_write] = real_time;
>>> +
>>> +        bs->io_disps->bytes[is_write] = 0;
>>> +        if (bs->io_limits->bps[2]) {
>>> +            bs->io_disps->bytes[!is_write] = 0;
>>> +        }
>>
>> All previous data should be discarded when a new time slice starts:
>> bs->io_disps->bytes[IO_LIMIT_READ] = 0;
>> bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
> If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will
> never be used; I think it should not be cleared here, right?

I think there is no advantage in keeping slices separate for
read/write.  The code would be simpler and work the same if there is
only one slice and past history is cleared when a new slice starts.

Stefan
Zhiyong Wu July 25, 2011, 6:55 a.m. UTC | #4
On Mon, Jul 25, 2011 at 1:40 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Mon, Jul 25, 2011 at 5:25 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
>> On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>>> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>>> +static void bdrv_block_timer(void *opaque)
>>>> +{
>>>> +    BlockDriverState *bs = opaque;
>>>> +    BlockQueue *queue = bs->block_queue;
>>>> +    uint64_t intval = 1;
>>>> +
>>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>>> +        BlockIORequest *request;
>>>> +        int ret;
>>>> +
>>>> +        request = QTAILQ_FIRST(&queue->requests);
>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>> +
>>>> +        ret = queue->handler(request);
>>>
>>> Please remove the function pointer and call qemu_block_queue_handler()
>>> directly.  This indirection is not needed and makes it harder to
>>> follow the code.
>> Should it keep the same style as other queue implementations, such as
>> the network queue? As you know, the network queue has one queue handler.
>
> You mean net/queue.c:queue->deliver?  There are two deliver functions,
yeah
> qemu_deliver_packet() and qemu_vlan_deliver_packet(), which is why a
> function pointer is necessary.
OK. The function pointer has been removed, and the handler is now invoked directly.
>
> In this case there is only one handler function so the indirection is
> not necessary.
>
>>>
>>>> +        if (ret == 0) {
>>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>>> +            break;
>>>> +        }
>>>> +
>>>> +        qemu_free(request);
>>>> +    }
>>>> +
>>>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
>>>
>>> intval is always 1.  The timer should be set to the next earliest deadline.
>> Please see bdrv_aio_readv/writev:
>> +    /* throttling disk read I/O */
>> +    if (bs->io_limits != NULL) {
>> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>> +                               sector_num, qiov, nb_sectors, cb, opaque);
>> +           qemu_mod_timer(bs->block_timer, wait_time +
>> qemu_get_clock_ns(vm_clock));
>>
>> The timer is already modified when the block request is enqueued.
>
> The current algorithm seems to be:
> 1. Queue requests that exceed the limit and set the timer.
> 2. Dequeue all requests when the timer fires.
Yeah, but each dequeued request is still checked against the current
I/O limits; if they are exceeded, it will be enqueued into the block
queue again.
> 3. Set 1s periodic timer.
>
> Why is the timer set up as a periodic 1 second timer in bdrv_block_timer()?
I was also considering whether we need to set up this type of timer.
Since the timer is already modified after qemu_block_queue_enqueue(),
it should not be modified here. I will remove this line. Thanks.

>
>>>> +        bs->slice_start[is_write] = real_time;
>>>> +
>>>> +        bs->io_disps->bytes[is_write] = 0;
>>>> +        if (bs->io_limits->bps[2]) {
>>>> +            bs->io_disps->bytes[!is_write] = 0;
>>>> +        }
>>>
>>> All previous data should be discarded when a new time slice starts:
>>> bs->io_disps->bytes[IO_LIMIT_READ] = 0;
>>> bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
>> If only bps_rd is specified, bs->io_disps->bytes[IO_LIMIT_WRITE] will
>> never be used; I think it should not be cleared here, right?
>
> I think there is no advantage in keeping slices separate for
> read/write.  The code would be simpler and work the same if there is
> only one slice and past history is cleared when a new slice starts.
Done
>
> Stefan
>
Zhiyong Wu July 25, 2011, 7:08 a.m. UTC | #5
On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +static void bdrv_block_timer(void *opaque)
>> +{
>> +    BlockDriverState *bs = opaque;
>> +    BlockQueue *queue = bs->block_queue;
>> +    uint64_t intval = 1;
>> +
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockIORequest *request;
>> +        int ret;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        ret = queue->handler(request);
>
> Please remove the function pointer and call qemu_block_queue_handler()
> directly.  This indirection is not needed and makes it harder to
> follow the code.
>
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            break;
>> +        }
>> +
>> +        qemu_free(request);
>> +    }
>> +
>> +    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
>
> intval is always 1.  The timer should be set to the next earliest deadline.
>
>> +}
>> +
>>  void bdrv_register(BlockDriver *bdrv)
>>  {
>>     if (!bdrv->bdrv_aio_readv) {
>> @@ -476,6 +508,11 @@ static int bdrv_open_common(BlockDriverState *bs, const char *filename,
>>         goto free_and_fail;
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>> +       bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
>> +    }
>> +
>>  #ifndef _WIN32
>>     if (bs->is_temporary) {
>>         unlink(filename);
>> @@ -642,6 +679,16 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>>
>> +    /* throttling disk I/O limits */
>> +    if (bs->io_limits != NULL) {
>
> block_queue is allocated in bdrv_open_common() but these variables are
> initialized in bdrv_open().  Can you move them together?  I don't see a
> reason to keep them apart.
>
>> +       bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
>> +       bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
>> +       qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));
>
> Why is the timer being scheduled immediately?  There are no queued requests.
>
>> +
>> +       bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
>> +       bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
>> +    }
>> +
>>     return 0;
>>
>>  unlink_and_fail:
>> @@ -680,6 +727,15 @@ void bdrv_close(BlockDriverState *bs)
>>         if (bs->change_cb)
>>             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
>>     }
>> +
>> +    /* throttling disk I/O limits */
>> +    if (bs->block_queue) {
>> +       qemu_del_block_queue(bs->block_queue);
>
> 3 space indent, should be 4 spaces.
>
>> +    }
>> +
>> +    if (bs->block_timer) {
>
> qemu_free_timer() will only free() the timer memory but it will not
> cancel the timer.  Use qemu_del_timer() first to ensure that the timer
> is not pending:
>
> qemu_del_timer(bs->block_timer);
>
>> +        qemu_free_timer(bs->block_timer);
>> +    }
>>  }
>>
>>  void bdrv_close_all(void)
>> @@ -1312,6 +1368,13 @@ void bdrv_get_geometry_hint(BlockDriverState *bs,
>>     *psecs = bs->secs;
>>  }
>>
>> +/* throttling disk io limits */
>> +void bdrv_set_io_limits(BlockDriverState *bs,
>> +                           BlockIOLimit *io_limits)
>> +{
>> +    bs->io_limits            = io_limits;
>
> This function takes ownership of io_limits but never frees it.  I
> suggest not taking ownership and just copying io_limits into
> bs->io_limits so that the caller does not need to malloc() and the
> lifecycle of bs->io_limits is completely under our control.
>
> Easiest would be to turn BlockDriverState.io_limits into:
>
> BlockIOLimit io_limits;
>
> and just copy in bdrv_set_io_limits():
>
> bs->io_limits = *io_limits;
>
> bdrv_exceed_io_limits() returns quite quickly if no limits are set, so
> I wouldn't worry about optimizing it out yet.
>
>> +}
>> +
>>  /* Recognize floppy formats */
>>  typedef struct FDFormat {
>>     FDriveType drive;
>> @@ -2111,6 +2174,154 @@ char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
>>     return buf;
>>  }
>>
>> +bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
>> +                           bool is_write, uint64_t *wait) {
>> +    int64_t  real_time;
>> +    uint64_t bps_limit   = 0;
>> +    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
>> +    double   slice_time = 0.1, wait_time;
>> +
>> +    if (bs->io_limits->bps[2]) {
>> +        bps_limit = bs->io_limits->bps[2];
>
> Please define a constant (IO_LIMIT_TOTAL?) instead of using magic number 2.
>
>> +    } else if (bs->io_limits->bps[is_write]) {
>> +        bps_limit = bs->io_limits->bps[is_write];
>> +    } else {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +        return false;
>> +    }
>> +
>> +    real_time = qemu_get_clock_ns(vm_clock);
>> +    if (bs->slice_start[is_write] + 100000000 <= real_time) {
>
> Please define a constant for the 100 ms time slice instead of using
> 100000000 directly.
>
>> +        bs->slice_start[is_write] = real_time;
>> +
>> +        bs->io_disps->bytes[is_write] = 0;
>> +        if (bs->io_limits->bps[2]) {
>> +            bs->io_disps->bytes[!is_write] = 0;
>> +        }
>
> All previous data should be discarded when a new time slice starts:
> bs->io_disps->bytes[IO_LIMIT_READ] = 0;
> bs->io_disps->bytes[IO_LIMIT_WRITE] = 0;
>
>> +    }
>
> Please start the new slice in common code.  That way you can clear
> bytes[] and ios[] at the same time and don't need to duplicate this
> code in both bdrv_exceed_bps_limits() and bdrv_exceed_iops_limits().
>
>> +
>> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
>> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
>> +
>> +    bytes_limit        = bps_limit * slice_time;
>> +    bytes_disp  = bs->io_disps->bytes[is_write];
>> +    if (bs->io_limits->bps[2]) {
>> +        bytes_disp += bs->io_disps->bytes[!is_write];
>> +    }
>> +
>> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
>> +
>> +    if (bytes_disp + bytes_res <= bytes_limit) {
>> +        if (wait) {
>> +            *wait = 0;
>> +        }
>> +
>> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
>> +        return false;
>> +    }
>> +
>> +    /* Calc approx time to dispatch */
>> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
>> +    if (!wait_time) {
>> +        wait_time = 1;
>> +    }
>> +
>> +    wait_time = wait_time + (slice_time - elapsed_time);
>> +    if (wait) {
>> +       *wait = wait_time * 1000000000 + 1;
>> +    }
>> +
>> +    return true;
>> +}
>
> After a slice expires all bytes/iops dispatched data is forgotten,
> even if there are still requests queued.  This means that requests
> issued by the guest immediately after a new 100 ms period will be
> issued but existing queued requests will still be waiting.
>
> And since the queued requests don't get their next chance until later,
> it's possible that they will be requeued because the requests that the
> guest snuck in have brought us to the limit again.
>
> In order to solve this problem, we need to extend the current slice if
Extend the current slice, like the in-kernel block throttling
algorithm? Our algorithm does not currently do that.

> there are still requests pending.  To prevent extensions from growing
> the slice forever (and keeping too much old data around), it should be
> alright to cull data from 2 slices ago.  The simplest way of doing
> that is to subtract the bps/iops limits from the bytes/iops
> dispatched.
You mean that the largest value of current_slice_time should be no
more than 2 * slice_time?

>
>> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>>         return NULL;
>>
>> +    /* throttling disk read I/O */
>> +    if (bs->io_limits != NULL) {
>> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>> +                               sector_num, qiov, nb_sectors, cb, opaque);
>
> 5 space indent, should be 4.
>
>> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
>> +           qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock));
>
> Imagine 3 requests that need to be queued: A, B, and C.  Since the
> timer is unconditionally set each time a request is queued, the timer
> will be set to C's wait_time.  However A or B's wait_time might be
> earlier and we will miss that deadline!
Yeah, exactly there is this issue.
>
> We really need a priority queue here.  QEMU's timers solve the same
> problem with a sorted list, which might actually be faster for short
> lists where a fancy data structure has too much overhead.
You mean the block requests should be handled in FIFO order?
If the block queue is not empty, should this incoming request be
enqueued first, right?

>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                       BlockDriverState *bs,
>> +                       BlockRequestHandler *handler,
>> +                       int64_t sector_num,
>> +                       QEMUIOVector *qiov,
>> +                       int nb_sectors,
>> +                       BlockDriverCompletionFunc *cb,
>> +                       void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +    acb = qemu_malloc(sizeof(*acb));
>> +    acb->bs = bs;
>> +    acb->cb = cb;
>> +    acb->opaque = opaque;
>
> AIOPool and qemu_aio_get() should be used instead of manually doing
> this.  Otherwise bdrv_aio_cancel() does not work.
>
> In order to free our acb when the I/O request completes we need a cb
> function.  Right now acb is leaking.
Yeah, can't the expression "acb->cb = cb;" accomplish what you expect?

>
>> diff --git a/qemu-config.c b/qemu-config.c
>> index efa892c..ad5f2d9 100644
>> --- a/qemu-config.c
>> +++ b/qemu-config.c
>> @@ -82,6 +82,30 @@ static QemuOptsList qemu_drive_opts = {
>>             .name = "boot",
>>             .type = QEMU_OPT_BOOL,
>>             .help = "make this a boot drive",
>> +        },{
>> +            .name = "iops",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops for this drive",
>
> Let's explain what "iops" means:
>
> "limit total I/O operations per second"
>
>> +        },{
>> +            .name = "iops_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_rd for this drive",
>
> "limit read operations per second"
>
>> +        },{
>> +            .name = "iops_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write operations per second"
>
>> +        },{
>> +            .name = "bps",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps for this drive",
>
> "limit total bytes per second"
>
>> +        },{
>> +            .name = "bps_rd",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its bps_rd for this drive",
>
> "limit read bytes per second"
>
>> +        },{
>> +            .name = "bps_wr",
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "cap its iops_wr for this drive",
>
> "limit write bytes per second"
>
> Stefan
>
Stefan Hajnoczi July 25, 2011, 12:24 p.m. UTC | #6
On Mon, Jul 25, 2011 at 8:08 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
> On Fri, Jul 22, 2011 at 6:54 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> On Fri, Jul 22, 2011 at 10:20 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>> +    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
>>> +    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
>>> +
>>> +    bytes_limit        = bps_limit * slice_time;
>>> +    bytes_disp  = bs->io_disps->bytes[is_write];
>>> +    if (bs->io_limits->bps[2]) {
>>> +        bytes_disp += bs->io_disps->bytes[!is_write];
>>> +    }
>>> +
>>> +    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
>>> +    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
>>> +
>>> +    if (bytes_disp + bytes_res <= bytes_limit) {
>>> +        if (wait) {
>>> +            *wait = 0;
>>> +        }
>>> +
>>> +       fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
>>> +        return false;
>>> +    }
>>> +
>>> +    /* Calc approx time to dispatch */
>>> +    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
>>> +    if (!wait_time) {
>>> +        wait_time = 1;
>>> +    }
>>> +
>>> +    wait_time = wait_time + (slice_time - elapsed_time);
>>> +    if (wait) {
>>> +       *wait = wait_time * 1000000000 + 1;
>>> +    }
>>> +
>>> +    return true;
>>> +}
>>
>> After a slice expires all bytes/iops dispatched data is forgotten,
>> even if there are still requests queued.  This means that requests
>> issued by the guest immediately after a new 100 ms period will be
>> issued but existing queued requests will still be waiting.
>>
>> And since the queued requests don't get their next chance until later,
>> it's possible that they will be requeued because the requests that the
>> guest snuck in have brought us to the limit again.
>>
>> In order to solve this problem, we need to extend the current slice if
> Extend the current slice, like the in-kernel block throttling
> algorithm? Our algorithm does not currently do that.

I'm not sure if extending the slice is necessary as long as new
requests are queued while previous requests are still queued.  But
extending slices is one way to deal with requests that span across
multiple slices.  See below.

>> there are still requests pending.  To prevent extensions from growing
>> the slice forever (and keeping too much old data around), it should be
>> alright to cull data from 2 slices ago.  The simplest way of doing
>> that is to subtract the bps/iops limits from the bytes/iops
>> dispatched.
> You mean that the largest value of current_slice_time should be no
> more than 2 * slice_time?

Yes.  If no single request is larger than the I/O limit, then the
timer value for a queued request should always be within the next
slice.  Therefore everything before last slice should be completed
already and we don't need to keep that history around.

>>
>>> @@ -2129,6 +2341,19 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
>>>     if (bdrv_check_request(bs, sector_num, nb_sectors))
>>>         return NULL;
>>>
>>> +    /* throttling disk read I/O */
>>> +    if (bs->io_limits != NULL) {
>>> +       if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>>> +            ret = qemu_block_queue_enqueue(bs->block_queue, bs, bdrv_aio_readv,
>>> +                               sector_num, qiov, nb_sectors, cb, opaque);
>>
>> 5 space indent, should be 4.
>>
>>> +           fprintf(stderr, "bdrv_aio_readv: wait_time = %ld, timer value = %ld\n", wait_time, wait_time + qemu_get_clock_ns(vm_clock));
>>> +           qemu_mod_timer(bs->block_timer, wait_time + qemu_get_clock_ns(vm_clock));
>>
>> Imagine 3 requests that need to be queued: A, B, and C.  Since the
>> timer is unconditionally set each time a request is queued, the timer
>> will be set to C's wait_time.  However A or B's wait_time might be
>> earlier and we will miss that deadline!
> Yeah, exactly there is this issue.
>>
>> We really need a priority queue here.  QEMU's timers solve the same
>> problem with a sorted list, which might actually be faster for short
>> lists where a fancy data structure has too much overhead.
> You mean the block requests should be handled in FIFO order?
> If the block queue is not empty, should this incoming request be
> enqueued first, right?

Yes.  If the limit was previously exceeded, enqueue new requests immediately:

/* If a limit was exceeded, immediately queue this request */
if (!QTAILQ_EMPTY(&queue->requests)) {
    if (limits->bps[IO_LIMIT_TOTAL]) {
        /* queue any rd/wr request */
    } else if (limits->bps[is_write] && another_request_is_queued[is_write]) {
        /* queue if the rd/wr-specific limit was previously exceeded */
    }

    ...same for iops...
}

This way new requests cannot skip ahead of queued requests due to the
lost history when a new slice starts.
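
Fleshed out slightly (untested sketch; the pending[] counter is
illustrative):

    /* Force-queue this request if earlier requests are already waiting */
    if (!QTAILQ_EMPTY(&queue->requests)) {
        if (limits->bps[IO_LIMIT_TOTAL] || limits->iops[IO_LIMIT_TOTAL]) {
            must_queue = true;              /* any queued request blocks us */
        } else if ((limits->bps[is_write] || limits->iops[is_write]) &&
                   queue->pending[is_write] > 0) {
            must_queue = true;              /* same-direction request queued */
        }
    }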

>>
>>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>> +                       BlockDriverState *bs,
>>> +                       BlockRequestHandler *handler,
>>> +                       int64_t sector_num,
>>> +                       QEMUIOVector *qiov,
>>> +                       int nb_sectors,
>>> +                       BlockDriverCompletionFunc *cb,
>>> +                       void *opaque)
>>> +{
>>> +    BlockIORequest *request;
>>> +    BlockDriverAIOCB *acb;
>>> +
>>> +    request = qemu_malloc(sizeof(BlockIORequest));
>>> +    request->bs = bs;
>>> +    request->handler = handler;
>>> +    request->sector_num = sector_num;
>>> +    request->qiov = qiov;
>>> +    request->nb_sectors = nb_sectors;
>>> +    request->cb = cb;
>>> +    request->opaque = opaque;
>>> +
>>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>> +
>>> +    acb = qemu_malloc(sizeof(*acb));
>>> +    acb->bs = bs;
>>> +    acb->cb = cb;
>>> +    acb->opaque = opaque;
>>
>> AIOPool and qemu_aio_get() should be used instead of manually doing
>> this.  Otherwise bdrv_aio_cancel() does not work.
>>
>> In order to free our acb when the I/O request completes we need a cb
>> function.  Right now acb is leaking.
> Yeah, can't the expression "acb->cb = cb;" accomplish what you expect?

A qemu_block_queue_cb() function should be called instead of directly
invoking the user's cb function.  This way the block queue code has a
chance to release its acb.
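
Something like this (untested sketch; BlockQueueAIOCB follows the usual
AIOCB wrapper pattern):

    typedef struct BlockQueueAIOCB {
        BlockDriverAIOCB common;
    } BlockQueueAIOCB;

    static void qemu_block_queue_cb(void *opaque, int ret)
    {
        BlockQueueAIOCB *acb = opaque;

        acb->common.cb(acb->common.opaque, ret);  /* user's callback */
        qemu_aio_release(acb);                    /* then release our acb */
    }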

Stefan

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 9f99ed4..06f2033 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -23,7 +23,7 @@  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
-block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block.c b/block.c
index 24a25d5..0b12f08 100644
--- a/block.c
+++ b/block.c
@@ -29,6 +29,9 @@ 
 #include "module.h"
 #include "qemu-objects.h"
 
+#include "qemu-timer.h"
+#include "block/blk-queue.h"
+
 #ifdef CONFIG_BSD
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -58,6 +61,10 @@  static int bdrv_read_em(BlockDriverState *bs, int64_t sector_num,
 static int bdrv_write_em(BlockDriverState *bs, int64_t sector_num,
                          const uint8_t *buf, int nb_sectors);
 
+bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors, bool is_write, uint64_t *wait);
+bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write, uint64_t *wait);
+bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors, bool is_write, uint64_t *wait);
+
 static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
     QTAILQ_HEAD_INITIALIZER(bdrv_states);
 
@@ -167,6 +174,31 @@  void path_combine(char *dest, int dest_size,
     }
 }
 
+static void bdrv_block_timer(void *opaque)
+{
+    BlockDriverState *bs = opaque;
+    BlockQueue *queue = bs->block_queue;
+    uint64_t intval = 1;
+
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        BlockIORequest *request;
+        int ret;
+
+        request = QTAILQ_FIRST(&queue->requests);
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+
+        ret = queue->handler(request);
+        if (ret == 0) {
+            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
+            break;
+        }
+
+        qemu_free(request);
+    }
+
+    qemu_mod_timer(bs->block_timer, (intval * 1000000000) + qemu_get_clock_ns(vm_clock));
+}
+
 void bdrv_register(BlockDriver *bdrv)
 {
     if (!bdrv->bdrv_aio_readv) {
@@ -476,6 +508,11 @@  static int bdrv_open_common(BlockDriverState *bs, const char *filename,
         goto free_and_fail;
     }
 
+    /* throttling disk I/O limits */
+    if (bs->io_limits != NULL) {
+    	bs->block_queue = qemu_new_block_queue(qemu_block_queue_handler);
+    }
+
 #ifndef _WIN32
     if (bs->is_temporary) {
         unlink(filename);
@@ -642,6 +679,16 @@  int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
     }
 
+    /* throttling disk I/O limits */
+    if (bs->io_limits != NULL) {
+	bs->io_disps    = qemu_mallocz(sizeof(BlockIODisp));
+    	bs->block_timer = qemu_new_timer_ns(vm_clock, bdrv_block_timer, bs);
+    	qemu_mod_timer(bs->block_timer, qemu_get_clock_ns(vm_clock));
+
+	bs->slice_start[0] = qemu_get_clock_ns(vm_clock);
+	bs->slice_start[1] = qemu_get_clock_ns(vm_clock);
+    }
+
     return 0;
 
 unlink_and_fail:
@@ -680,6 +727,15 @@  void bdrv_close(BlockDriverState *bs)
         if (bs->change_cb)
             bs->change_cb(bs->change_opaque, CHANGE_MEDIA);
     }
+
+    /* throttling disk I/O limits */
+    if (bs->block_queue) {
+	qemu_del_block_queue(bs->block_queue);
+    }
+
+    if (bs->block_timer) {
+        qemu_free_timer(bs->block_timer);
+    }
 }
 
 void bdrv_close_all(void)
@@ -1312,6 +1368,13 @@  void bdrv_get_geometry_hint(BlockDriverState *bs,
     *psecs = bs->secs;
 }
 
+/* throttling disk io limits */
+void bdrv_set_io_limits(BlockDriverState *bs,
+			    BlockIOLimit *io_limits)
+{
+    bs->io_limits	      = io_limits;
+}
+
 /* Recognize floppy formats */
 typedef struct FDFormat {
     FDriveType drive;
@@ -2111,6 +2174,154 @@  char *bdrv_snapshot_dump(char *buf, int buf_size, QEMUSnapshotInfo *sn)
     return buf;
 }
 
+bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
+                           bool is_write, uint64_t *wait) {
+    int64_t  real_time;
+    uint64_t bps_limit   = 0;
+    double   bytes_limit, bytes_disp, bytes_res, elapsed_time;
+    double   slice_time = 0.1, wait_time;
+
+    if (bs->io_limits->bps[2]) {
+        bps_limit = bs->io_limits->bps[2];
+    } else if (bs->io_limits->bps[is_write]) {
+        bps_limit = bs->io_limits->bps[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    real_time = qemu_get_clock_ns(vm_clock);
+    if (bs->slice_start[is_write] + 100000000 <= real_time) {
+        bs->slice_start[is_write] = real_time;
+
+        bs->io_disps->bytes[is_write] = 0;
+        if (bs->io_limits->bps[2]) {
+            bs->io_disps->bytes[!is_write] = 0;
+        }
+    }
+
+    elapsed_time  = (real_time - bs->slice_start[is_write]) / 1000000000.0;
+    fprintf(stderr, "real_time = %ld, slice_start = %ld, elapsed_time = %g\n", real_time, bs->slice_start[is_write], elapsed_time);
+
+    bytes_limit	= bps_limit * slice_time;
+    bytes_disp  = bs->io_disps->bytes[is_write];
+    if (bs->io_limits->bps[2]) {
+        bytes_disp += bs->io_disps->bytes[!is_write];
+    }
+
+    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
+    fprintf(stderr, "bytes_limit = %g bytes_disp = %g, bytes_res = %g, elapsed_time = %g\n", bytes_limit, bytes_disp, bytes_res, elapsed_time);
+
+    if (bytes_disp + bytes_res <= bytes_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+    	fprintf(stderr, "bytes_disp + bytes_res <= bytes_limit\n");
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
+    wait_time = (bytes_disp + bytes_res - bytes_limit) / bps_limit;
+    if (!wait_time) {
+        wait_time = 1;
+    }
+
+    wait_time = wait_time + (slice_time - elapsed_time);
+    if (wait) {
+	*wait = wait_time * 1000000000 + 1;
+    }
+
+    return true;
+}
+
+bool bdrv_exceed_iops_limits(BlockDriverState *bs,
+                             bool is_write, uint64_t *wait)
+{
+    int64_t  real_time;
+    uint64_t iops_limit   = 0;
+    double   ios_limit, ios_disp, elapsed_time;
+    double   slice_time = 0.1, wait_time;
+
+    if (bs->io_limits->iops[2]) {
+        iops_limit = bs->io_limits->iops[2];
+    } else if (bs->io_limits->iops[is_write]) {
+        iops_limit = bs->io_limits->iops[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    real_time = qemu_get_clock_ns(vm_clock);
+    /* Start a new accounting slice once the current 0.1 s slice expires */
+    if (bs->slice_start[is_write] + 100000000 <= real_time) {
+        bs->slice_start[is_write] = real_time;
+
+        bs->io_disps->ios[is_write] = 0;
+        if (bs->io_limits->iops[2]) {
+            bs->io_disps->ios[!is_write] = 0;
+        }
+    }
+
+    elapsed_time = (real_time - bs->slice_start[is_write]) / 1000000000.0;
+    fprintf(stderr, "real_time = %" PRId64 ", slice_start = %" PRId64
+            ", elapsed_time = %g\n", real_time,
+            bs->slice_start[is_write], elapsed_time);
+
+    ios_limit = iops_limit * slice_time;
+    ios_disp  = bs->io_disps->ios[is_write];
+    if (bs->io_limits->iops[2]) {
+        ios_disp += bs->io_disps->ios[!is_write];
+    }
+    fprintf(stderr, "iops_limit = %" PRIu64 ", ios_disp = %g, "
+            "elapsed_time = %g\n", iops_limit, ios_disp, elapsed_time);
+
+    if (ios_disp + 1 <= ios_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+        fprintf(stderr, "ios_disp + 1 <= ios_limit\n");
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
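+    /* Illustrative: with iops=100 the budget is 10 requests per 0.1 s
+     * slice; the 11th request in a slice waits 11/100 s minus the time
+     * already elapsed in the slice. */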
+    wait_time = (ios_disp + 1) / iops_limit;
+    if (wait_time > elapsed_time) {
+        wait_time = wait_time - elapsed_time;
+    } else {
+        wait_time = 0;
+    }
+
+    if (wait) {
+        *wait = wait_time * 1000000000 + 1;
+    }
+
+    return true;
+}
+
+bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
+                           bool is_write, uint64_t *wait)
+{
+    uint64_t bps_wait = 0, iops_wait = 0, max_wait;
+
+    if (bdrv_exceed_bps_limits(bs, nb_sectors, is_write, &bps_wait)
+        || bdrv_exceed_iops_limits(bs, is_write, &iops_wait)) {
+        max_wait = bps_wait > iops_wait ? bps_wait : iops_wait;
+        if (wait) {
+            *wait = max_wait;
+        }
+
+        fprintf(stderr, "bdrv_exceed_io_limits: wait = %" PRIu64 "\n",
+                max_wait);
+        return true;
+    }
+
+    if (wait) {
+        *wait = 0;
+    }
+
+    return false;
+}
 
 /**************************************************************/
 /* async I/Os */
@@ -2121,6 +2332,7 @@  BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
 {
     BlockDriver *drv = bs->drv;
     BlockDriverAIOCB *ret;
+    uint64_t wait_time = 0;
 
     trace_bdrv_aio_readv(bs, sector_num, nb_sectors, opaque);
 
@@ -2129,6 +2341,19 @@  BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
     if (bdrv_check_request(bs, sector_num, nb_sectors))
         return NULL;
 
+    /* throttling disk read I/O */
+    if (bs->io_limits != NULL) {
+        if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+            ret = qemu_block_queue_enqueue(bs->block_queue, bs,
+                                           bdrv_aio_readv, sector_num,
+                                           qiov, nb_sectors, cb, opaque);
+            fprintf(stderr, "bdrv_aio_readv: wait_time = %" PRIu64
+                    ", timer value = %" PRIu64 "\n", wait_time,
+                    wait_time + qemu_get_clock_ns(vm_clock));
+            qemu_mod_timer(bs->block_timer,
+                           wait_time + qemu_get_clock_ns(vm_clock));
+            return ret;
+        }
+    }
+
     ret = drv->bdrv_aio_readv(bs, sector_num, qiov, nb_sectors,
                               cb, opaque);
 
@@ -2136,6 +2361,11 @@  BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
 	/* Update stats even though technically transfer has not happened. */
 	bs->rd_bytes += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
 	bs->rd_ops ++;
+
+        if (bs->io_limits != NULL) {
+            bs->io_disps->bytes[0] += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
+            bs->io_disps->ios[0]++;
+        }
     }
 
     return ret;
@@ -2184,6 +2414,7 @@  BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
     BlockDriver *drv = bs->drv;
     BlockDriverAIOCB *ret;
     BlockCompleteData *blk_cb_data;
+    uint64_t wait_time = 0;
 
     trace_bdrv_aio_writev(bs, sector_num, nb_sectors, opaque);
 
@@ -2201,6 +2432,18 @@  BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
         opaque = blk_cb_data;
     }
 
+    /* throttling disk write I/O */
+    if (bs->io_limits != NULL) {
+        if (bdrv_exceed_io_limits(bs, nb_sectors, true, &wait_time)) {
+            ret = qemu_block_queue_enqueue(bs->block_queue, bs,
+                                           bdrv_aio_writev, sector_num,
+                                           qiov, nb_sectors, cb, opaque);
+            fprintf(stderr, "bdrv_aio_writev: wait_time = %" PRIu64
+                    ", timer value = %" PRIu64 "\n", wait_time,
+                    wait_time + qemu_get_clock_ns(vm_clock));
+            qemu_mod_timer(bs->block_timer,
+                           wait_time + qemu_get_clock_ns(vm_clock));
+            return ret;
+        }
+    }
+
     ret = drv->bdrv_aio_writev(bs, sector_num, qiov, nb_sectors,
                                cb, opaque);
 
@@ -2211,6 +2454,11 @@  BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
         if (bs->wr_highest_sector < sector_num + nb_sectors - 1) {
             bs->wr_highest_sector = sector_num + nb_sectors - 1;
         }
+
+        if (bs->io_limits != NULL) {
+            bs->io_disps->bytes[1] += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
+            bs->io_disps->ios[1]++;
+        }
     }
 
     return ret;
diff --git a/block.h b/block.h
index 859d1d9..f0dac62 100644
--- a/block.h
+++ b/block.h
@@ -97,7 +97,6 @@  int bdrv_change_backing_file(BlockDriverState *bs,
     const char *backing_file, const char *backing_fmt);
 void bdrv_register(BlockDriver *bdrv);
 
-
 typedef struct BdrvCheckResult {
     int corruptions;
     int leaks;
diff --git a/block/blk-queue.c b/block/blk-queue.c
new file mode 100644
index 0000000..6cffb33
--- /dev/null
+++ b/block/blk-queue.c
@@ -0,0 +1,99 @@ 
+/*
+ * QEMU System Emulator queue for block layer
+ *
+ * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "block_int.h"
+#include "qemu-queue.h"
+#include "block/blk-queue.h"
+
+/*
+ * APIs for the block request queue used by disk I/O throttling in the
+ * QEMU block layer.
+ */
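+
+/*
+ * Typical flow (as wired up in block.c): when a request exceeds the disk
+ * I/O limits, bdrv_aio_readv()/bdrv_aio_writev() queue it with
+ * qemu_block_queue_enqueue(); the per-device block_timer later pops the
+ * queued requests and re-issues them through qemu_block_queue_handler().
+ */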
+
+BlockQueue *qemu_new_block_queue(BlockQueueHandler *handler)
+{
+    BlockQueue *queue;
+
+    queue = qemu_mallocz(sizeof(BlockQueue));
+
+    queue->handler = handler;
+    QTAILQ_INIT(&queue->requests);
+
+    return queue;
+}
+
+void qemu_del_block_queue(BlockQueue *queue)
+{
+    BlockIORequest *request, *next;
+
+    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        qemu_free(request);
+    }
+
+    qemu_free(queue);
+}
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                                           BlockDriverState *bs,
+                                           BlockRequestHandler *handler,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc *cb,
+                                           void *opaque)
+{
+    BlockIORequest *request;
+    BlockDriverAIOCB *acb;
+
+    request = qemu_malloc(sizeof(BlockIORequest));
+    request->bs = bs;
+    request->handler = handler;
+    request->sector_num = sector_num;
+    request->qiov = qiov;
+    request->nb_sectors = nb_sectors;
+    request->cb = cb;
+    request->opaque = opaque;
+
+    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+
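+    /* Return a fresh AIOCB as the caller's handle; the queued request
+     * itself is re-issued later by the queue handler. */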
+    acb = qemu_malloc(sizeof(*acb));
+    acb->bs = bs;
+    acb->cb = cb;
+    acb->opaque = opaque;
+
+    return acb;
+}
+
+int qemu_block_queue_handler(BlockIORequest *request)
+{
+    int ret;
+    BlockDriverAIOCB *res;
+
+    res = request->handler(request->bs, request->sector_num,
+                           request->qiov, request->nb_sectors,
+                           request->cb, request->opaque);
+
+    ret = (res == NULL) ? 0 : 1;
+
+    return ret;
+}
diff --git a/block/blk-queue.h b/block/blk-queue.h
new file mode 100644
index 0000000..6c400c7
--- /dev/null
+++ b/block/blk-queue.h
@@ -0,0 +1,73 @@ 
+/*
+ * QEMU System Emulator queue definition for block layer
+ *
+ * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_BLOCK_QUEUE_H
+#define QEMU_BLOCK_QUEUE_H
+
+#include "block.h"
+#include "qemu-queue.h"
+#include "qemu-common.h"
+
+typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
+                                int64_t sector_num, QEMUIOVector *qiov,
+                                int nb_sectors, BlockDriverCompletionFunc *cb,
+                                void *opaque);
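+
+/* Note: BlockRequestHandler matches the signature of bdrv_aio_readv() and
+ * bdrv_aio_writev(), the two handlers that block.c actually enqueues. */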
+
+struct BlockIORequest {
+    QTAILQ_ENTRY(BlockIORequest) entry;
+    BlockDriverState *bs;
+    BlockRequestHandler *handler;
+    int64_t sector_num;
+    QEMUIOVector *qiov;
+    int nb_sectors;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+};
+
+typedef struct BlockIORequest BlockIORequest;
+
+typedef int (BlockQueueHandler) (BlockIORequest *request);
+
+struct BlockQueue {
+    BlockQueueHandler *handler;
+    QTAILQ_HEAD(requests, BlockIORequest) requests;
+};
+
+typedef struct BlockQueue BlockQueue;
+
+BlockQueue *qemu_new_block_queue(BlockQueueHandler *handler);
+
+void qemu_del_block_queue(BlockQueue *queue);
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque);
+
+int qemu_block_queue_handler(BlockIORequest *request);
+#endif /* QEMU_BLOCK_QUEUE_H */
diff --git a/block_int.h b/block_int.h
index 1e265d2..a173e89 100644
--- a/block_int.h
+++ b/block_int.h
@@ -27,6 +27,7 @@ 
 #include "block.h"
 #include "qemu-option.h"
 #include "qemu-queue.h"
+#include "block/blk-queue.h"
 
 #define BLOCK_FLAG_ENCRYPT	1
 #define BLOCK_FLAG_COMPAT6	4
@@ -46,6 +47,16 @@  typedef struct AIOPool {
     BlockDriverAIOCB *free_aiocb;
 } AIOPool;
 
+typedef struct BlockIOLimit {
+    uint64_t bps[3];        /* bytes/s: [0] read, [1] write, [2] total */
+    uint64_t iops[3];       /* ops/s:   [0] read, [1] write, [2] total */
+} BlockIOLimit;
+
+typedef struct BlockIODisp {
+    uint64_t bytes[2];      /* bytes dispatched this slice: [0] rd, [1] wr */
+    uint64_t ios[2];        /* ops dispatched this slice: [0] rd, [1] wr */
+} BlockIODisp;
+
 struct BlockDriver {
     const char *format_name;
     int instance_size;
@@ -175,6 +186,13 @@  struct BlockDriverState {
 
     void *sync_aiocb;
 
+    /* disk I/O throttling: start of the current accounting slice,
+     * per direction ([0] read, [1] write) */
+    int64_t slice_start[2];
+    BlockIOLimit *io_limits;
+    BlockIODisp  *io_disps;
+    BlockQueue   *block_queue;
+    QEMUTimer    *block_timer;
+
     /* I/O stats (display with "info blockstats"). */
     uint64_t rd_bytes;
     uint64_t wr_bytes;
@@ -222,6 +240,9 @@  void qemu_aio_release(void *p);
 
 void *qemu_blockalign(BlockDriverState *bs, size_t size);
 
+void bdrv_set_io_limits(BlockDriverState *bs,
+                            BlockIOLimit *io_limits);
+
 #ifdef _WIN32
 int is_windows_drive(const char *filename);
 #endif
diff --git a/blockdev.c b/blockdev.c
index c263663..4d79e7a 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -238,6 +238,9 @@  DriveInfo *drive_init(QemuOpts *opts, int default_to_scsi)
     int on_read_error, on_write_error;
     const char *devaddr;
     DriveInfo *dinfo;
+    BlockIOLimit *io_limits = NULL;
+    bool iol_flag = false;
+    const char *iol_opts[] = {"bps", "bps_rd", "bps_wr",
+                              "iops", "iops_rd", "iops_wr", NULL};
     int is_extboot = 0;
     int snapshot = 0;
     int ret;
@@ -372,6 +375,18 @@  DriveInfo *drive_init(QemuOpts *opts, int default_to_scsi)
         return NULL;
     }
 
+    /* disk I/O throttling limits */
+    iol_flag = qemu_opt_io_limits_enable_flag(opts, iol_opts);
+    if (iol_flag) {
+        io_limits          = qemu_mallocz(sizeof(*io_limits));
+        io_limits->bps[2]  = qemu_opt_get_number(opts, "bps", 0);
+        io_limits->bps[0]  = qemu_opt_get_number(opts, "bps_rd", 0);
+        io_limits->bps[1]  = qemu_opt_get_number(opts, "bps_wr", 0);
+        io_limits->iops[2] = qemu_opt_get_number(opts, "iops", 0);
+        io_limits->iops[0] = qemu_opt_get_number(opts, "iops_rd", 0);
+        io_limits->iops[1] = qemu_opt_get_number(opts, "iops_wr", 0);
+    }
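+    /* Illustrative usage (values are examples only):
+     *   -drive file=disk.img,if=virtio,bps=1000000,iops=100
+     * bps/iops cap combined traffic; the _rd/_wr variants cap one
+     * direction. */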
+
     on_write_error = BLOCK_ERR_STOP_ENOSPC;
     if ((buf = qemu_opt_get(opts, "werror")) != NULL) {
         if (type != IF_IDE && type != IF_SCSI && type != IF_VIRTIO && type != IF_NONE) {
@@ -483,6 +498,11 @@  DriveInfo *drive_init(QemuOpts *opts, int default_to_scsi)
 
     bdrv_set_on_error(dinfo->bdrv, on_read_error, on_write_error);
 
+    /* throttling disk io limits */
+    if (iol_flag) {
+        bdrv_set_io_limits(dinfo->bdrv, io_limits);
+    }
+
     switch(type) {
     case IF_IDE:
     case IF_SCSI:
diff --git a/qemu-config.c b/qemu-config.c
index efa892c..ad5f2d9 100644
--- a/qemu-config.c
+++ b/qemu-config.c
@@ -82,6 +82,30 @@  static QemuOptsList qemu_drive_opts = {
             .name = "boot",
             .type = QEMU_OPT_BOOL,
             .help = "make this a boot drive",
+        },{
+            .name = "iops",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit total I/O operations per second for this drive",
+        },{
+            .name = "iops_rd",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit read operations per second for this drive",
+        },{
+            .name = "iops_wr",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit write operations per second for this drive",
+        },{
+            .name = "bps",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit total bytes per second for this drive",
+        },{
+            .name = "bps_rd",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit read bytes per second for this drive",
+        },{
+            .name = "bps_wr",
+            .type = QEMU_OPT_NUMBER,
+            .help = "limit write bytes per second for this drive",
         },
         { /* end of list */ }
     },
diff --git a/qemu-option.c b/qemu-option.c
index 65db542..9fe234d 100644
--- a/qemu-option.c
+++ b/qemu-option.c
@@ -562,6 +562,23 @@  uint64_t qemu_opt_get_number(QemuOpts *opts, const char *name, uint64_t defval)
     return opt->value.uint;
 }
 
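+/* Return true if any option named in the NULL-terminated iol_opts array
+ * is set to a non-zero value in opts. */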
+bool qemu_opt_io_limits_enable_flag(QemuOpts *opts, const char **iol_opts)
+{
+    int i;
+    uint64_t opt_val = 0;
+    bool iol_flag = false;
+
+    for (i = 0; iol_opts[i]; i++) {
+        opt_val = qemu_opt_get_number(opts, iol_opts[i], 0);
+        if (opt_val != 0) {
+            iol_flag = true;
+            break;
+        }
+    }
+
+    return iol_flag;
+}
+
 uint64_t qemu_opt_get_size(QemuOpts *opts, const char *name, uint64_t defval)
 {
     QemuOpt *opt = qemu_opt_find(opts, name);
diff --git a/qemu-option.h b/qemu-option.h
index b515813..fc909f9 100644
--- a/qemu-option.h
+++ b/qemu-option.h
@@ -107,6 +107,7 @@  struct QemuOptsList {
 const char *qemu_opt_get(QemuOpts *opts, const char *name);
 int qemu_opt_get_bool(QemuOpts *opts, const char *name, int defval);
 uint64_t qemu_opt_get_number(QemuOpts *opts, const char *name, uint64_t defval);
+bool qemu_opt_io_limits_enable_flag(QemuOpts *opts, const char **iol_opts);
 uint64_t qemu_opt_get_size(QemuOpts *opts, const char *name, uint64_t defval);
 int qemu_opt_set(QemuOpts *opts, const char *name, const char *value);
 typedef int (*qemu_opt_loopfunc)(const char *name, const char *value, void *opaque);
diff --git a/qemu-options.hx b/qemu-options.hx
index cb3347e..ae219f5 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -121,6 +121,7 @@  DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [,cache=writethrough|writeback|none|unsafe][,format=f]\n"
     "       [,serial=s][,addr=A][,id=name][,aio=threads|native]\n"
     "       [,readonly=on|off][,boot=on|off]\n"
+    "       [[,bps=b]|[[,bps_rd=r][,bps_wr=w]]][[,iops=i]|[[,iops_rd=r][,iops_wr=w]]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]