diff mbox

[v9,1/4] block: add the block queue support

Message ID 1319796143-9061-2-git-send-email-wuzhy@linux.vnet.ibm.com
State New
Headers show

Commit Message

Zhi Yong Wu Oct. 28, 2011, 10:02 a.m. UTC
Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
---
 Makefile.objs     |    2 +-
 block.c           |   71 +++++++++++++++++--
 block.h           |   20 +++++
 block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/blk-queue.h |   63 +++++++++++++++++
 block_int.h       |   23 ++++++
 6 files changed, 371 insertions(+), 9 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h

Comments

Stefan Hajnoczi Oct. 31, 2011, 1:35 p.m. UTC | #1
On Fri, Oct 28, 2011 at 11:02 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> +static void bdrv_io_limits_skip_set(void *opaque,
> +                                    BlockAPIType co_type,
> +                                    bool cb_skip,
> +                                    bool limit_skip) {
> +    RwCo *rwco;
> +    BlockDriverAIOCBCoroutine *aioco;
> +
> +    if (co_type == BDRV_API_SYNC) {
> +        rwco = opaque;
> +        rwco->limit_skip = limit_skip;
> +    } else if (co_type == BDRV_API_ASYNC) {
> +        aioco = opaque;
> +        aioco->cb_skip = cb_skip;
> +        aioco->limit_skip = limit_skip;
> +    } else {
> +        abort();
> +    }
> +}

The main question I have about this series is why have different cases
for sync, aio, and coroutines?  Perhaps I'm missing something but this
should all be much simpler.

All read/write requests are processed in a coroutine
(bdrv_co_do_readv()/bdrv_co_do_writev()).  That is the place to
perform I/O throttling.  Throttling should not be aware of sync, aio,
vs coroutines.

Since all requests have coroutines you could use CoQueue and the
actual queue waiting code in bdrv_co_do_readv()/bdrv_co_do_writev()
becomes:

if (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
    qemu_co_queue_wait(&bs->throttled_reqs);

    /* Wait until this request is allowed to start */
    while (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
        /* Re-inserting at the head of the CoQueue is equivalent to
         * the queue->flushing/queue->limit_exceeded behavior in your
         * patch.
         */
        qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
    }
}

I think block/blk-queue.h isn't needed if you use the existing CoQueue
structure.

This is the main point I want to raise, just a few minor comments
below which may not be relevant if you can drop BlockQueue.

> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
> +{
> +    int ret;
> +
> +    BlockDriverState *bs = request->common.bs;
> +    assert(bs);
> +
> +    if (bs->io_limits_enabled) {

I'm not sure why the BlockQueue needs to reach into BlockDriverState.
Now BlockDriverState and BlockQueue know about and depend on each
other.  It's usually nicer to keep the relationship unidirectional, if
possible.

> +        ret = request->handler(request->common.bs, request->sector_num,
> +                               request->nb_sectors, request->qiov,
> +                               request, request->co_type);
> +    } else {
> +        if (request->co_type == BDRV_API_CO) {
> +            qemu_coroutine_enter(request->co, request->cocb);
> +        } else {
> +            printf("%s, req: %p\n", __func__, (void *)request);

Debug output should be removed.

> +            bdrv_io_limits_issue_request(request, request->co_type);
> +        }
> +
> +        ret = BDRV_BLKQ_DEQ_PASS;
> +    }
> +
> +    return ret;
> +}
> +
> +void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc *cb)
> +{
> +    BlockQueueAIOCB *request;
> +    queue->flushing = true;
> +
> +    /*QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {*/

Commented out code should be removed.

> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        int ret = 0;
> +
> +        request = QTAILQ_FIRST(&queue->requests);
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +        queue->limit_exceeded = false;
> +        ret = qemu_block_queue_handler(request);
> +        if (ret == -EIO) {
> +            cb(request, -EIO);
> +            break;
> +        } else if (ret == BDRV_BLKQ_ENQ_AGAIN) {
> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
> +            break;
> +        } else if (ret == BDRV_BLKQ_DEQ_PASS) {
> +            cb(request, 0);
> +        }

What if ret is not -EIO, BDRV_BLKQ_ENQ_AGAIN, or BDRV_BLKQ_DEQ_PASS?
I think the -EIO case should be the default that calls cb(request,
ret).

> +    }
> +
> +    printf("%s, leave\n", __func__);

Debug code should be removed.

Stefan
Zhiyong Wu Nov. 1, 2011, 7:50 a.m. UTC | #2
On Mon, Oct 31, 2011 at 9:35 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Oct 28, 2011 at 11:02 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +static void bdrv_io_limits_skip_set(void *opaque,
>> +                                    BlockAPIType co_type,
>> +                                    bool cb_skip,
>> +                                    bool limit_skip) {
>> +    RwCo *rwco;
>> +    BlockDriverAIOCBCoroutine *aioco;
>> +
>> +    if (co_type == BDRV_API_SYNC) {
>> +        rwco = opaque;
>> +        rwco->limit_skip = limit_skip;
>> +    } else if (co_type == BDRV_API_ASYNC) {
>> +        aioco = opaque;
>> +        aioco->cb_skip = cb_skip;
>> +        aioco->limit_skip = limit_skip;
>> +    } else {
>> +        abort();
>> +    }
>> +}
>
I have sent out v10. It discard the queue and request defined by us,
and rebase it to CoQueue, and let Coroutine represent one I/O request.
The code logic is now much simpler.

> The main question I have about this series is why have different cases
> for sync, aio, and coroutines?  Perhaps I'm missing something but this
> should all be much simpler.
>
> All read/write requests are processed in a coroutine
> (bdrv_co_do_readv()/bdrv_co_do_writev()).  That is the place to
> perform I/O throttling.  Throttling should not be aware of sync, aio,
> vs coroutines.
>
> Since all requests have coroutines you could use CoQueue and the
> actual queue waiting code in bdrv_co_do_readv()/bdrv_co_do_writev()
> becomes:
>
> if (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
>    qemu_co_queue_wait(&bs->throttled_reqs);
>
>    /* Wait until this request is allowed to start */
>    while (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
>        /* Re-inserting at the head of the CoQueue is equivalent to
>         * the queue->flushing/queue->limit_exceeded behavior in your
>         * patch.
>         */
>        qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
>    }
> }
>
> I think block/blk-queue.h isn't needed if you use the existing CoQueue
> structure.
>
> This is the main point I want to raise, just a few minor comments
> below which may not be relevant if you can drop BlockQueue.
>
>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>> +{
>> +    int ret;
>> +
>> +    BlockDriverState *bs = request->common.bs;
>> +    assert(bs);
>> +
>> +    if (bs->io_limits_enabled) {
>
> I'm not sure why the BlockQueue needs to reach into BlockDriverState.
> Now BlockDriverState and BlockQueue know about and depend on each
> other.  It's usually nicer to keep the relationship unidirectional, if
> possible.
>
>> +        ret = request->handler(request->common.bs, request->sector_num,
>> +                               request->nb_sectors, request->qiov,
>> +                               request, request->co_type);
>> +    } else {
>> +        if (request->co_type == BDRV_API_CO) {
>> +            qemu_coroutine_enter(request->co, request->cocb);
>> +        } else {
>> +            printf("%s, req: %p\n", __func__, (void *)request);
>
> Debug output should be removed.
>
>> +            bdrv_io_limits_issue_request(request, request->co_type);
>> +        }
>> +
>> +        ret = BDRV_BLKQ_DEQ_PASS;
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc *cb)
>> +{
>> +    BlockQueueAIOCB *request;
>> +    queue->flushing = true;
>> +
>> +    /*QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {*/
>
> Commented out code should be removed.
>
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        int ret = 0;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +        queue->limit_exceeded = false;
>> +        ret = qemu_block_queue_handler(request);
>> +        if (ret == -EIO) {
>> +            cb(request, -EIO);
>> +            break;
>> +        } else if (ret == BDRV_BLKQ_ENQ_AGAIN) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            break;
>> +        } else if (ret == BDRV_BLKQ_DEQ_PASS) {
>> +            cb(request, 0);
>> +        }
>
> What if ret is not -EIO, BDRV_BLKQ_ENQ_AGAIN, or BDRV_BLKQ_DEQ_PASS?
> I think the -EIO case should be the default that calls cb(request,
> ret).
>
>> +    }
>> +
>> +    printf("%s, leave\n", __func__);
>
> Debug code should be removed.
>
> Stefan
>
>
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 01587c8..98891b3 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -33,7 +33,7 @@  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
-block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block.c b/block.c
index 70aab63..40ab277 100644
--- a/block.c
+++ b/block.c
@@ -1026,6 +1026,7 @@  typedef struct RwCo {
     QEMUIOVector *qiov;
     bool is_write;
     int ret;
+    bool limit_skip;
 } RwCo;
 
 static void coroutine_fn bdrv_rw_co_entry(void *opaque)
@@ -1060,6 +1061,7 @@  static int bdrv_rw_co(BlockDriverState *bs, int64_t sector_num, uint8_t *buf,
         .qiov = &qiov,
         .is_write = is_write,
         .ret = NOT_DONE,
+        .limit_skip = false,
     };
 
     qemu_iovec_init_external(&qiov, &iov, 1);
@@ -1242,6 +1244,64 @@  int bdrv_pwrite_sync(BlockDriverState *bs, int64_t offset,
     return 0;
 }
 
+typedef struct BlockDriverAIOCBCoroutine {
+    BlockDriverAIOCB common;
+    BlockRequest req;
+    bool is_write;
+    QEMUBH *bh;
+    bool cb_skip;
+    bool limit_skip;
+    void *blkq_acb;
+} BlockDriverAIOCBCoroutine;
+
+/* block I/O throttling */
+typedef struct CoroutineCB {
+    BlockDriverState *bs;
+    int64_t sector_num;
+    int nb_sectors;
+    QEMUIOVector *qiov;
+} CoroutineCB;
+
+static void bdrv_io_limits_skip_set(void *opaque,
+                                    BlockAPIType co_type,
+                                    bool cb_skip,
+                                    bool limit_skip) {
+    RwCo *rwco;
+    BlockDriverAIOCBCoroutine *aioco;
+
+    if (co_type == BDRV_API_SYNC) {
+        rwco = opaque;
+        rwco->limit_skip = limit_skip;
+    } else if (co_type == BDRV_API_ASYNC) {
+        aioco = opaque;
+        aioco->cb_skip = cb_skip;
+        aioco->limit_skip = limit_skip;
+    } else {
+        abort();
+    }
+}
+
+void bdrv_io_limits_issue_request(void *opaque,
+                                  BlockAPIType co_type) {
+    BlockQueueAIOCB *blkq_acb = opaque;
+
+    if (blkq_acb->co_type == BDRV_API_CO) {
+        qemu_coroutine_enter(blkq_acb->co, blkq_acb->cocb);
+    } else {
+        CoroutineEntry *entry = NULL;
+        if (co_type == BDRV_API_SYNC) {
+            entry = bdrv_rw_co_entry;
+        } else if (co_type == BDRV_API_ASYNC) {
+            entry = bdrv_co_do_rw;
+        }
+
+        bdrv_io_limits_skip_set(blkq_acb->cocb,
+                                co_type, false, true);
+        Coroutine *co = qemu_coroutine_create(entry);
+        qemu_coroutine_enter(co, blkq_acb->cocb);
+    }
+}
+
 /*
  * Handle a read request in coroutine context
  */
@@ -2650,14 +2710,6 @@  static BlockDriverAIOCB *bdrv_aio_writev_em(BlockDriverState *bs,
     return bdrv_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
 }
 
-
-typedef struct BlockDriverAIOCBCoroutine {
-    BlockDriverAIOCB common;
-    BlockRequest req;
-    bool is_write;
-    QEMUBH* bh;
-} BlockDriverAIOCBCoroutine;
-
 static void bdrv_aio_co_cancel_em(BlockDriverAIOCB *blockacb)
 {
     qemu_aio_flush();
@@ -2711,6 +2763,9 @@  static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
     acb->req.nb_sectors = nb_sectors;
     acb->req.qiov = qiov;
     acb->is_write = is_write;
+    acb->blkq_acb = NULL;
+    acb->cb_skip  = false;
+    acb->limit_skip = false;
 
     co = qemu_coroutine_create(bdrv_co_do_rw);
     qemu_coroutine_enter(co, acb);
diff --git a/block.h b/block.h
index 5a042c9..b7fbc40 100644
--- a/block.h
+++ b/block.h
@@ -82,6 +82,17 @@  typedef enum {
     BDRV_IOS_MAX
 } BlockIOStatus;
 
+/* disk I/O throttling */
+typedef enum {
+    BDRV_BLKQ_ENQ_FIRST, BDRV_BLKQ_ENQ_AGAIN, BDRV_BLKQ_DEQ_PASS,
+    BDRV_BLKQ_PASS, BDRV_IO_MAX
+} BlockQueueRetType;
+
+typedef enum {
+    BDRV_API_SYNC, BDRV_API_ASYNC, BDRV_API_CO,
+    BDRV_API_MAX
+} BlockAPIType;
+
 void bdrv_iostatus_enable(BlockDriverState *bs);
 void bdrv_iostatus_reset(BlockDriverState *bs);
 void bdrv_iostatus_disable(BlockDriverState *bs);
@@ -94,6 +105,8 @@  void bdrv_info(Monitor *mon, QObject **ret_data);
 void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
+void bdrv_io_limits_issue_request(void *opaque, BlockAPIType co_type);
+
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
 BlockDriver *bdrv_find_protocol(const char *filename);
@@ -183,6 +196,13 @@  typedef struct BlockRequest {
     int error;
 } BlockRequest;
 
+typedef struct BlockQueueAIOCB BlockQueueAIOCB;
+
+typedef int coroutine_fn (BlockRequestHandler)(BlockDriverState *bs,
+                          int64_t sector_num, int nb_sectors,
+                          QEMUIOVector *qiov,
+                          void *opaque, BlockAPIType co_type);
+
 int bdrv_aio_multiwrite(BlockDriverState *bs, BlockRequest *reqs,
     int num_reqs);
 
diff --git a/block/blk-queue.c b/block/blk-queue.c
new file mode 100644
index 0000000..04bcdac
--- /dev/null
+++ b/block/blk-queue.c
@@ -0,0 +1,201 @@ 
+/*
+ * QEMU System Emulator queue definition for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
+ *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+/* The APIs for block request queue on qemu block layer.
+ */
+
+#include "block.h"
+#include "block_int.h"
+#include "blk-queue.h"
+#include "qemu-common.h"
+
+static void qemu_block_queue_dequeue(BlockQueue *queue,
+                                     BlockQueueAIOCB *request)
+{
+    BlockQueueAIOCB *req, *next;
+
+    assert(queue);
+    QTAILQ_FOREACH_SAFE(req, &queue->requests, entry, next) {
+        if (req == request) {
+            QTAILQ_REMOVE(&queue->requests, req, entry);
+            break;
+        }
+    }
+}
+
+void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
+{
+    BlockQueueAIOCB *request = container_of(acb, BlockQueueAIOCB, common);
+
+    assert(request->common.bs->block_queue);
+    qemu_block_queue_dequeue(request->common.bs->block_queue,
+                             request);
+
+    qemu_aio_release(request);
+}
+
+static AIOPool block_queue_pool = {
+    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
+    .cancel             = qemu_block_queue_cancel,
+};
+
+static void qemu_block_queue_bh(void *opaque)
+{
+    BlockQueueAIOCB *acb = opaque;
+
+    acb->co_type = BDRV_API_MAX;
+    acb->co      = NULL;
+    acb->cocb    = NULL;
+    qemu_bh_delete(acb->bh);
+    qemu_aio_release(acb);
+}
+
+void qemu_block_queue_cb(void *opaque, int ret)
+{
+    BlockQueueAIOCB *acb = opaque;
+    acb->bh = qemu_bh_new(qemu_block_queue_bh, acb);
+    qemu_bh_schedule(acb->bh);
+}
+
+BlockQueue *qemu_new_block_queue(void)
+{
+    BlockQueue *queue;
+
+    queue = g_malloc0(sizeof(BlockQueue));
+
+    QTAILQ_INIT(&queue->requests);
+
+    queue->limit_exceeded = false;
+    queue->flushing       = false;
+
+    return queue;
+}
+
+void qemu_del_block_queue(BlockQueue *queue)
+{
+    BlockQueueAIOCB *request, *next;
+
+    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        qemu_aio_release(request);
+    }
+
+    g_free(queue);
+}
+
+BlockQueueAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque)
+{
+    BlockDriverAIOCB *acb;
+    BlockQueueAIOCB *request;
+
+    if (queue->flushing) {
+        queue->limit_exceeded = true;
+        return NULL;
+    } else {
+        acb = qemu_aio_get(&block_queue_pool, bs,
+                           cb, opaque);
+        request = container_of(acb, BlockQueueAIOCB, common);
+        request->handler    = handler;
+        request->sector_num = sector_num;
+        request->qiov       = qiov;
+        request->nb_sectors = nb_sectors;
+        request->bh         = NULL;
+        request->co_type    = BDRV_API_MAX;
+        request->co         = NULL;
+        request->cocb       = NULL;
+        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+    }
+
+    return request;
+}
+
+static int qemu_block_queue_handler(BlockQueueAIOCB *request)
+{
+    int ret;
+
+    BlockDriverState *bs = request->common.bs;
+    assert(bs);
+
+    if (bs->io_limits_enabled) {
+        ret = request->handler(request->common.bs, request->sector_num,
+                               request->nb_sectors, request->qiov,
+                               request, request->co_type);
+    } else {
+        if (request->co_type == BDRV_API_CO) {
+            qemu_coroutine_enter(request->co, request->cocb);
+        } else {
+            printf("%s, req: %p\n", __func__, (void *)request);
+            bdrv_io_limits_issue_request(request, request->co_type);
+        }
+
+        ret = BDRV_BLKQ_DEQ_PASS;
+    }
+
+    return ret;
+}
+
+void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc *cb)
+{
+    BlockQueueAIOCB *request;
+    queue->flushing = true;
+
+    /*QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {*/
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        int ret = 0;
+
+        request = QTAILQ_FIRST(&queue->requests);
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        queue->limit_exceeded = false;
+        ret = qemu_block_queue_handler(request);
+        if (ret == -EIO) {
+            cb(request, -EIO);
+            break;
+        } else if (ret == BDRV_BLKQ_ENQ_AGAIN) {
+            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
+            break;
+        } else if (ret == BDRV_BLKQ_DEQ_PASS) {
+            cb(request, 0);
+        }
+    }
+
+    printf("%s, leave\n", __func__);
+    queue->limit_exceeded = false;
+    queue->flushing       = false;
+}
+
+bool qemu_block_queue_has_pending(BlockQueue *queue)
+{
+    return !QTAILQ_EMPTY(&queue->requests);
+}
diff --git a/block/blk-queue.h b/block/blk-queue.h
new file mode 100644
index 0000000..4dd3dff
--- /dev/null
+++ b/block/blk-queue.h
@@ -0,0 +1,63 @@ 
+/*
+ * QEMU System Emulator queue declaration for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
+ *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_BLOCK_QUEUE_H
+#define QEMU_BLOCK_QUEUE_H
+
+#include "block.h"
+#include "block_int.h"
+#include "qemu-queue.h"
+
+typedef struct BlockQueue {
+    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
+    bool limit_exceeded;
+    bool flushing;
+} BlockQueue;
+
+BlockQueue *qemu_new_block_queue(void);
+
+void qemu_block_queue_cancel(BlockDriverAIOCB *acb);
+
+void qemu_del_block_queue(BlockQueue *queue);
+
+BlockQueueAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque);
+
+void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc *cb);
+
+void qemu_block_queue_cb(void *opaque, int ret);
+
+bool qemu_block_queue_has_pending(BlockQueue *queue);
+
+#endif /* QEMU_BLOCK_QUEUE_H */
diff --git a/block_int.h b/block_int.h
index dac00f5..86a355d 100644
--- a/block_int.h
+++ b/block_int.h
@@ -29,6 +29,7 @@ 
 #include "qemu-queue.h"
 #include "qemu-coroutine.h"
 #include "qemu-timer.h"
+#include "block/blk-queue.h"
 
 #define BLOCK_FLAG_ENCRYPT	1
 #define BLOCK_FLAG_COMPAT6	4
@@ -49,6 +50,11 @@  typedef struct AIOPool {
     BlockDriverAIOCB *free_aiocb;
 } AIOPool;
 
+typedef struct BlockIOLimit {
+    uint64_t bps[3];
+    uint64_t iops[3];
+} BlockIOLimit;
+
 struct BlockDriver {
     const char *format_name;
     int instance_size;
@@ -183,6 +189,9 @@  struct BlockDriverState {
 
     void *sync_aiocb;
 
+    BlockQueue   *block_queue;
+    bool         io_limits_enabled;
+
     /* I/O stats (display with "info blockstats"). */
     uint64_t nr_bytes[BDRV_MAX_IOTYPE];
     uint64_t nr_ops[BDRV_MAX_IOTYPE];
@@ -219,6 +228,20 @@  struct BlockDriverAIOCB {
     BlockDriverAIOCB *next;
 };
 
+/* block I/O throttling */
+struct BlockQueueAIOCB {
+    BlockDriverAIOCB common;
+    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
+    BlockRequestHandler *handler;
+    int64_t sector_num;
+    QEMUIOVector *qiov;
+    int nb_sectors;
+    QEMUBH *bh;
+    BlockAPIType co_type;
+    void *co;
+    void *cocb;
+};
+
 void get_tmp_filename(char *filename, int size);
 
 void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs,