[RFC] block-queue: Delay and batch metadata writes

Message ID 1284991010-10951-1-git-send-email-kwolf@redhat.com
State New

Commit Message

Kevin Wolf Sept. 20, 2010, 1:56 p.m. UTC
I won't get this ready until I leave for vacation on Wednesday, so I thought I
could just as well post it as an RFC in this state.

With this patch applied, qcow2 doesn't directly access the image file any more
for metadata, but rather goes through the newly introduced blkqueue. Write
and sync requests are queued there and executed in a separate worker thread.
Reads consider the contents of the queue before accessing the image file.

What makes this interesting is that we can delay syncs and if multiple syncs
occur, we can merge them into one bdrv_flush.

A typical sequence in qcow2 (simple cluster allocation) looks like this:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry

If we delay the operation and get three of these sequences queued before
actually executing, we end up with the following result, saving two syncs:

1. Update refcount table (req 1)
2. Update refcount table (req 2)
3. Update refcount table (req 3)
4. bdrv_flush
5. Update L2 entry (req 1)
6. Update L2 entry (req 2)
7. Update L2 entry (req 3)

This patch only commits a sync if either the guest has requested a flush or a
certain number of requests has accumulated in the queue, so usually we batch
more than just three requests.
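
To illustrate, here is roughly how one such allocation sequence would look on
top of the blkqueue API introduced below (a simplified sketch only: the helper
name, offsets and sizes are made up, and error handling is abbreviated):

    /* One refcount+L2 update sequence going through blkqueue. The
     * BlockQueueContext tracks which queue section the caller's requests
     * belong to, so that independent sequences can be interleaved. */
    static int alloc_cluster_metadata(BDRVQcowState *s,
                                      uint64_t refblock_off, void *refblock,
                                      uint64_t l2_off, void *l2_entry)
    {
        int ret;

        /* 1. Update refcount block: only queued, nothing is written yet */
        ret = blkqueue_pwrite(&s->bq_context, refblock_off, refblock, 512);
        if (ret < 0) {
            return ret;
        }

        /* 2. Barrier instead of bdrv_flush: also just recorded in the
         * queue. Barriers of several sequences that ended up in the same
         * section are merged into a single bdrv_flush when the worker
         * thread processes the queue. */
        blkqueue_barrier(&s->bq_context);

        /* 3. Update L2 entry: ordered behind the barrier */
        return blkqueue_pwrite(&s->bq_context, l2_off, l2_entry, 8);
    }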

I didn't run any detailed benchmarks, but just looked at the installation time
of a Fedora 13 guest: while git master takes about 40-50% longer than before
the metadata syncs were introduced, we get most of that time back with
blkqueue.

Of course, in this state the code is not correct, but it's correct enough to
try and have qcow2 run on a file backend. Some remaining problems are:

- There's no locking between the worker thread and other functions accessing
  the same backend driver. Should be fine for file, but probably not for other
  backends.

- Error handling doesn't really exist. If something goes wrong with writing
  metadata, we can't fail the guest request any more because it has long
  since completed. Losing this data is actually okay, since the guest hasn't
  flushed yet.

  However, we need to be able to fail a flush, and we also need some way to
  handle errors transparently. This probably means that we have to stop the VM
  and let the user fix things so that we can retry. The only other way would be
  to shut down the VM and end up in the same situation as with a host crash.

  Or maybe it would even be enough to start failing all new requests.

- The Makefile integration is obviously very wrong, too. It worked well
  enough for me, but you need to be aware of when block-queue.o is compiled
  with RUN_TESTS and when it isn't. The tests need to be split out properly.

They are certainly fixable and shouldn't have any major impact on performance,
so that's just a matter of doing it.

Kevin

---
 Makefile               |    3 +
 Makefile.objs          |    1 +
 block-queue.c          |  720 ++++++++++++++++++++++++++++++++++++++++++++++++
 block-queue.h          |   49 ++++
 block/qcow2-cluster.c  |   28 +-
 block/qcow2-refcount.c |   44 ++--
 block/qcow2.c          |   14 +
 block/qcow2.h          |    4 +
 qemu-thread.c          |   13 +
 qemu-thread.h          |    1 +
 10 files changed, 838 insertions(+), 39 deletions(-)
 create mode 100644 block-queue.c
 create mode 100644 block-queue.h

Comments

Anthony Liguori Sept. 20, 2010, 2:31 p.m. UTC | #1
On 09/20/2010 08:56 AM, Kevin Wolf wrote:
> I won't get this ready until I leave for vacation on Wednesday, so I thought I
> could just as well post it as an RFC in this state.
>
> With this patch applied, qcow2 doesn't directly access the image file any more
> for metadata, but rather goes through the newly introduced blkqueue. Write
> and sync requests are queued there and executed in a separate worker thread.
> Reads consider the contents of the queue before accessing the image file.
>
> What makes this interesting is that we can delay syncs and if multiple syncs
> occur, we can merge them into one bdrv_flush.
>
> A typical sequence in qcow2 (simple cluster allocation) looks like this:
>
> 1. Update refcount table
> 2. bdrv_flush
> 3. Update L2 entry
>    

Let's expand it a bit more:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry
4. Write data to disk
5. Report write complete

I'm struggling to understand how a thread helps out.

If you run 1-3 in a thread, you need to inject a barrier between steps 3 
and 5 or you'll report the write complete before writing the metadata 
out.  You can't delay completing step 3 until a guest requests a flush.  
If you do, then you're implementing a writeback cache for metadata.

If you're comfortable with a writeback cache for metadata, then you 
should also be comfortable with a writeback cache for data in which 
case, cache=writeback is the answer.

If it's a matter of batching, batching can't occur if you have a barrier 
between steps 3 and 5.  The only way you can get batching is by doing a 
writeback cache for the metadata such that you can complete your request 
before the metadata is written.

Am I misunderstanding the idea?

Regards,

Anthony Liguori

> If we delay the operation and get three of these sequences queued before
> actually executing, we end up with the following result, saving two syncs:
>
> 1. Update refcount table (req 1)
> 2. Update refcount table (req 2)
> 3. Update refcount table (req 3)
> 4. bdrv_flush
> 5. Update L2 entry (req 1)
> 6. Update L2 entry (req 2)
> 7. Update L2 entry (req 3)
>
> This patch only commits a sync if either the guest has requested a flush or
> a certain number of requests has accumulated in the queue, so usually we
> batch more than just three requests.
>
> I didn't run any detailed benchmarks, but just looked at the installation
> time of a Fedora 13 guest: while git master takes about 40-50% longer than
> before the metadata syncs were introduced, we get most of that time back
> with blkqueue.
>
> Of course, in this state the code is not correct, but it's correct enough to
> try and have qcow2 run on a file backend. Some remaining problems are:
>
> - There's no locking between the worker thread and other functions accessing
>    the same backend driver. Should be fine for file, but probably not for other
>    backends.
>
> - Error handling doesn't really exist. If something goes wrong with writing
>    metadata, we can't fail the guest request any more because it has long
>    since completed. Losing this data is actually okay, since the guest
>    hasn't flushed yet.
>
>    However, we need to be able to fail a flush, and we also need some way to
>    handle errors transparently. This probably means that we have to stop the VM
>    and let the user fix things so that we can retry. The only other way would be
>    to shut down the VM and end up in the same situation as with a host crash.
>
>    Or maybe it would even be enough to start failing all new requests.
>
> - The Makefile integration is obviously very wrong, too. It worked well
>    enough for me, but you need to be aware of when block-queue.o is compiled
>    with RUN_TESTS and when it isn't. The tests need to be split out properly.
>
> They are certainly fixable and shouldn't have any major impact on performance,
> so that's just a matter of doing it.
>
> Kevin
>
> ---
>   Makefile               |    3 +
>   Makefile.objs          |    1 +
>   block-queue.c          |  720 ++++++++++++++++++++++++++++++++++++++++++++++++
>   block-queue.h          |   49 ++++
>   block/qcow2-cluster.c  |   28 +-
>   block/qcow2-refcount.c |   44 ++--
>   block/qcow2.c          |   14 +
>   block/qcow2.h          |    4 +
>   qemu-thread.c          |   13 +
>   qemu-thread.h          |    1 +
>   10 files changed, 838 insertions(+), 39 deletions(-)
>   create mode 100644 block-queue.c
>   create mode 100644 block-queue.h
>
> diff --git a/Makefile b/Makefile
> index ab91d42..0202dc6 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -125,6 +125,9 @@ qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-ob
>
>   qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)
>
> +block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
> +block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o $(block-obj-y) $(qobject-obj-y)
> +
>   qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
>       $(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@,"  GEN   $@")
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 3ef6d80..e97a246 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>
>   block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>   block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o block-queue.o
>   block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>   block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> diff --git a/block-queue.c b/block-queue.c
> new file mode 100644
> index 0000000..13579a7
> --- /dev/null
> +++ b/block-queue.c
> @@ -0,0 +1,720 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include <signal.h>
> +
> +#include "qemu-common.h"
> +#include "qemu-queue.h"
> +#include "qemu-thread.h"
> +#include "qemu-barrier.h"
> +#include "block.h"
> +#include "block-queue.h"
> +
> +enum blkqueue_req_type {
> +    REQ_TYPE_WRITE,
> +    REQ_TYPE_BARRIER,
> +};
> +
> +typedef struct BlockQueueRequest {
> +    enum blkqueue_req_type type;
> +
> +    uint64_t    offset;
> +    void*       buf;
> +    uint64_t    size;
> +    unsigned    section;
> +
> +    QTAILQ_ENTRY(BlockQueueRequest) link;
> +    QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
> +} BlockQueueRequest;
> +
> +struct BlockQueue {
> +    BlockDriverState*   bs;
> +
> +    QemuThread          thread;
> +    bool                thread_done;
> +    QemuMutex           lock;
> +    QemuMutex           flush_lock;
> +    QemuCond            cond;
> +
> +    int                 barriers_requested;
> +    int                 barriers_submitted;
> +    int                 queue_size;
> +
> +    QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
> +    QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
> +};
> +
> +static void *blkqueue_thread(void *bq);
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs)
> +{
> +    BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
> +    bq->bs = bs;
> +
> +    QTAILQ_INIT(&bq->queue);
> +    QSIMPLEQ_INIT(&bq->sections);
> +
> +    qemu_mutex_init(&bq->lock);
> +    qemu_mutex_init(&bq->flush_lock);
> +    qemu_cond_init(&bq->cond);
> +
> +    bq->thread_done = false;
> +    qemu_thread_create(&bq->thread, blkqueue_thread, bq);
> +
> +    return bq;
> +}
> +
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
> +{
> +    context->bq = bq;
> +    context->section = 0;
> +}
> +
> +void blkqueue_destroy(BlockQueue *bq)
> +{
> +    bq->thread_done = true;
> +    qemu_cond_signal(&bq->cond);
> +    qemu_thread_join(&bq->thread);
> +
> +    blkqueue_flush(bq);
> +
> +    fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
> +        bq->barriers_submitted, bq->barriers_requested);
> +
> +    qemu_mutex_destroy(&bq->lock);
> +    qemu_mutex_destroy(&bq->flush_lock);
> +    qemu_cond_destroy(&bq->cond);
> +
> +    assert(QTAILQ_FIRST(&bq->queue) == NULL);
> +    assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
> +    qemu_free(bq);
> +}
> +
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *req;
> +    int ret;
> +
> +    /*
> +     * First check if there are any pending writes for the same data. Reverse
> +     * order to return data written by the latest write.
> +     */
> +    QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
> +        uint64_t end = offset + size;
> +        uint64_t req_end = req->offset + req->size;
> +        uint8_t *read_buf = buf;
> +        uint8_t *req_buf = req->buf;
> +
> +        /* We're only interested in queued writes */
> +        if (req->type != REQ_TYPE_WRITE) {
> +            continue;
> +        }
> +
> +        /*
> +         * If we read from a write in the queue (i.e. our read overlaps the
> +         * write request), our next write probably depends on this write, so
> +         * let's move forward to its section.
> +         */
> +        if (end > req->offset && offset < req_end) {
> +            context->section = MAX(context->section, req->section);
> +        }
> +
> +        /* How we continue depends on the kind of overlap we have */
> +        if ((offset >= req->offset) && (end <= req_end)) {
> +            /* Completely contained in the write request */
> +            memcpy(buf, &req_buf[offset - req->offset], size);
> +            return 0;
> +        } else if ((end >= req->offset) && (end <= req_end)) {
> +            /* Overlap in the end of the read request */
> +            assert(offset < req->offset);
> +            memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
> +            size = req->offset - offset;
> +        } else if ((offset >= req->offset) && (offset < req_end)) {
> +            /* Overlap in the start of the read request */
> +            assert(end > req_end);
> +            memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
> +            buf = read_buf = &read_buf[req_end - offset];
> +            offset = req_end;
> +            size = end - req_end;
> +        } else if ((req->offset >= offset) && (req_end <= end)) {
> +            /*
> +             * The write request is completely contained in the read request.
> +             * memcpy the data from the write request here, continue with the
> +             * data before the write request and handle the data after the
> +             * write request with a recursive call.
> +             */
> +            memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
> +            size = req->offset - offset;
> +            blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
> +        }
> +    }
> +
> +    /* The requested data is not in the queue, read it from disk */
> +    ret = bdrv_pread(bq->bs, offset, buf, size);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    req->type       = REQ_TYPE_WRITE;
> +    req->offset     = offset;
> +    req->size       = size;
> +    req->buf        = qemu_malloc(size);
> +    req->section    = context->section;
> +    memcpy(req->buf, buf, size);
> +
> +    /*
> +     * Find the right place to insert it into the queue:
> +     * Right before the barrier that closes the current section.
> +     */
> +    qemu_mutex_lock(&bq->lock);
> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> +        if (section_req->section >= req->section) {
> +            req->section = section_req->section;
> +            context->section = section_req->section;
> +            QTAILQ_INSERT_BEFORE(section_req, req, link);
> +            bq->queue_size++;
> +            goto out;
> +        }
> +    }
> +
> +    /* If there was no barrier, just put it at the end. */
> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> +    bq->queue_size++;
> +    qemu_cond_signal(&bq->cond);
> +
> +out:
> +    qemu_mutex_unlock(&bq->lock);
> +    return 0;
> +}
> +
> +int blkqueue_barrier(BlockQueueContext *context)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +
> +    bq->barriers_requested++;
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    req->type       = REQ_TYPE_BARRIER;
> +    req->section    = context->section;
> +    req->buf        = NULL;
> +
> +    /* Find another barrier to merge with. */
> +    qemu_mutex_lock(&bq->lock);
> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> +        if (section_req->section >= req->section) {
> +            req->section = section_req->section;
> +            context->section = section_req->section + 1;
> +            qemu_free(req);
> +            goto out;
> +        }
> +    }
> +
> +    /*
> +     * If there wasn't a barrier for the same section yet, insert a new one at
> +     * the end.
> +     */
> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> +    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
> +    bq->queue_size++;
> +    context->section++;
> +    qemu_cond_signal(&bq->cond);
> +
> +    bq->barriers_submitted++;
> +
> +out:
> +    qemu_mutex_unlock(&bq->lock);
> +    return 0;
> +}
> +
> +/*
> + * Caller needs to hold the bq->lock mutex
> + */
> +static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +
> +    req = QTAILQ_FIRST(&bq->queue);
> +    if (req == NULL) {
> +        goto out;
> +    }
> +
> +    QTAILQ_REMOVE(&bq->queue, req, link);
> +    bq->queue_size--;
> +
> +    if (req->type == REQ_TYPE_BARRIER) {
> +        assert(QSIMPLEQ_FIRST(&bq->sections) == req);
> +        QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
> +    }
> +
> +out:
> +    return req;
> +}
> +
> +static void blkqueue_free_request(BlockQueueRequest *req)
> +{
> +    qemu_free(req->buf);
> +    qemu_free(req);
> +}
> +
> +static void blkqueue_process_request(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +    BlockQueueRequest *req2;
> +    int ret;
> +
> +    /*
> +     * Note that we leave the request in the queue while we process it. No
> +     * other request will be queued before this one and we have only one thread
> +     * that processes the queue, so afterwards it will still be the first
> +     * request. (Not true for barriers in the first position, but we can handle
> +     * that)
> +     */
> +    req = QTAILQ_FIRST(&bq->queue);
> +    if (req == NULL) {
> +        return;
> +    }
> +
> +    switch (req->type) {
> +        case REQ_TYPE_WRITE:
> +            ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
> +            if (ret < 0) {
> +                /* TODO Error reporting! */
> +                return;
> +            }
> +            break;
> +        case REQ_TYPE_BARRIER:
> +            bdrv_flush(bq->bs);
> +            break;
> +    }
> +
> +    /*
> +     * Only remove the request from the queue when it's written, so that reads
> +     * always access the right data.
> +     */
> +    qemu_mutex_lock(&bq->lock);
> +    req2 = QTAILQ_FIRST(&bq->queue);
> +    if (req == req2) {
> +        blkqueue_pop(bq);
> +        blkqueue_free_request(req);
> +    } else {
> +        /*
> +         * If it's a barrier and something has been queued before it, just
> +         * leave it in the queue and flush once again later.
> +         */
> +        assert(req->type == REQ_TYPE_BARRIER);
> +        bq->barriers_submitted++;
> +    }
> +    qemu_mutex_unlock(&bq->lock);
> +}
> +
> +struct blkqueue_flush_aiocb {
> +    BlockQueue *bq;
> +    BlockDriverCompletionFunc *cb;
> +    void *opaque;
> +};
> +
> +static void *blkqueue_aio_flush_thread(void *opaque)
> +{
> +    struct blkqueue_flush_aiocb *acb = opaque;
> +
> +    /* Process any left over requests */
> +    blkqueue_flush(acb->bq);
> +
> +    acb->cb(acb->opaque, 0);
> +    qemu_free(acb);
> +
> +    return NULL;
> +}
> +
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> +    void *opaque)
> +{
> +    struct blkqueue_flush_aiocb *acb;
> +
> +    acb = qemu_malloc(sizeof(*acb));
> +    acb->bq = bq;
> +    acb->cb = cb;
> +    acb->opaque = opaque;
> +
> +    qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
> +}
> +
> +void blkqueue_flush(BlockQueue *bq)
> +{
> +    qemu_mutex_lock(&bq->flush_lock);
> +
> +    /* Process any left over requests */
> +    while (QTAILQ_FIRST(&bq->queue)) {
> +        blkqueue_process_request(bq);
> +    }
> +
> +    qemu_mutex_unlock(&bq->flush_lock);
> +}
> +
> +static void *blkqueue_thread(void *_bq)
> +{
> +    BlockQueue *bq = _bq;
> +#ifndef RUN_TESTS
> +    BlockQueueRequest *req;
> +#endif
> +
> +    qemu_mutex_lock(&bq->flush_lock);
> +    while (!bq->thread_done) {
> +        barrier();
> +#ifndef RUN_TESTS
> +        req = QTAILQ_FIRST(&bq->queue);
> +
> +        /* Don't process barriers, we only do that on flushes */
> +        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
> +            blkqueue_process_request(bq);
> +        } else {
> +            qemu_cond_wait(&bq->cond, &bq->flush_lock);
> +        }
> +#else
> +        qemu_cond_wait(&bq->cond, &bq->flush_lock);
> +#endif
> +    }
> +    qemu_mutex_unlock(&bq->flush_lock);
> +
> +    return NULL;
> +}
> +
> +#ifdef RUN_TESTS
> +
> +#define CHECK_WRITE(req, _offset, _size, _buf, _section) \
> +    do { \
> +        assert(req != NULL); \
> +        assert(req->type == REQ_TYPE_WRITE); \
> +        assert(req->offset == _offset); \
> +        assert(req->size == _size); \
> +        assert(req->section == _section); \
> +        assert(!memcmp(req->buf, _buf, _size)); \
> +    } while(0)
> +
> +#define CHECK_BARRIER(req, _section) \
> +    do { \
> +        assert(req != NULL); \
> +        assert(req->type == REQ_TYPE_BARRIER); \
> +        assert(req->section == _section); \
> +    } while(0)
> +
> +#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
> +    do { \
> +        int ret; \
> +        memset(buf, 0, 512); \
> +        ret = blkqueue_pread(_context, _offset, _buf, _size); \
> +        assert(ret == 0); \
> +        assert(!memcmp(_cmpbuf, _buf, _size)); \
> +    } while(0)
> +
> +#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
> +    do { \
> +        int ret; \
> +        memset(_buf, _pattern, _size); \
> +        ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
> +        assert(ret == 0); \
> +    } while(0)
> +#define QUEUE_BARRIER(_context) \
> +    do { \
> +        int ret; \
> +        ret = blkqueue_barrier(_context); \
> +        assert(ret == 0); \
> +    } while(0)
> +
> +#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
> +    do { \
> +        BlockQueueRequest *req; \
> +        memset(_buf, _pattern, _size); \
> +        req = blkqueue_pop(_bq); \
> +        CHECK_WRITE(req, _offset, _size, _buf, _section); \
> +        blkqueue_free_request(req); \
> +    } while(0)
> +#define POP_CHECK_BARRIER(_bq, _section) \
> +    do { \
> +        BlockQueueRequest *req; \
> +        req = blkqueue_pop(_bq); \
> +        CHECK_BARRIER(req, _section); \
> +        blkqueue_free_request(req); \
> +    } while(0)
> +
> +static void  __attribute__((used)) dump_queue(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +
> +    fprintf(stderr, "--- Queue dump ---\n");
> +    QTAILQ_FOREACH(req, &bq->queue, link) {
> +        fprintf(stderr, "[%d] ", req->section);
> +        if (req->type == REQ_TYPE_WRITE) {
> +            fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
> +                req->offset, req->size, req->buf);
> +        } else if (req->type == REQ_TYPE_BARRIER) {
> +            fprintf(stderr, "Barrier\n");
> +        } else {
> +            fprintf(stderr, "Unknown type %d\n", req->type);
> +        }
> +    }
> +}
> +
> +static void test_basic(BlockDriverState *bs)
> +{
> +    uint8_t buf[512];
> +    BlockQueue *bq;
> +    BlockQueueContext context;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&context, bq);
> +
> +    /* Queue requests */
> +    QUEUE_WRITE(&context,   0, buf, 512, 0x12);
> +    QUEUE_WRITE(&context, 512, buf,  42, 0x34);
> +    QUEUE_BARRIER(&context);
> +    QUEUE_WRITE(&context, 678, buf,  42, 0x56);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,   678, buf,  42, 0x56, 1);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_merge(BlockDriverState *bs)
> +{
> +    uint8_t buf[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1, ctx2;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests */
> +    QUEUE_WRITE(&ctx1,    0, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
> +    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2, 1512, buf,  42, 0x34);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
> +    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 1);
> +
> +    /* Same queue, new contexts */
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests */
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
> +    QUEUE_WRITE(&ctx2,   12, buf,  20, 0x45);
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2,  892, buf, 142, 0x56);
> +
> +    QUEUE_WRITE(&ctx1,    0, buf,   8, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 1512, buf,  42, 0x34);
> +    QUEUE_BARRIER(&ctx1);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf,   8, 0x12, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 1);
> +    POP_CHECK_WRITE(bq,    12, buf,  20, 0x45, 1);
> +    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
> +    POP_CHECK_BARRIER(bq, 1);
> +    POP_CHECK_WRITE(bq,   892, buf, 142, 0x56, 2);
> +    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 2);
> +    POP_CHECK_BARRIER(bq, 2);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_read(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +
> +    /* Queue requests and do some test reads */
> +    memset(buf2, 0xa5, 512);
> +    CHECK_READ(&ctx1, 0, buf, 32, buf2);
> +
> +    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> +    memset(buf2, 0x12, 5);
> +    CHECK_READ(&ctx1,  5, buf, 5, buf2);
> +    CHECK_READ(&ctx1,  7, buf, 2, buf2);
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 5, 0x12, 5);
> +    CHECK_READ(&ctx1,  0, buf, 8, buf2);
> +    CHECK_READ(&ctx1,  0, buf, 10, buf2);
> +    CHECK_READ(&ctx1,  0, buf, 32, buf2);
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2, 0x12, 5);
> +    CHECK_READ(&ctx1,  5, buf, 16, buf2);
> +    memset(buf2, 0xa5, 512);
> +    CHECK_READ(&ctx1,  0, buf,  2, buf2);
> +    CHECK_READ(&ctx1, 10, buf, 16, buf2);
> +
> +    QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
> +    memset(&buf2[5], 0x12, 5);
> +    memset(buf2, 0x12, 2);
> +    CHECK_READ(&ctx1,  0, buf, 32, buf2);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 0);
> +    POP_CHECK_WRITE(bq,     0, buf,   2, 0x12, 0);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_read_order(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1, ctx2;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests and do some test reads */
> +    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
> +
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 5, 0x12, 5);
> +    memset(buf2 + 10, 0x34, 5);
> +    CHECK_READ(&ctx2, 0, buf, 20, buf2);
> +    QUEUE_WRITE(&ctx2,  0, buf, 10, 0x34);
> +    QUEUE_BARRIER(&ctx2);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,    25, buf,   5, 0x44, 0);
> +    POP_CHECK_WRITE(bq,    10, buf,   5, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 1);
> +    POP_CHECK_WRITE(bq,     0, buf,  10, 0x34, 1);
> +    POP_CHECK_BARRIER(bq, 1);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_process_request(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +
> +    /* Queue requests and do a test read */
> +    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> +    QUEUE_BARRIER(&ctx1);
> +
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 25, 0x44, 5);
> +    CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> +    /* Process the queue (plus one call to test a NULL condition) */
> +    blkqueue_process_request(bq);
> +    blkqueue_process_request(bq);
> +    blkqueue_process_request(bq);
> +
> +    /* Verify the queue is empty */
> +    assert(blkqueue_pop(bq) == NULL);
> +
> +    /* Check if we still read the same */
> +    CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
> +{
> +    void* buf;
> +    int ret;
> +
> +    buf = qemu_malloc(1024 * 1024);
> +    memset(buf, 0xa5, 1024 * 1024);
> +    ret = bdrv_write(bs, 0, buf, 2048);
> +    assert(ret >= 0);
> +    qemu_free(buf);
> +
> +    testfn(bs);
> +}
> +
> +int main(void)
> +{
> +    BlockDriverState *bs;
> +    int ret;
> +
> +    bdrv_init();
> +    bs = bdrv_new("");
> +    ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR, NULL);
> +    if (ret < 0) {
> +        fprintf(stderr, "Couldn't open block-queue.img: %s\n",
> +            strerror(-ret));
> +        exit(1);
> +    }
> +
> +    run_test(&test_basic, bs);
> +    run_test(&test_merge, bs);
> +    run_test(&test_read, bs);
> +    run_test(&test_read_order, bs);
> +    run_test(&test_process_request, bs);
> +
> +    bdrv_delete(bs);
> +
> +    return 0;
> +}
> +#endif
> diff --git a/block-queue.h b/block-queue.h
> new file mode 100644
> index 0000000..4ce0e1b
> --- /dev/null
> +++ b/block-queue.h
> @@ -0,0 +1,49 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf<kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#ifndef BLOCK_QUEUE_H
> +#define BLOCK_QUEUE_H
> +
> +#include "qemu-common.h"
> +
> +typedef struct BlockQueue BlockQueue;
> +
> +typedef struct BlockQueueContext {
> +    BlockQueue* bq;
> +    unsigned    section;
> +} BlockQueueContext;
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs);
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq);
> +void blkqueue_destroy(BlockQueue *bq);
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size);
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size);
> +int blkqueue_barrier(BlockQueueContext *context);
> +void blkqueue_flush(BlockQueue *bq);
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> +    void *opaque);
> +
> +#endif
> diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
> index f562b16..eeae173 100644
> --- a/block/qcow2-cluster.c
> +++ b/block/qcow2-cluster.c
> @@ -64,7 +64,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
>       for(i = 0; i < s->l1_size; i++)
>           new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
> -    ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
> +    ret = blkqueue_pwrite(&s->bq_context, new_l1_table_offset, new_l1_table, new_l1_size2);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0)
>           goto fail;
>       for(i = 0; i < s->l1_size; i++)
> @@ -74,7 +75,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
>       cpu_to_be32w((uint32_t*)data, new_l1_size);
>       cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
> -    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
> +    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, l1_size), data, sizeof(data));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail;
>       }
> @@ -177,8 +179,7 @@ static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
>       *l2_table = s->l2_cache + (min_index << s->l2_bits);
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
> -    ret = bdrv_pread(bs->file, l2_offset, *l2_table,
> -        s->l2_size * sizeof(uint64_t));
> +    ret = blkqueue_pread(&s->bq_context, l2_offset, *l2_table, s->l2_size * sizeof(uint64_t));
>       if (ret < 0) {
>           return ret;
>       }
> @@ -207,8 +208,8 @@ static int write_l1_entry(BlockDriverState *bs, int l1_index)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
> -    ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
> -        buf, sizeof(buf));
> +    ret = blkqueue_pwrite(&s->bq_context, s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -255,16 +256,15 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
>       } else {
>           /* if there was an old l2 table, read it from the disk */
>           BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
> -        ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
> -            s->l2_size * sizeof(uint64_t));
> +        ret = blkqueue_pread(&s->bq_context, old_l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
>           if (ret < 0) {
>               goto fail;
>           }
>       }
>       /* write the l2 table to the file */
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
> -    ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
> -        s->l2_size * sizeof(uint64_t));
> +    ret = blkqueue_pwrite(&s->bq_context, l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail;
>       }
> @@ -378,7 +378,7 @@ static int qcow_read(BlockDriverState *bs, int64_t sector_num,
>               memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
>           } else {
>               BLKDBG_EVENT(bs->file, BLKDBG_READ);
> -            ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
> +            ret = blkqueue_pread(&s->bq_context, cluster_offset + index_in_cluster * 512, buf, n * 512);
>               if (ret != n * 512)
>                   return -1;
>               if (s->crypt_method) {
> @@ -648,6 +648,7 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
>   static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
>       uint64_t l2_offset, int l2_index, int num)
>   {
> +    BDRVQcowState *s = bs->opaque;
>       int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
>       int start_offset = (8 * l2_index) & ~511;
>       int end_offset = (8 * (l2_index + num) + 511) & ~511;
> @@ -655,8 +656,7 @@ static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
>       int ret;
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
> -    ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
> -        &l2_table[l2_start_index], len);
> +    ret = blkqueue_pwrite(&s->bq_context, l2_offset + start_offset, &l2_table[l2_start_index], len);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -723,7 +723,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
>        * Also flush bs->file to get the right order for L2 and refcount update.
>        */
>       if (j != 0) {
> -        bdrv_flush(bs->file);
> +        blkqueue_barrier(&s->bq_context);
>           for (i = 0; i < j; i++) {
>               qcow2_free_any_clusters(bs,
>                   be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
> diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
> index 4c19e7e..0d21d1f 100644
> --- a/block/qcow2-refcount.c
> +++ b/block/qcow2-refcount.c
> @@ -44,7 +44,7 @@ static int write_refcount_block(BlockDriverState *bs)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
> -    if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
> +    if (blkqueue_pwrite(&s->bq_context, s->refcount_block_cache_offset,
>               s->refcount_block_cache, size) < 0)
>       {
>           return -EIO;
> @@ -66,8 +66,7 @@ int qcow2_refcount_init(BlockDriverState *bs)
>       s->refcount_table = qemu_malloc(refcount_table_size2);
>       if (s->refcount_table_size > 0) {
>           BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
> -        ret = bdrv_pread(bs->file, s->refcount_table_offset,
> -                         s->refcount_table, refcount_table_size2);
> +        ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
>           if (ret != refcount_table_size2)
>               goto fail;
>           for(i = 0; i < s->refcount_table_size; i++)
> @@ -100,8 +99,7 @@ static int load_refcount_block(BlockDriverState *bs,
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
> -    ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
> -                     s->cluster_size);
> +    ret = blkqueue_pread(&s->bq_context, refcount_block_offset, s->refcount_block_cache, s->cluster_size);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -269,8 +267,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
>       /* Now the new refcount block needs to be written to disk */
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
> -    ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
> -        s->cluster_size);
> +    ret = blkqueue_pwrite(&s->bq_context, new_block, s->refcount_block_cache, s->cluster_size);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_block;
>       }
> @@ -279,9 +277,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       if (refcount_table_index < s->refcount_table_size) {
>           uint64_t data64 = cpu_to_be64(new_block);
>           BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
> -        ret = bdrv_pwrite_sync(bs->file,
> -            s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
> -            &data64, sizeof(data64));
> +        ret = blkqueue_pwrite(&s->bq_context, s->refcount_table_offset + refcount_table_index * sizeof(uint64_t), &data64, sizeof(data64));
> +        blkqueue_barrier(&s->bq_context);
>           if (ret < 0) {
>               goto fail_block;
>           }
> @@ -359,8 +356,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
>       /* Write refcount blocks to disk */
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
> -    ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
> -        blocks_clusters * s->cluster_size);
> +    ret = blkqueue_pwrite(&s->bq_context, meta_offset, new_blocks, blocks_clusters * s->cluster_size);
> +    blkqueue_barrier(&s->bq_context);
>       qemu_free(new_blocks);
>       if (ret < 0) {
>           goto fail_table;
> @@ -372,8 +369,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
> -    ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
> -        table_size * sizeof(uint64_t));
> +    ret = blkqueue_pwrite(&s->bq_context, table_offset, new_table, table_size * sizeof(uint64_t));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_table;
>       }
> @@ -387,8 +384,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       cpu_to_be64w((uint64_t*)data, table_offset);
>       cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
> -    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
> -        data, sizeof(data));
> +    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_table;
>       }
> @@ -444,9 +441,8 @@ static int write_refcount_block_entries(BlockDriverState *bs,
>       size = (last_index - first_index) << REFCOUNT_SHIFT;
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
> -    ret = bdrv_pwrite_sync(bs->file,
> -        refcount_block_offset + (first_index << REFCOUNT_SHIFT),
> -        &s->refcount_block_cache[first_index], size);
> +    ret = blkqueue_pwrite(&s->bq_context, refcount_block_offset + (first_index << REFCOUNT_SHIFT), &s->refcount_block_cache[first_index], size);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -763,8 +759,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
>               l1_table = NULL;
>           }
>           l1_allocated = 1;
> -        if (bdrv_pread(bs->file, l1_table_offset,
> -                       l1_table, l1_size2) != l1_size2)
> +        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
>               goto fail;
>           for(i = 0; i < l1_size; i++)
>               be64_to_cpus(&l1_table[i]);
> @@ -783,7 +778,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
>               old_l2_offset = l2_offset;
>               l2_offset &= ~QCOW_OFLAG_COPIED;
>               l2_modified = 0;
> -            if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> +            if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
>                   goto fail;
>               for(j = 0; j < s->l2_size; j++) {
>                   offset = be64_to_cpu(l2_table[j]);
> @@ -943,7 +938,7 @@ static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
>       l2_size = s->l2_size * sizeof(uint64_t);
>       l2_table = qemu_malloc(l2_size);
>
> -    if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> +    if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
>           goto fail;
>
>       /* Do the actual checks */
> @@ -1038,8 +1033,7 @@ static int check_refcounts_l1(BlockDriverState *bs,
>           l1_table = NULL;
>       } else {
>           l1_table = qemu_malloc(l1_size2);
> -        if (bdrv_pread(bs->file, l1_table_offset,
> -                       l1_table, l1_size2) != l1_size2)
> +        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
>               goto fail;
>           for(i = 0; i < l1_size; i++)
>               be64_to_cpus(&l1_table[i]);
> diff --git a/block/qcow2.c b/block/qcow2.c
> index f2b1b1c..9b1cd78 100644
> --- a/block/qcow2.c
> +++ b/block/qcow2.c
> @@ -237,6 +237,10 @@ static int qcow_open(BlockDriverState *bs, int flags)
>       if (qcow2_read_snapshots(bs) < 0)
>           goto fail;
>
> +    /* Block queue */
> +    s->bq = blkqueue_create(bs->file);
> +    blkqueue_init_context(&s->bq_context, s->bq);
> +
>   #ifdef DEBUG_ALLOC
>       qcow2_check_refcounts(bs);
>   #endif
> @@ -494,6 +498,7 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
>           int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
>           BlockDriverCompletionFunc *cb, void *opaque, int is_write)
>   {
> +    BDRVQcowState *s = bs->opaque;
>       QCowAIOCB *acb;
>
>       acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
> @@ -514,6 +519,10 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
>       acb->cluster_offset = 0;
>       acb->l2meta.nb_clusters = 0;
>       QLIST_INIT(&acb->l2meta.dependent_requests);
> +
> +    /* TODO Push the context into l2meta */
> +    blkqueue_init_context(&s->bq_context, s->bq);
> +
>       return acb;
>   }
>
> @@ -667,6 +676,7 @@ static void qcow_close(BlockDriverState *bs)
>       qemu_free(s->cluster_cache);
>       qemu_free(s->cluster_data);
>       qcow2_refcount_close(bs);
> +    blkqueue_destroy(s->bq);
>   }
>
>   /*
> @@ -1123,12 +1133,16 @@ static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,
>
>   static void qcow_flush(BlockDriverState *bs)
>   {
> +    BDRVQcowState *s = bs->opaque;
> +    blkqueue_flush(s->bq);
>       bdrv_flush(bs->file);
>   }
>
>   static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
>            BlockDriverCompletionFunc *cb, void *opaque)
>   {
> +    BDRVQcowState *s = bs->opaque;
> +    blkqueue_flush(s->bq);
>       return bdrv_aio_flush(bs->file, cb, opaque);
>   }
>
> diff --git a/block/qcow2.h b/block/qcow2.h
> index 3ff162e..361f1ba 100644
> --- a/block/qcow2.h
> +++ b/block/qcow2.h
> @@ -26,6 +26,7 @@
>   #define BLOCK_QCOW2_H
>
>   #include "aes.h"
> +#include "block-queue.h"
>
>   //#define DEBUG_ALLOC
>   //#define DEBUG_ALLOC2
> @@ -108,6 +109,9 @@ typedef struct BDRVQcowState {
>       int64_t free_cluster_index;
>       int64_t free_byte_offset;
>
> +    BlockQueue *bq;
> +    BlockQueueContext bq_context;
> +
>       uint32_t crypt_method; /* current crypt method, 0 if no key yet */
>       uint32_t crypt_method_header;
>       AES_KEY aes_encrypt_key;
> diff --git a/qemu-thread.c b/qemu-thread.c
> index fbc78fe..abcb7a6 100644
> --- a/qemu-thread.c
> +++ b/qemu-thread.c
> @@ -167,6 +167,19 @@ void qemu_thread_create(QemuThread *thread,
>       pthread_sigmask(SIG_SETMASK, &oldset, NULL);
>   }
>
> +void *qemu_thread_join(QemuThread *thread)
> +{
> +    int err;
> +    void *ret;
> +
> +    err = pthread_join(thread->thread, &ret);
> +    if (err) {
> +        error_exit(err, __func__);
> +    }
> +
> +    return ret;
> +}
> +
>   void qemu_thread_signal(QemuThread *thread, int sig)
>   {
>       int err;
> diff --git a/qemu-thread.h b/qemu-thread.h
> index 19bb30c..2b6f218 100644
> --- a/qemu-thread.h
> +++ b/qemu-thread.h
> @@ -36,6 +36,7 @@ int qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex, uint64_t msecs);
>   void qemu_thread_create(QemuThread *thread,
>                          void *(*start_routine)(void*),
>                          void *arg);
> +void *qemu_thread_join(QemuThread *thread);
>   void qemu_thread_signal(QemuThread *thread, int sig);
>   void qemu_thread_self(QemuThread *thread);
>   int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
>
>
Anthony Liguori Sept. 20, 2010, 2:56 p.m. UTC | #2
On 09/20/2010 09:31 AM, Anthony Liguori wrote:
>> If we delay the operation and get three of these sequences queued before
>> actually executing, we end up with the following result, saving two 
>> syncs:
>>
>> 1. Update refcount table (req 1)
>> 2. Update refcount table (req 2)
>> 3. Update refcount table (req 3)
>> 4. bdrv_flush
>> 5. Update L2 entry (req 1)
>> 6. Update L2 entry (req 2)
>> 7. Update L2 entry (req 3)
>>
>> This patch only commits a sync if either the guest has requested a
>> flush or a certain number of requests has accumulated in the queue,
>> so usually we batch more than just three requests.
>>
>> I didn't run any detailed benchmarks, but just looked at the
>> installation time of a Fedora 13 guest: while git master takes about
>> 40-50% longer than before the metadata syncs were introduced, we get
>> most of that time back with blkqueue.
>>
>> Of course, in this state the code is not correct, but it's correct 
>> enough to
>> try and have qcow2 run on a file backend. Some remaining problems are:
>>
>> - There's no locking between the worker thread and other functions 
>> accessing
>>    the same backend driver. Should be fine for file, but probably not 
>> for other
>>    backends.
>>
>> - Error handling doesn't really exist. If something goes wrong with
>>    writing metadata, we can't fail the guest request any more because
>>    it has long since completed. Losing this data is actually okay,
>>    since the guest hasn't flushed yet.
>>
>>    However, we need to be able to fail a flush, and we also need some 
>> way to
>>    handle errors transparently. This probably means that we have to 
>> stop the VM
>>    and let the user fix things so that we can retry. The only other 
>> way would be
>>    to shut down the VM and end up in the same situation as with a 
>> host crash.
>>
>>    Or maybe it would even be enough to start failing all new requests.
>>
>> - The Makefile integration is obviously very wrong, too. It worked
>>    well enough for me, but you need to be aware of when block-queue.o
>>    is compiled with RUN_TESTS and when it isn't. The tests need to be
>>    split out properly.
>>
>> They are certainly fixable and shouldn't have any major impact on 
>> performance,
>> so that's just a matter of doing it.
>>
>> Kevin
>>
>> ---
>>   Makefile               |    3 +
>>   Makefile.objs          |    1 +
>>   block-queue.c          |  720 
>> ++++++++++++++++++++++++++++++++++++++++++++++++
>>   block-queue.h          |   49 ++++
>>   block/qcow2-cluster.c  |   28 +-
>>   block/qcow2-refcount.c |   44 ++--
>>   block/qcow2.c          |   14 +
>>   block/qcow2.h          |    4 +
>>   qemu-thread.c          |   13 +
>>   qemu-thread.h          |    1 +
>>   10 files changed, 838 insertions(+), 39 deletions(-)
>>   create mode 100644 block-queue.c
>>   create mode 100644 block-queue.h
>>
>> diff --git a/Makefile b/Makefile
>> index ab91d42..0202dc6 100644
>> --- a/Makefile
>> +++ b/Makefile
>> @@ -125,6 +125,9 @@ qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o 
>> qemu-error.o $(trace-obj-y) $(block-ob
>>
>>   qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o 
>> $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)
>>
>> +block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
>> +block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o 
>> $(block-obj-y) $(qobject-obj-y)
>> +
>>   qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
>>       $(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@,"  
>> GEN   $@")
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index 3ef6d80..e97a246 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>>
>>   block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o 
>> module.o
>>   block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> +block-obj-y += qemu-thread.o block-queue.o
>>   block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>>   block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>>
>> diff --git a/block-queue.c b/block-queue.c
>> new file mode 100644
>> index 0000000..13579a7
>> --- /dev/null
>> +++ b/block-queue.c
>> @@ -0,0 +1,720 @@
>> +/*
>> + * QEMU System Emulator
>> + *
>> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
>> + *
>> + * Permission is hereby granted, free of charge, to any person 
>> obtaining a copy
>> + * of this software and associated documentation files (the 
>> "Software"), to deal
>> + * in the Software without restriction, including without limitation 
>> the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, 
>> and/or sell
>> + * copies of the Software, and to permit persons to whom the 
>> Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be 
>> included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
>> EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 
>> MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 
>> SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES 
>> OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 
>> ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
>> DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#include <signal.h>
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-queue.h"
>> +#include "qemu-thread.h"
>> +#include "qemu-barrier.h"
>> +#include "block.h"
>> +#include "block-queue.h"
>> +
>> +enum blkqueue_req_type {
>> +    REQ_TYPE_WRITE,
>> +    REQ_TYPE_BARRIER,
>> +};
>> +
>> +typedef struct BlockQueueRequest {
>> +    enum blkqueue_req_type type;
>> +
>> +    uint64_t    offset;
>> +    void*       buf;
>> +    uint64_t    size;
>> +    unsigned    section;
>> +
>> +    QTAILQ_ENTRY(BlockQueueRequest) link;
>> +    QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
>> +} BlockQueueRequest;
>> +
>> +struct BlockQueue {
>> +    BlockDriverState*   bs;
>> +
>> +    QemuThread          thread;
>> +    bool                thread_done;
>> +    QemuMutex           lock;
>> +    QemuMutex           flush_lock;
>> +    QemuCond            cond;
>> +
>> +    int                 barriers_requested;
>> +    int                 barriers_submitted;
>> +    int                 queue_size;
>> +
>> +    QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
>> +    QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
>> +};
>> +
>> +static void *blkqueue_thread(void *bq);
>> +
>> +BlockQueue *blkqueue_create(BlockDriverState *bs)
>> +{
>> +    BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
>> +    bq->bs = bs;
>> +
>> +    QTAILQ_INIT(&bq->queue);
>> +    QSIMPLEQ_INIT(&bq->sections);
>> +
>> +    qemu_mutex_init(&bq->lock);
>> +    qemu_mutex_init(&bq->flush_lock);
>> +    qemu_cond_init(&bq->cond);
>> +
>> +    bq->thread_done = false;
>> +    qemu_thread_create(&bq->thread, blkqueue_thread, bq);
>> +
>> +    return bq;
>> +}
>> +
>> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
>> +{
>> +    context->bq = bq;
>> +    context->section = 0;
>> +}
>> +
>> +void blkqueue_destroy(BlockQueue *bq)
>> +{
>> +    bq->thread_done = true;
>> +    qemu_cond_signal(&bq->cond);
>> +    qemu_thread_join(&bq->thread);
>> +
>> +    blkqueue_flush(bq);
>> +
>> +    fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
>> +        bq->barriers_submitted, bq->barriers_requested);
>> +
>> +    qemu_mutex_destroy(&bq->lock);
>> +    qemu_mutex_destroy(&bq->flush_lock);
>> +    qemu_cond_destroy(&bq->cond);
>> +
>> +    assert(QTAILQ_FIRST(&bq->queue) == NULL);
>> +    assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
>> +    qemu_free(bq);
>> +}
>> +
>> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
>> +    uint64_t size)
>> +{
>> +    BlockQueue *bq = context->bq;
>> +    BlockQueueRequest *req;
>> +    int ret;
>> +
>> +    /*
>> +     * First check if there are any pending writes for the same data. Reverse
>> +     * order to return data written by the latest write.
>> +     */
>> +    QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
>> +        uint64_t end = offset + size;
>> +        uint64_t req_end = req->offset + req->size;
>> +        uint8_t *read_buf = buf;
>> +        uint8_t *req_buf = req->buf;
>> +
>> +        /* We're only interested in queued writes */
>> +        if (req->type != REQ_TYPE_WRITE) {
>> +            continue;
>> +        }
>> +
>> +        /*
>> +         * If we read from a write in the queue (i.e. our read overlaps the
>> +         * write request), our next write probably depends on this write, so
>> +         * let's move forward to its section.
>> +         */
>> +        if (end > req->offset && offset < req_end) {
>> +            context->section = MAX(context->section, req->section);
>> +        }
>> +
>> +        /* How we continue, depends on the kind of overlap we have */
>> +        if ((offset >= req->offset) && (end <= req_end)) {
>> +            /* Completely contained in the write request */
>> +            memcpy(buf, &req_buf[offset - req->offset], size);
>> +            return 0;
>> +        } else if ((end >= req->offset) && (end <= req_end)) {
>> +            /* Overlap in the end of the read request */
>> +            assert(offset < req->offset);
>> +            memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
>> +            size = req->offset - offset;
>> +        } else if ((offset >= req->offset) && (offset < req_end)) {
>> +            /* Overlap in the start of the read request */
>> +            assert(end > req_end);
>> +            memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
>> +            buf = read_buf = &read_buf[req_end - offset];
>> +            offset = req_end;
>> +            size = end - req_end;
>> +        } else if ((req->offset >= offset) && (req_end <= end)) {
>> +            /*
>> +             * The write request is completely contained in the read request.
>> +             * memcpy the data from the write request here, continue with the
>> +             * data before the write request and handle the data after the
>> +             * write request with a recursive call.
>> +             */
>> +            memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
>> +            size = req->offset - offset;
>> +            blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
>> +        }
>> +    }
>> +
>> +    /* The requested data is not written in the queue, read it from disk */
>> +    ret = bdrv_pread(bq->bs, offset, buf, size);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
>> +    uint64_t size)
>> +{
>> +    BlockQueue *bq = context->bq;
>> +    BlockQueueRequest *section_req;
>> +
>> +    /* Create request structure */
>> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
>> +    req->type       = REQ_TYPE_WRITE;
>> +    req->offset     = offset;
>> +    req->size       = size;
>> +    req->buf        = qemu_malloc(size);
>> +    req->section    = context->section;
>> +    memcpy(req->buf, buf, size);
>> +
>> +    /*
>> +     * Find the right place to insert it into the queue:
>> +     * Right before the barrier that closes the current section.
>> +     */
>> +    qemu_mutex_lock(&bq->lock);
>> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
>> +        if (section_req->section >= req->section) {
>> +            req->section = section_req->section;
>> +            context->section = section_req->section;
>> +            QTAILQ_INSERT_BEFORE(section_req, req, link);
>> +            bq->queue_size++;
>> +            goto out;
>> +        }
>> +    }
>> +
>> +    /* If there was no barrier, just put it at the end. */
>> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
>> +    bq->queue_size++;
>> +    qemu_cond_signal(&bq->cond);
>> +
>> +out:
>> +    qemu_mutex_unlock(&bq->lock);
>> +    return 0;
>> +}
>> +
>> +int blkqueue_barrier(BlockQueueContext *context)
>> +{
>> +    BlockQueue *bq = context->bq;
>> +    BlockQueueRequest *section_req;
>> +
>> +    bq->barriers_requested++;
>> +
>> +    /* Create request structure */
>> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
>> +    req->type       = REQ_TYPE_BARRIER;
>> +    req->section    = context->section;
>> +    req->buf        = NULL;
>> +
>> +    /* Find another barrier to merge with. */
>> +    qemu_mutex_lock(&bq->lock);
>> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
>> +        if (section_req->section >= req->section) {
>> +            req->section = section_req->section;
>> +            context->section = section_req->section + 1;
>> +            qemu_free(req);
>> +            goto out;
>> +        }
>> +    }
>> +
>> +    /*
>> +     * If there wasn't a barrier for the same section yet, insert a new one at
>> +     * the end.
>> +     */
>> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
>> +    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
>> +    bq->queue_size++;
>> +    context->section++;
>> +    qemu_cond_signal(&bq->cond);
>> +
>> +    bq->barriers_submitted++;
>> +
>> +out:
>> +    qemu_mutex_unlock(&bq->lock);
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Caller needs to hold the bq->lock mutex
>> + */
>> +static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
>> +{
>> +    BlockQueueRequest *req;
>> +
>> +    req = QTAILQ_FIRST(&bq->queue);
>> +    if (req == NULL) {
>> +        goto out;
>> +    }
>> +
>> +    QTAILQ_REMOVE(&bq->queue, req, link);
>> +    bq->queue_size--;
>> +
>> +    if (req->type == REQ_TYPE_BARRIER) {
>> +        assert(QSIMPLEQ_FIRST(&bq->sections) == req);
>> +        QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
>> +    }
>> +
>> +out:
>> +    return req;
>> +}
>> +
>> +static void blkqueue_free_request(BlockQueueRequest *req)
>> +{
>> +    qemu_free(req->buf);
>> +    qemu_free(req);
>> +}
>> +
>> +static void blkqueue_process_request(BlockQueue *bq)
>> +{
>> +    BlockQueueRequest *req;
>> +    BlockQueueRequest *req2;
>> +    int ret;
>> +
>> +    /*
>> +     * Note that we leave the request in the queue while we process it. No
>> +     * other request will be queued before this one and we have only one thread
>> +     * that processes the queue, so afterwards it will still be the first
>> +     * request. (Not true for barriers in the first position, but we can handle
>> +     * that)
>> +     */
>> +    req = QTAILQ_FIRST(&bq->queue);
>> +    if (req == NULL) {
>> +        return;
>> +    }
>> +
>> +    switch (req->type) {
>> +        case REQ_TYPE_WRITE:
>> +            ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
>> +            if (ret < 0) {
>> +                /* TODO Error reporting! */
>> +                return;
>> +            }
>> +            break;
>> +        case REQ_TYPE_BARRIER:
>> +            bdrv_flush(bq->bs);
>> +            break;
>> +    }
>> +
>> +    /*
>> +     * Only remove the request from the queue when it's written, so that reads
>> +     * always access the right data.
>> +     */
>> +    qemu_mutex_lock(&bq->lock);
>> +    req2 = QTAILQ_FIRST(&bq->queue);
>> +    if (req == req2) {
>> +        blkqueue_pop(bq);
>> +        blkqueue_free_request(req);
>> +    } else {
>> +        /*
>> +         * If it's a barrier and something has been queued before it, just
>> +         * leave it in the queue and flush once again later.
>> +         */
>> +        assert(req->type == REQ_TYPE_BARRIER);
>> +        bq->barriers_submitted++;
>> +    }
>> +    qemu_mutex_unlock(&bq->lock);
>> +}
>> +
>> +struct blkqueue_flush_aiocb {
>> +    BlockQueue *bq;
>> +    BlockDriverCompletionFunc *cb;
>> +    void *opaque;
>> +};
>> +
>> +static void *blkqueue_aio_flush_thread(void *opaque)
>> +{
>> +    struct blkqueue_flush_aiocb *acb = opaque;
>> +
>> +    /* Process any left over requests */
>> +    blkqueue_flush(acb->bq);
>> +
>> +    acb->cb(acb->opaque, 0);
>> +    qemu_free(acb);
>> +
>> +    return NULL;
>> +}
>> +
>> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
>> +    void *opaque)
>> +{
>> +    struct blkqueue_flush_aiocb *acb;
>> +
>> +    acb = qemu_malloc(sizeof(*acb));
>> +    acb->bq = bq;
>> +    acb->cb = cb;
>> +    acb->opaque = opaque;
>> +
>> +    qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
>> +}
>> +
>> +void blkqueue_flush(BlockQueue *bq)
>> +{
>> +    qemu_mutex_lock(&bq->flush_lock);
>> +
>> +    /* Process any left over requests */
>> +    while (QTAILQ_FIRST(&bq->queue)) {
>> +        blkqueue_process_request(bq);
>> +    }
>> +
>> +    qemu_mutex_unlock(&bq->flush_lock);
>> +}
>> +
>> +static void *blkqueue_thread(void *_bq)
>> +{
>> +    BlockQueue *bq = _bq;
>> +#ifndef RUN_TESTS
>> +    BlockQueueRequest *req;
>> +#endif
>> +
>> +    qemu_mutex_lock(&bq->flush_lock);
>> +    while (!bq->thread_done) {
>> +        barrier();

A barrier shouldn't be needed here.

>> +#ifndef RUN_TESTS
>> +        req = QTAILQ_FIRST(&bq->queue);
>> +
>> +        /* Don't process barriers, we only do that on flushes */
>> +        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
>> +            blkqueue_process_request(bq);
>> +        } else {
>> +            qemu_cond_wait(&bq->cond, &bq->flush_lock);
>> +        }


The normal pattern for this is:

while (!condition) {
     qemu_cond_wait(&cond, &lock);
}
process_request()

It's generally best not to deviate from this pattern in terms of code 
readability.
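
For illustration, a minimal sketch of blkqueue_thread() rewritten in that
pattern (keeping the RFC's locking and its queue_size > 42 batching
threshold, and ignoring the RUN_TESTS special case):

static void *blkqueue_thread(void *_bq)
{
    BlockQueue *bq = _bq;
    BlockQueueRequest *req;

    qemu_mutex_lock(&bq->flush_lock);
    while (!bq->thread_done) {
        /* Sleep until there is work we may do: any write, or a barrier
         * once the queue has grown past the batching threshold */
        req = QTAILQ_FIRST(&bq->queue);
        while (!bq->thread_done &&
               (req == NULL ||
                (req->type == REQ_TYPE_BARRIER && bq->queue_size <= 42))) {
            qemu_cond_wait(&bq->cond, &bq->flush_lock);
            req = QTAILQ_FIRST(&bq->queue);
        }

        if (!bq->thread_done) {
            blkqueue_process_request(bq);
        }
    }
    qemu_mutex_unlock(&bq->flush_lock);

    return NULL;
}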

A less invasive way of doing this (assuming we're okay with it from a 
correctness perspective) is to make use of qemu_aio_wait() as a 
replacement for qemu_mutex_lock() and shift the pread/pwrite calls to 
bdrv_aio_write/bdrv_aio_read.

IOW, blkqueue_pwrite stages a request via bdrv_aio_write().  
blkqueue_pread() either returns a cached read or it does a 
bdrv_pread().  The blkqueue_flush() call will then do qemu_aio_wait() to 
wait for all pending I/Os to complete.
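
Roughly sketched (in_flight, a req->bq back-pointer and a req->qiov field
are hypothetical additions to the RFC's structures, and writes are assumed
to be 512-byte aligned; unaligned metadata writes would need the
read-modify-write of a real bdrv_aio_pwrite emulation):

static void blkqueue_aio_write_cb(void *opaque, int ret)
{
    BlockQueueRequest *req = opaque;

    /* TODO Error handling, as in the synchronous version */
    qemu_iovec_destroy(&req->qiov);
    req->bq->in_flight--;
    blkqueue_free_request(req);
}

static void blkqueue_submit_write(BlockQueue *bq, BlockQueueRequest *req)
{
    /* The qiov must stay valid until the callback runs, so it lives in
     * the request itself */
    qemu_iovec_init(&req->qiov, 1);
    qemu_iovec_add(&req->qiov, req->buf, req->size);

    bq->in_flight++;
    bdrv_aio_writev(bq->bs, req->offset >> 9, &req->qiov,
                    req->size >> 9, blkqueue_aio_write_cb, req);
}

void blkqueue_flush(BlockQueue *bq)
{
    /* Drive the main loop until all staged requests have completed */
    while (bq->in_flight > 0) {
        qemu_aio_wait();
    }
}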

Regards,

Anthony Liguori
Kevin Wolf Sept. 20, 2010, 3:08 p.m. UTC | #3
Am 20.09.2010 16:31, schrieb Anthony Liguori:
> On 09/20/2010 08:56 AM, Kevin Wolf wrote:
>> I won't get this ready until I leave for vacation on Wednesday, so I thought I
>> could just as well post it as an RFC in this state.
>>
>> With this patch applied, qcow2 doesn't directly access the image file any more
>> for metadata, but rather goes through the newly introduced blkqueue. Write
>> and sync requests are queued there and executed in a separate worker thread.
>> Reads consider the contents of the queue before accessing the image file.
>>
>> What makes this interesting is that we can delay syncs and if multiple syncs
>> occur, we can merge them into one bdrv_flush.
>>
>> A typical sequence in qcow2 (simple cluster allocation) looks like this:
>>
>> 1. Update refcount table
>> 2. bdrv_flush
>> 3. Update L2 entry
>>    
> 
> Let's expand it a bit more:
> 
> 1. Update refcount table
> 2. bdrv_flush
> 3. Update L2 entry
> 4. Write data to disk
> 5. Report write complete
> 
> I'm struggling to understand how a thread helps out.

This sequence becomes:

1. Update refcount table
2. Write data to disk
3. Report write complete

And only later:

4. Update L2 entry
5. bdrv_flush (possibly merged with other flushes)

> If you run 1-3 in a thread, you need to inject a barrier between steps 3 
> and 5 or you'll report the write complete before writing the metadata 
> out.  You can't delay completing step 3 until a guest requests a flush.  
> If you do, then you're implementing a writeback cache for metadata.

Yeah, if you like to call it that, that's probably an accurate description.

> If you're comfortable with a writeback cache for metadata, then you 
> should also be comfortable with a writeback cache for data in which 
> case, cache=writeback is the answer.

Well, there is a difference: We don't pollute the host page cache with
guest data and we don't get a virtual "disk cache" as big as the host
RAM, but only a very limited queue of metadata.

Basically, in qemu we have three different types of caching:

1. O_DSYNC, everything is always synced without any explicit request.
   This is cache=writethrough.

2. Nothing is ever synced. This is cache=unsafe.

3. We present a writeback disk cache to the guest and the guest needs
>    to explicitly flush to get its data safe on disk. This is
   cache=writeback and cache=none.

So they are actually very similar; the difference is only whether to use
O_DIRECT or not. In principle, regarding the integrity requirements
there is already no difference between cache=none and cache=writeback today.

We're still lacking modes for O_DSYNC | O_DIRECT and unsafe | O_DIRECT,
but they are entirely possible, because it's two different dimensions.
(And I think Christoph was planning to actually make it two independent
options)

You have a point in that we need to disable the queueing for
cache=writethrough. I'm aware of that, but forgot to mention it in the
todo list.
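
A sketch of the gating I have in mind (assuming the bq/bq_context fields
this patch adds to BDRVQcowState and the existing BDRV_O_CACHE_WB flag;
the NULL check in the callers is hypothetical):

static void qcow2_init_blkqueue(BlockDriverState *bs, int flags)
{
    BDRVQcowState *s = bs->opaque;

    if (flags & BDRV_O_CACHE_WB) {
        /* Guest sees a writeback cache: delaying and batching metadata
         * syncs is allowed */
        s->bq = blkqueue_create(bs->file);
        blkqueue_init_context(&s->bq_context, s->bq);
    } else {
        /* cache=writethrough: metadata writers check for a NULL queue
         * and keep using bdrv_pwrite_sync() directly */
        s->bq = NULL;
    }
}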

> If it's a matter of batching, batching can't occur if you have a barrier 
> between steps 3 and 5.  The only way you can get batching is by doing a 
> writeback cache for the metadata such that you can complete your request 
> before the metadata is written.
> 
> Am I misunderstanding the idea?

No, I think you understand it right, but maybe you were not completely
aware that cache=none doesn't mean writethrough.

Kevin
Kevin Wolf Sept. 20, 2010, 3:33 p.m. UTC | #4
Am 20.09.2010 16:56, schrieb Anthony Liguori:
>>> +void blkqueue_flush(BlockQueue *bq)
>>> +{
>>> +    qemu_mutex_lock(&bq->flush_lock);
>>> +
>>> +    /* Process any left over requests */
>>> +    while (QTAILQ_FIRST(&bq->queue)) {
>>> +        blkqueue_process_request(bq);
>>> +    }
>>> +
>>> +    qemu_mutex_unlock(&bq->flush_lock);
>>> +}
>>> +
>>> +static void *blkqueue_thread(void *_bq)
>>> +{
>>> +    BlockQueue *bq = _bq;
>>> +#ifndef RUN_TESTS
>>> +    BlockQueueRequest *req;
>>> +#endif
>>> +
>>> +    qemu_mutex_lock(&bq->flush_lock);
>>> +    while (!bq->thread_done) {
>>> +        barrier();
> 
> A barrier shouldn't be needed here.

It was needed when I started with an empty thread because gcc would
"optimize" while(!bq->thread_done) into an endless loop. I guess there
is enough code added now that gcc won't try to be clever any more, so I
can remove that.

>>> +#ifndef RUN_TESTS
>>> +        req = QTAILQ_FIRST(&bq->queue);
>>> +
>>> +        /* Don't process barriers, we only do that on flushes */
>>> +        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
>>> +            blkqueue_process_request(bq);
>>> +        } else {
>>> +            qemu_cond_wait(&bq->cond, &bq->flush_lock);
>>> +        }
> 
> 
> The normal pattern for this is:
> 
> while (!condition) {
>      qemu_cond_wait(&cond, &lock);
> }
> process_request()
> 
> It's generally best not to deviate from this pattern in terms of code 
> readability.

Hm, yes, I think you're right. The code used to be a bit more involved
here initially and it seems that I missed the last obvious piece of
simplification.

> A less invasive way of doing this (assuming we're okay with it from a 
> correctness perspective) is to make use of qemu_aio_wait() as a 
> replacement for qemu_mutex_lock() and shift the pread/pwrite calls to 
> bdrv_aio_write/bdrv_aio_read.
> 
> IOW, blkqueue_pwrite stages a request via bdrv_aio_write().  
> blkqueue_pread() either returns a cached read or it does a 
> bdrv_pread().  The blkqueue_flush() call will then do qemu_aio_wait() to 
> wait for all pending I/Os to complete.

I was actually considering that, but it would have been a bit more
coding to keep track of another queue of in-flight requests, juggling
with some more AIOCBs and implementing an emulation for the missing
bdrv_aio_pwrite. Nothing really dramatic, it just was easier to start
this way.

If we come to the conclusion that bdrv_aio_write is the way to go and
it's worth the work, I'm fine with changing it.

Kevin
Avi Kivity Sept. 20, 2010, 3:33 p.m. UTC | #5
On 09/20/2010 05:08 PM, Kevin Wolf wrote:
> >
> >  Let's expand it a bit more:
> >
> >  1. Update refcount table
> >  2. bdrv_flush
> >  3. Update L2 entry
> >  4. Write data to disk
> >  5. Report write complete
> >
> >  I'm struggling to understand how a thread helps out.
>
> This sequence becomes:
>
> 1. Update refcount table
> 2. Write data to disk
> 3. Report write complete
>
> And only later:
>
> 4. Update L2 entry
> 5. bdrv_flush (possibly merged with other flushes)
>

The L2 update needs to happen after we're sure the refcount update is 
stable, so we need a bdrv_flush between them.

Still, the basic idea looks sound.  You can do many refcount updates, 
flush, many L2 updates, flush.
Avi Kivity Sept. 20, 2010, 3:38 p.m. UTC | #6
On 09/20/2010 05:33 PM, Avi Kivity wrote:
>  On 09/20/2010 05:08 PM, Kevin Wolf wrote:
>> >
>> >  Let's expand it a bit more:
>> >
>> >  1. Update refcount table
>> >  2. bdrv_flush
>> >  3. Update L2 entry
>> >  4. Write data to disk
>> >  5. Report write complete
>> >
>> >  I'm struggling to understand how a thread helps out.
>>
>> This sequence becomes:
>>
>> 1. Update refcount table
>> 2. Write data to disk
>> 3. Report write complete
>>
>> And only later:
>>
>> 4. Update L2 entry
>> 5. bdrv_flush (possibly merged with other flushes)
>>
>
> The L2 update needs to happen after we're sure the refcount update is 
> stable, so we need a bdrv_flush between them.
>
> Still, the basic idea looks sound.  You can do many refcount updates, 
> flush, many L2 updates, flush.
>


Oh, your original RFC has that flush in place, so all is good.
Anthony Liguori Sept. 20, 2010, 3:40 p.m. UTC | #7
On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>> If you're comfortable with a writeback cache for metadata, then you
>> should also be comfortable with a writeback cache for data in which
>> case, cache=writeback is the answer.
>>      
> Well, there is a difference: We don't pollute the host page cache with
> guest data and we don't get a virtual "disk cache" as big as the host
> RAM, but only a very limited queue of metadata.
>
> Basically, in qemu we have three different types of caching:
>
> 1. O_DSYNC, everything is always synced without any explicit request.
>     This is cache=writethrough.
>    

I actually think O_DSYNC is the wrong implementation of 
cache=writethrough.  cache=writethrough should behave just like 
cache=none except that data goes through the page cache.

> 2. Nothing is ever synced. This is cache=unsafe.
>
> 3. We present a writeback disk cache to the guest and the guest needs
>     to explicitly flush to get its data safe on disk. This is
>     cache=writeback and cache=none.
>    

We shouldn't tie the virtual disk cache to which cache= option is used 
in the host.  cache=none means that all requests go directly to the 
disk.  cache=writeback means the host acts as a writeback cache.

If your disk is in writethrough mode, exposing cache=none as a writeback 
disk cache is not correct.

> We're still lacking modes for O_DSYNC | O_DIRECT and unsafe | O_DIRECT,
> but they are entirely possible, because it's two different dimensions.
> (And I think Christoph was planning to actually make it two independent
> options)
>    

I don't really think O_DSYNC | O_DIRECT makes much sense.

>> If it's a matter of batching, batching can't occur if you have a barrier
>> between steps 3 and 5.  The only way you can get batching is by doing a
>> writeback cache for the metadata such that you can complete your request
>> before the metadata is written.
>>
>> Am I misunderstanding the idea?
>>      
> No, I think you understand it right, but maybe you were not completely
> aware that cache=none doesn't mean writethrough.
>    

No, cache=none means don't cache on the host.

In my mind, cache=none|cache=writethrough is specifically about 
eliminating the host from the cache hierarchy.  This is not a 
correctness issue with respect to integrity but rather about data loss.  
If you have strong storage with battery backed caches, then you can 
relax flushes.  However, if you've got a cache in the host and the host 
isn't battery backed, that's no longer safe to do.

So even with cache=none, if we added a writeback cache for metadata, it 
would really need to be an optional feature.  Something like 
cache=none|writethrough|metadata|writeback.

Regards,

Anthony Liguori

> Kevin
>
Kevin Wolf Sept. 20, 2010, 3:46 p.m. UTC | #8
Am 20.09.2010 17:33, schrieb Avi Kivity:
>   On 09/20/2010 05:08 PM, Kevin Wolf wrote:
>>>
>>>  Let's expand it a bit more:
>>>
>>>  1. Update refcount table
>>>  2. bdrv_flush
>>>  3. Update L2 entry
>>>  4. Write data to disk
>>>  5. Report write complete
>>>
>>>  I'm struggling to understand how a thread helps out.
>>
>> This sequence becomes:
>>
>> 1. Update refcount table
>> 2. Write data to disk
>> 3. Report write complete
>>
>> And only later:
>>
>> 4. Update L2 entry
>> 5. bdrv_flush (possibly merged with other flushes)
>>
> 
> The L2 update needs to happen after we're sure the refcount update is 
> stable, so we need a bdrv_flush between them.
> 
> Still, the basic idea looks sound.  You can do many refcount updates, 
> flush, many L2 updates, flush.

Oops, my bad. Swap 4 and 5, that's what I meant. We don't need a flush
after writing the L2 entry until we need to flush for other reasons.
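
Expressed with the blkqueue API, that corrected ordering is (sketch with a
hypothetical helper and made-up offsets; endianness conversion omitted,
the real code is in the qcow2 hunks):

static int alloc_update_metadata(BDRVQcowState *s,
                                 uint64_t refblock_offset, uint16_t refcount,
                                 uint64_t l2_entry_offset, uint64_t l2_entry)
{
    int ret;

    /* 1. Queue the refcount update; nothing is synced yet */
    ret = blkqueue_pwrite(&s->bq_context, refblock_offset,
                          &refcount, sizeof(refcount));
    if (ret < 0) {
        return ret;
    }

    /* 4. Barrier first: the L2 entry may only hit the disk once the
     *    refcount update is stable. The actual bdrv_flush runs later in
     *    the worker thread, possibly merged with other barriers. */
    blkqueue_barrier(&s->bq_context);

    /* 5. Queue the L2 update */
    return blkqueue_pwrite(&s->bq_context, l2_entry_offset,
                           &l2_entry, sizeof(l2_entry));
}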

Kevin
Anthony Liguori Sept. 20, 2010, 3:48 p.m. UTC | #9
On 09/20/2010 10:33 AM, Kevin Wolf wrote:
> Am 20.09.2010 16:56, schrieb Anthony Liguori:
>    
>>>> +void blkqueue_flush(BlockQueue *bq)
>>>> +{
>>>> +    qemu_mutex_lock(&bq->flush_lock);
>>>> +
>>>> +    /* Process any left over requests */
>>>> +    while (QTAILQ_FIRST(&bq->queue)) {
>>>> +        blkqueue_process_request(bq);
>>>> +    }
>>>> +
>>>> +    qemu_mutex_unlock(&bq->flush_lock);
>>>> +}
>>>> +
>>>> +static void *blkqueue_thread(void *_bq)
>>>> +{
>>>> +    BlockQueue *bq = _bq;
>>>> +#ifndef RUN_TESTS
>>>> +    BlockQueueRequest *req;
>>>> +#endif
>>>> +
>>>> +    qemu_mutex_lock(&bq->flush_lock);
>>>> +    while (!bq->thread_done) {
>>>> +        barrier();
>>>>          
>> A barrier shouldn't be needed here.
>>      
> It was needed when I started with an empty thread because gcc would
> "optimize" while(!bq->thread_done) into an endless loop. I guess there
> is enough code added now that gcc won't try to be clever any more, so I
> can remove that.
>    

The qemu_cond_wait() will act as a read barrier.

>> A less invasive way of doing this (assuming we're okay with it from a
>> correctness perspective) is to make use of qemu_aio_wait() as a
>> replacement for qemu_mutex_lock() and shift the pread/pwrite calls to
>> bdrv_aio_write/bdrv_aio_read.
>>
>> IOW, blkqueue_pwrite stages a request via bdrv_aio_write().
>> blkqueue_pread() either returns a cached read or it does a
>> bdrv_pread().  The blkqueue_flush() call will then do qemu_aio_wait() to
>> wait for all pending I/Os to complete.
>>      
> I was actually considering that, but it would have been a bit more
> coding to keep track of another queue of in-flight requests, juggling
> with some more AIOCBs and implementing an emulation for the missing
> bdrv_aio_pwrite. Nothing really dramatic, it just was easier to start
> this way.
>    

bdrv_aio_pwritev is definitely useful in other places so it's worth adding.
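
Something like this, with only the sector-aligned fast path filled in (the
unaligned head/tail handling, i.e. the actual work, is omitted):

BlockDriverAIOCB *bdrv_aio_pwritev(BlockDriverState *bs, int64_t offset,
                                   QEMUIOVector *qiov, size_t size,
                                   BlockDriverCompletionFunc *cb,
                                   void *opaque)
{
    /* Aligned case maps directly onto the sector-based interface */
    if ((offset & 511) == 0 && (size & 511) == 0) {
        return bdrv_aio_writev(bs, offset >> 9, qiov, size >> 9,
                               cb, opaque);
    }

    /* Unaligned head/tail would need read-modify-write of the partial
     * sectors before this can replace bdrv_pwrite() users */
    return NULL;
}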

> If we come to the conclusion that bdrv_aio_write is the way to go and
> it's worth the work, I'm fine with changing it.
>    

Adding locking to allow bdrv_pwrite/bdrv_pread to be safely called 
outside of qemu_mutex is going to carry an awful lot of complexity since 
we can do things like layer qcow2 on top of NBD.  That means 
bdrv_pread() may be repeatedly interacting with the main loop which 
means that there's no simple place to start.

I'm not fundamentally opposed to using threads for concurrency.  I think 
it's going to get super complicated though to do it here.

Regards,

Anthony Liguori

> Kevin
>
Anthony Liguori Sept. 20, 2010, 3:51 p.m. UTC | #10
On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>
>> If you're comfortable with a writeback cache for metadata, then you
>> should also be comfortable with a writeback cache for data in which
>> case, cache=writeback is the answer.
>>      
> Well, there is a difference: We don't pollute the host page cache with
> guest data and we don't get a virtual "disk cache" as big as the host
> RAM, but only a very limited queue of metadata.
>    

Would it be a mortal sin to open the file twice and have a cache=none 
version for data and cache=writeback for metadata?

The two definitely aren't consistent with each other but I think the 
whole point here is that we don't care.

It opens up some other possibilities too like cache=none for data and 
cache=writethrough for metadata which may be a useful combination.

Regards,

Anthony Liguori
Kevin Wolf Sept. 20, 2010, 3:55 p.m. UTC | #11
Am 20.09.2010 17:40, schrieb Anthony Liguori:
> On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>>> If you're comfortable with a writeback cache for metadata, then you
>>> should also be comfortable with a writeback cache for data in which
>>> case, cache=writeback is the answer.
>>>      
>> Well, there is a difference: We don't pollute the host page cache with
>> guest data and we don't get a virtual "disk cache" as big as the host
>> RAM, but only a very limited queue of metadata.
>>
>> Basically, in qemu we have three different types of caching:
>>
>> 1. O_DSYNC, everything is always synced without any explicit request.
>>     This is cache=writethrough.
>>    
> 
> I actually think O_DSYNC is the wrong implementation of 
> cache=writethrough.  cache=writethrough should behave just like 
> cache=none except that data goes through the page cache.

Then you have cache=writeback, basically.

>> 2. Nothing is ever synced. This is cache=unsafe.
>>
>> 3. We present a writeback disk cache to the guest and the guest needs
>>     to explicitly flush to get its data safe on disk. This is
>>     cache=writeback and cache=none.
>>    
> 
> We shouldn't tie the virtual disk cache to which cache= option is used 
> in the host.  cache=none means that all requests go directly to the 
> disk.  cache=writeback means the host acts as a writeback cache.

No, that's not the meaning of cache=none if you take the disk cache into
consideration. It might be what you think should be the meaning of
cache=none, but it's not what it means in any qemu release.

> If your disk is in writethrough mode, exposing cache=none as a writeback 
> disk cache is not correct.

The host's disk is writethrough? In this case it's being more
conservative than needed, yes.

>> We're still lacking modes for O_DSYNC | O_DIRECT and unsafe | O_DIRECT,
>> but they are entirely possible, because it's two different dimensions.
>> (And I think Christoph was planning to actually make it two independent
>> options)
> 
> I don't really think O_DSYNC | O_DIRECT makes much sense.

Maybe, maybe not. It's just a missing entry in the matrix.

Kevin
Avi Kivity Sept. 20, 2010, 4:05 p.m. UTC | #12
On 09/20/2010 05:51 PM, Anthony Liguori wrote:
> On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>>
>>> If you're comfortable with a writeback cache for metadata, then you
>>> should also be comfortable with a writeback cache for data in which
>>> case, cache=writeback is the answer.
>> Well, there is a difference: We don't pollute the host page cache with
>> guest data and we don't get a virtual "disk cache" as big as the host
>> RAM, but only a very limited queue of metadata.
>
> Would it be a mortal sin to open the file twice and have a cache=none 
> version for data and cache=writeback for metadata?
>
> The two definitely aren't consistent with each other but I think the 
> whole point here is that we don't care.
>
> It opens up some other possibilities too like cache=none for data and 
> cache=writethrough for metadata which may be a useful combination.

I've thought of this (and I think perhaps suggested it on this list).  
The question is whether the kernel slows down direct I/O when page 
cache is present for the file (in non-conflicting ranges).

I think it's considered a valid use case (backing up a database file 
while the database is O_DIRECTing into it) but I don't know if the code 
was actually updated to support this.
Anthony Liguori Sept. 20, 2010, 4:34 p.m. UTC | #13
On 09/20/2010 10:55 AM, Kevin Wolf wrote:
> Am 20.09.2010 17:40, schrieb Anthony Liguori:
>    
>> On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>>      
>>>> If you're comfortable with a writeback cache for metadata, then you
>>>> should also be comfortable with a writeback cache for data in which
>>>> case, cache=writeback is the answer.
>>>>
>>>>          
>>> Well, there is a difference: We don't pollute the host page cache with
>>> guest data and we don't get a virtual "disk cache" as big as the host
>>> RAM, but only a very limited queue of metadata.
>>>
>>> Basically, in qemu we have three different types of caching:
>>>
>>> 1. O_DSYNC, everything is always synced without any explicit request.
>>>      This is cache=writethrough.
>>>
>>>        
>> I actually think O_DSYNC is the wrong implementation of
>> cache=writethrough.  cache=writethrough should behave just like
>> cache=none except that data goes through the page cache.
>>      
> Then you have cache=writeback, basically.
>    

No.  Write through means "write requests are not completed until the 
data has been acknowledged by the next layer."  Write back means "write 
requests are completed irrespective of the data being acknowledged by 
the next layer."

Write through ensures consistency with the next layer whereas write back 
doesn't.

cache=none means that there is no cache.  If there is no cache, then 
you're guaranteed to be consistent with the next layer.

The only reason it's exposed as writeback at the emulation layer is that 
*usually* disks have writeback caches.  The fact that writethrough 
currently is stronger than the next layer (in that it breaks 
through the writeback cache a disk may have) is not a designed feature.  
It's an accident.

Had ext3 enabled barriers by default, writethrough would not use O_DSYNC.

>>> 2. Nothing is ever synced. This is cache=unsafe.
>>>
>>> 3. We present a writeback disk cache to the guest and the guest needs
>>>      to explicitly flush to get its data safe on disk. This is
>>>      cache=writeback and cache=none.
>>>
>>>        
>> We shouldn't tie the virtual disk cache to which cache= option is used
>> in the host.  cache=none means that all requests go directly to the
>> disk.  cache=writeback means the host acts as a writeback cache.
>>      
> No, that's not the meaning of cache=none if you take the disk cache into
> consideration.

You can't possibly take into account the disk cache because we don't 
know anything about the disk cache in QEMU.  Just assuming disks always 
have writeback caches is wrong.

>   It might be what you think should be the meaning of
> cache=none, but it's not what it means in any qemu release.
>    

That's precisely what it's meant in every release since the cache 
options when I first introduced them.

>>> We're still lacking modes for O_DSYNC | O_DIRECT and unsafe | O_DIRECT,
>>> but they are entirely possible, because it's two different dimensions.
>>> (And I think Christoph was planning to actually make it two independent
>>> options)
>>>        
>> I don't really think O_DSYNC | O_DIRECT makes much sense.
>>      
> Maybe, maybe not. It's just a missing entry in the matrix.
>    

Regards,

Anthony Liguori
Kevin Wolf Sept. 21, 2010, 9:13 a.m. UTC | #14
Am 20.09.2010 17:51, schrieb Anthony Liguori:
> On 09/20/2010 10:08 AM, Kevin Wolf wrote:
>>
>>> If you're comfortable with a writeback cache for metadata, then you
>>> should also be comfortable with a writeback cache for data in which
>>> case, cache=writeback is the answer.
>>>      
>> Well, there is a difference: We don't pollute the host page cache with
>> guest data and we don't get a virtual "disk cache" as big as the host
>> RAM, but only a very limited queue of metadata.
> 
> Would it be a mortal sin to open the file twice and have a cache=none 
> version for data and cache=writeback for metadata?

Is the behaviour for this well-defined and portable?

> The two definitely aren't consistent with each other but I think the 
> whole point here is that we don't care.

What we do care about is ordering between data and metadata writes, for
example when doing COW, the copy of the data should have completed
before we update the L2 table.
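
For COW, the constraint looks something like this (sketch; assumes the
queue and the data copy both target bs->file, so the queued barrier's
bdrv_flush also covers the copied data):

static int cow_then_update_l2(BlockDriverState *bs, uint64_t new_offset,
                              const uint8_t *data, int len,
                              uint64_t l2_entry_offset, uint64_t l2_entry)
{
    BDRVQcowState *s = bs->opaque;
    int ret;

    /* The data copy must have completed before the L2 update is queued */
    ret = bdrv_pwrite(bs->file, new_offset, data, len);
    if (ret < 0) {
        return ret;
    }

    /* The barrier's (delayed) bdrv_flush makes both the earlier queued
     * refcount updates and the data copy above stable before the L2
     * entry points to the new cluster */
    blkqueue_barrier(&s->bq_context);
    return blkqueue_pwrite(&s->bq_context, l2_entry_offset,
                           &l2_entry, sizeof(l2_entry));
}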

Also, what happens (in qcow2) when we free a data cluster and reuse it
as metadata (or the other way round)? Does this work, or is there a
chance that the old content is resurrected?

Kevin
diff mbox

Patch

diff --git a/Makefile b/Makefile
index ab91d42..0202dc6 100644
--- a/Makefile
+++ b/Makefile
@@ -125,6 +125,9 @@  qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-ob
 
 qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)
 
+block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
+block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o $(block-obj-y) $(qobject-obj-y)
+
 qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
 	$(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@,"  GEN   $@")
 
diff --git a/Makefile.objs b/Makefile.objs
index 3ef6d80..e97a246 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@  qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += qemu-thread.o block-queue.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
diff --git a/block-queue.c b/block-queue.c
new file mode 100644
index 0000000..13579a7
--- /dev/null
+++ b/block-queue.c
@@ -0,0 +1,720 @@ 
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include <signal.h>
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
+#include "qemu-barrier.h"
+#include "block.h"
+#include "block-queue.h"
+
+enum blkqueue_req_type {
+    REQ_TYPE_WRITE,
+    REQ_TYPE_BARRIER,
+};
+
+typedef struct BlockQueueRequest {
+    enum blkqueue_req_type type;
+
+    uint64_t    offset;
+    void*       buf;
+    uint64_t    size;
+    unsigned    section;
+
+    QTAILQ_ENTRY(BlockQueueRequest) link;
+    QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
+} BlockQueueRequest;
+
+struct BlockQueue {
+    BlockDriverState*   bs;
+
+    QemuThread          thread;
+    bool                thread_done;
+    QemuMutex           lock;
+    QemuMutex           flush_lock;
+    QemuCond            cond;
+
+    int                 barriers_requested;
+    int                 barriers_submitted;
+    int                 queue_size;
+
+    QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
+    QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
+};
+
+static void *blkqueue_thread(void *bq);
+
+BlockQueue *blkqueue_create(BlockDriverState *bs)
+{
+    BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
+    bq->bs = bs;
+
+    QTAILQ_INIT(&bq->queue);
+    QSIMPLEQ_INIT(&bq->sections);
+
+    qemu_mutex_init(&bq->lock);
+    qemu_mutex_init(&bq->flush_lock);
+    qemu_cond_init(&bq->cond);
+
+    bq->thread_done = false;
+    qemu_thread_create(&bq->thread, blkqueue_thread, bq);
+
+    return bq;
+}
+
+void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
+{
+    context->bq = bq;
+    context->section = 0;
+}
+
+void blkqueue_destroy(BlockQueue *bq)
+{
+    bq->thread_done = true;
+    qemu_cond_signal(&bq->cond);
+    qemu_thread_join(&bq->thread);
+
+    blkqueue_flush(bq);
+
+    fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
+        bq->barriers_submitted, bq->barriers_requested);
+
+    qemu_mutex_destroy(&bq->lock);
+    qemu_mutex_destroy(&bq->flush_lock);
+    qemu_cond_destroy(&bq->cond);
+
+    assert(QTAILQ_FIRST(&bq->queue) == NULL);
+    assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
+    qemu_free(bq);
+}
+
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *req;
+    int ret;
+
+    /*
+     * First check if there are any pending writes for the same data. Reverse
+     * order to return data written by the latest write.
+     */
+    QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
+        uint64_t end = offset + size;
+        uint64_t req_end = req->offset + req->size;
+        uint8_t *read_buf = buf;
+        uint8_t *req_buf = req->buf;
+
+        /* We're only interested in queued writes */
+        if (req->type != REQ_TYPE_WRITE) {
+            continue;
+        }
+
+        /*
+         * If we read from a write in the queue (i.e. our read overlaps the
+         * write request), our next write probably depends on this write, so
+         * let's move forward to its section.
+         */
+        if (end > req->offset && offset < req_end) {
+            context->section = MAX(context->section, req->section);
+        }
+
+        /* How we continue, depends on the kind of overlap we have */
+        if ((offset >= req->offset) && (end <= req_end)) {
+            /* Completely contained in the write request */
+            memcpy(buf, &req_buf[offset - req->offset], size);
+            return 0;
+        } else if ((end >= req->offset) && (end <= req_end)) {
+            /* Overlap in the end of the read request */
+            assert(offset < req->offset);
+            memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
+            size = req->offset - offset;
+        } else if ((offset >= req->offset) && (offset < req_end)) {
+            /* Overlap in the start of the read request */
+            assert(end > req_end);
+            memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
+            buf = read_buf = &read_buf[req_end - offset];
+            offset = req_end;
+            size = end - req_end;
+        } else if ((req->offset >= offset) && (req_end <= end)) {
+            /*
+             * The write request is completely contained in the read request.
+             * memcpy the data from the write request here, continue with the
+             * data before the write request and handle the data after the
+             * write request with a recursive call.
+             */
+            memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
+            size = req->offset - offset;
+            blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
+        }
+    }
+
+    /* The requested data is not written in the queue, read it from disk */
+    ret = bdrv_pread(bq->bs, offset, buf, size);
+    if (ret < 0) {
+        return ret;
+    }
+
+    return 0;
+}
+
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *section_req;
+
+    /* Create request structure */
+    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
+    req->type       = REQ_TYPE_WRITE;
+    req->offset     = offset;
+    req->size       = size;
+    req->buf        = qemu_malloc(size);
+    req->section    = context->section;
+    memcpy(req->buf, buf, size);
+
+    /*
+     * Find the right place to insert it into the queue:
+     * Right before the barrier that closes the current section.
+     */
+    qemu_mutex_lock(&bq->lock);
+    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
+        if (section_req->section >= req->section) {
+            req->section = section_req->section;
+            context->section = section_req->section;
+            QTAILQ_INSERT_BEFORE(section_req, req, link);
+            bq->queue_size++;
+            goto out;
+        }
+    }
+
+    /* If there was no barrier, just put it at the end. */
+    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
+    bq->queue_size++;
+    qemu_cond_signal(&bq->cond);
+
+out:
+    qemu_mutex_unlock(&bq->lock);
+    return 0;
+}
+
+int blkqueue_barrier(BlockQueueContext *context)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *section_req;
+
+    bq->barriers_requested++;
+
+    /* Create request structure */
+    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
+    req->type       = REQ_TYPE_BARRIER;
+    req->section    = context->section;
+    req->buf        = NULL;
+
+    /* Find another barrier to merge with. */
+    qemu_mutex_lock(&bq->lock);
+    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
+        if (section_req->section >= req->section) {
+            req->section = section_req->section;
+            context->section = section_req->section + 1;
+            qemu_free(req);
+            goto out;
+        }
+    }
+
+    /*
+     * If there wasn't a barrier for the same section yet, insert a new one at
+     * the end.
+     */
+    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
+    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
+    bq->queue_size++;
+    context->section++;
+    qemu_cond_signal(&bq->cond);
+
+    bq->barriers_submitted++;
+
+out:
+    qemu_mutex_unlock(&bq->lock);
+    return 0;
+}
+
+/*
+ * Caller needs to hold the bq->lock mutex
+ */
+static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+
+    req = QTAILQ_FIRST(&bq->queue);
+    if (req == NULL) {
+        goto out;
+    }
+
+    QTAILQ_REMOVE(&bq->queue, req, link);
+    bq->queue_size--;
+
+    if (req->type == REQ_TYPE_BARRIER) {
+        assert(QSIMPLEQ_FIRST(&bq->sections) == req);
+        QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
+    }
+
+out:
+    return req;
+}
+
+static void blkqueue_free_request(BlockQueueRequest *req)
+{
+    qemu_free(req->buf);
+    qemu_free(req);
+}
+
+static void blkqueue_process_request(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+    BlockQueueRequest *req2;
+    int ret;
+
+    /*
+     * Note that we leave the request in the queue while we process it. No
+     * other request will be queued before this one and we have only one thread
+     * that processes the queue, so afterwards it will still be the first
+     * request. (Not true for barriers in the first position, but we can handle
+     * that)
+     */
+    req = QTAILQ_FIRST(&bq->queue);
+    if (req == NULL) {
+        return;
+    }
+
+    switch (req->type) {
+        case REQ_TYPE_WRITE:
+            ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
+            if (ret < 0) {
+                /* TODO Error reporting! */
+                return;
+            }
+            break;
+        case REQ_TYPE_BARRIER:
+            bdrv_flush(bq->bs);
+            break;
+    }
+
+    /*
+     * Only remove the request from the queue when it's written, so that reads
+     * always access the right data.
+     */
+    qemu_mutex_lock(&bq->lock);
+    req2 = QTAILQ_FIRST(&bq->queue);
+    if (req == req2) {
+        blkqueue_pop(bq);
+        blkqueue_free_request(req);
+    } else {
+        /*
+         * If it's a barrier and something has been queued before it, just
+         * leave it in the queue and flush once again later.
+         */
+        assert(req->type == REQ_TYPE_BARRIER);
+        bq->barriers_submitted++;
+    }
+    qemu_mutex_unlock(&bq->lock);
+}
+
+struct blkqueue_flush_aiocb {
+    BlockQueue *bq;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+};
+
+static void *blkqueue_aio_flush_thread(void *opaque)
+{
+    struct blkqueue_flush_aiocb *acb = opaque;
+
+    /* Process any left over requests */
+    blkqueue_flush(acb->bq);
+
+    acb->cb(acb->opaque, 0);
+    qemu_free(acb);
+
+    return NULL;
+}
+
+void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
+    void *opaque)
+{
+    struct blkqueue_flush_aiocb *acb;
+
+    acb = qemu_malloc(sizeof(*acb));
+    acb->bq = bq;
+    acb->cb = cb;
+    acb->opaque = opaque;
+
+    qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
+}
+
+void blkqueue_flush(BlockQueue *bq)
+{
+    qemu_mutex_lock(&bq->flush_lock);
+
+    /* Process any left over requests */
+    while (QTAILQ_FIRST(&bq->queue)) {
+        blkqueue_process_request(bq);
+    }
+
+    qemu_mutex_unlock(&bq->flush_lock);
+}
+
+static void *blkqueue_thread(void *_bq)
+{
+    BlockQueue *bq = _bq;
+#ifndef RUN_TESTS
+    BlockQueueRequest *req;
+#endif
+
+    qemu_mutex_lock(&bq->flush_lock);
+    while (!bq->thread_done) {
+        barrier();
+#ifndef RUN_TESTS
+        req = QTAILQ_FIRST(&bq->queue);
+
+        /* Don't process barriers, we only do that on flushes */
+        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
+            blkqueue_process_request(bq);
+        } else {
+            qemu_cond_wait(&bq->cond, &bq->flush_lock);
+        }
+#else
+        qemu_cond_wait(&bq->cond, &bq->flush_lock);
+#endif
+    }
+    qemu_mutex_unlock(&bq->flush_lock);
+
+    return NULL;
+}
+
+#ifdef RUN_TESTS
+
+#define CHECK_WRITE(req, _offset, _size, _buf, _section) \
+    do { \
+        assert(req != NULL); \
+        assert(req->type == REQ_TYPE_WRITE); \
+        assert(req->offset == _offset); \
+        assert(req->size == _size); \
+        assert(req->section == _section); \
+        assert(!memcmp(req->buf, _buf, _size)); \
+    } while(0)
+
+#define CHECK_BARRIER(req, _section) \
+    do { \
+        assert(req != NULL); \
+        assert(req->type == REQ_TYPE_BARRIER); \
+        assert(req->section == _section); \
+    } while(0)
+
+#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
+    do { \
+        int ret; \
+        memset(buf, 0, 512); \
+        ret = blkqueue_pread(_context, _offset, _buf, _size); \
+        assert(ret == 0); \
+        assert(!memcmp(_cmpbuf, _buf, _size)); \
+    } while(0)
+
+#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
+    do { \
+        int ret; \
+        memset(_buf, _pattern, _size); \
+        ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
+        assert(ret == 0); \
+    } while(0)
+#define QUEUE_BARRIER(_context) \
+    do { \
+        int ret; \
+        ret = blkqueue_barrier(_context); \
+        assert(ret == 0); \
+    } while(0)
+
+#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
+    do { \
+        BlockQueueRequest *req; \
+        memset(_buf, _pattern, _size); \
+        req = blkqueue_pop(_bq); \
+        CHECK_WRITE(req, _offset, _size, _buf, _section); \
+        blkqueue_free_request(req); \
+    } while(0)
+#define POP_CHECK_BARRIER(_bq, _section) \
+    do { \
+        BlockQueueRequest *req; \
+        req = blkqueue_pop(_bq); \
+        CHECK_BARRIER(req, _section); \
+        blkqueue_free_request(req); \
+    } while(0)
+
+static void  __attribute__((used)) dump_queue(BlockQueue *bq)
+{
+    BlockQueueRequest *req;
+
+    fprintf(stderr, "--- Queue dump ---\n");
+    QTAILQ_FOREACH(req, &bq->queue, link) {
+        fprintf(stderr, "[%d] ", req->section);
+        if (req->type == REQ_TYPE_WRITE) {
+            fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
+                req->offset, req->size, req->buf);
+        } else if (req->type == REQ_TYPE_BARRIER) {
+            fprintf(stderr, "Barrier\n");
+        } else {
+            fprintf(stderr, "Unknown type %d\n", req->type);
+        }
+    }
+}
+
+static void test_basic(BlockDriverState *bs)
+{
+    uint8_t buf[512];
+    BlockQueue *bq;
+    BlockQueueContext context;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&context, bq);
+
+    /* Queue requests */
+    QUEUE_WRITE(&context,   0, buf, 512, 0x12);
+    QUEUE_WRITE(&context, 512, buf,  42, 0x34);
+    QUEUE_BARRIER(&context);
+    QUEUE_WRITE(&context, 678, buf,  42, 0x56);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,   678, buf,  42, 0x56, 1);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_merge(BlockDriverState *bs)
+{
+    uint8_t buf[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1, ctx2;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests */
+    QUEUE_WRITE(&ctx1,    0, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
+    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2, 1512, buf,  42, 0x34);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
+    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 1);
+
+    /* Same queue, new contexts */
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests */
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
+    QUEUE_WRITE(&ctx2,   12, buf,  20, 0x45);
+    QUEUE_BARRIER(&ctx2);
+    QUEUE_WRITE(&ctx2,  892, buf, 142, 0x56);
+
+    QUEUE_WRITE(&ctx1,    0, buf,   8, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 1512, buf,  42, 0x34);
+    QUEUE_BARRIER(&ctx1);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     0, buf,   8, 0x12, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 1);
+    POP_CHECK_WRITE(bq,    12, buf,  20, 0x45, 1);
+    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
+    POP_CHECK_BARRIER(bq, 1);
+    POP_CHECK_WRITE(bq,   892, buf, 142, 0x56, 2);
+    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 2);
+    POP_CHECK_BARRIER(bq, 2);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_read(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+
+    /* Queue requests and do some test reads */
+    memset(buf2, 0xa5, 512);
+    CHECK_READ(&ctx1, 0, buf, 32, buf2);
+
+    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+    memset(buf2, 0x12, 5);
+    CHECK_READ(&ctx1,  5, buf, 5, buf2);
+    CHECK_READ(&ctx1,  7, buf, 2, buf2);
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 5, 0x12, 5);
+    CHECK_READ(&ctx1,  0, buf, 8, buf2);
+    CHECK_READ(&ctx1,  0, buf, 10, buf2);
+    CHECK_READ(&ctx1,  0, buf, 32, buf2);
+    memset(buf2, 0xa5, 512);
+    memset(buf2, 0x12, 5);
+    CHECK_READ(&ctx1,  5, buf, 16, buf2);
+    memset(buf2, 0xa5, 512);
+    CHECK_READ(&ctx1,  0, buf,  2, buf2);
+    CHECK_READ(&ctx1, 10, buf, 16, buf2);
+
+    QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
+    memset(&buf2[5], 0x12, 5);
+    memset(buf2, 0x12, 2);
+    CHECK_READ(&ctx1,  0, buf, 32, buf2);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 0);
+    POP_CHECK_WRITE(bq,     0, buf,   2, 0x12, 0);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_read_order(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1, ctx2;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+    blkqueue_init_context(&ctx2, bq);
+
+    /* Queue requests and do some test reads */
+    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+    QUEUE_BARRIER(&ctx1);
+    QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
+
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 5, 0x12, 5);
+    memset(buf2 + 10, 0x34, 5);
+    CHECK_READ(&ctx2, 0, buf, 20, buf2);
+    QUEUE_WRITE(&ctx2,  0, buf, 10, 0x34);
+    QUEUE_BARRIER(&ctx2);
+
+    /* Verify queue contents */
+    POP_CHECK_WRITE(bq,    25, buf,   5, 0x44, 0);
+    POP_CHECK_WRITE(bq,    10, buf,   5, 0x34, 0);
+    POP_CHECK_BARRIER(bq, 0);
+    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 1);
+    POP_CHECK_WRITE(bq,     0, buf,  10, 0x34, 1);
+    POP_CHECK_BARRIER(bq, 1);
+
+    blkqueue_destroy(bq);
+}
+
+static void test_process_request(BlockDriverState *bs)
+{
+    uint8_t buf[512], buf2[512];
+    BlockQueue *bq;
+    BlockQueueContext ctx1;
+
+    bq = blkqueue_create(bs);
+    blkqueue_init_context(&ctx1, bq);
+
+    /* Queue requests and do a test read */
+    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+    QUEUE_BARRIER(&ctx1);
+
+    memset(buf2, 0xa5, 512);
+    memset(buf2 + 25, 0x44, 5);
+    CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+    /* Process the queue (plus one call to test a NULL condition) */
+    blkqueue_process_request(bq);
+    blkqueue_process_request(bq);
+    blkqueue_process_request(bq);
+
+    /* Verify the queue is empty */
+    assert(blkqueue_pop(bq) == NULL);
+
+    /* Check if we still read the same */
+    CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+    blkqueue_destroy(bq);
+}
+
+static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
+{
+    void* buf;
+    int ret;
+
+    buf = qemu_malloc(1024 * 1024);
+    memset(buf, 0xa5, 1024 * 1024);
+    ret = bdrv_write(bs, 0, buf, 2048);
+    assert(ret >= 0);
+    qemu_free(buf);
+
+    testfn(bs);
+}
+
+int main(void)
+{
+    BlockDriverState *bs;
+    int ret;
+
+    bdrv_init();
+    bs = bdrv_new("");
+    ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR, NULL);
+    if (ret < 0) {
+        fprintf(stderr, "Couldn't open block-queue.img: %s\n",
+            strerror(-ret));
+        exit(1);
+    }
+
+    run_test(&test_basic, bs);
+    run_test(&test_merge, bs);
+    run_test(&test_read, bs);
+    run_test(&test_read_order, bs);
+    run_test(&test_process_request, bs);
+
+    bdrv_delete(bs);
+
+    return 0;
+}
+#endif
diff --git a/block-queue.h b/block-queue.h
new file mode 100644
index 0000000..4ce0e1b
--- /dev/null
+++ b/block-queue.h
@@ -0,0 +1,49 @@ 
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef BLOCK_QUEUE_H
+#define BLOCK_QUEUE_H
+
+#include "qemu-common.h"
+
+typedef struct BlockQueue BlockQueue;
+
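+/*
+ * A BlockQueueContext tracks the ordering section its caller is in: a
+ * barrier ends the context's current section, and writes queued in the
+ * same section (possibly by other contexts) may be reordered freely with
+ * respect to each other, but never across a barrier.
+ */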
+typedef struct BlockQueueContext {
+    BlockQueue *bq;
+    unsigned    section;
+} BlockQueueContext;
+
+BlockQueue *blkqueue_create(BlockDriverState *bs);
+void blkqueue_init_context(BlockQueueContext *context, BlockQueue *bq);
+void blkqueue_destroy(BlockQueue *bq);
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size);
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size);
+int blkqueue_barrier(BlockQueueContext *context);
+void blkqueue_flush(BlockQueue *bq);
+void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
+    void *opaque);
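+
+/*
+ * Example usage (a sketch; error handling omitted, and the offsets and
+ * buffers are illustrative, not real call sites):
+ *
+ *     BlockQueue *bq = blkqueue_create(bs->file);
+ *     BlockQueueContext ctx;
+ *     blkqueue_init_context(&ctx, bq);
+ *
+ *     blkqueue_pwrite(&ctx, refblock_offset, buf, len);
+ *     blkqueue_barrier(&ctx);
+ *     blkqueue_pwrite(&ctx, l2_offset, buf2, len2);
+ *
+ *     blkqueue_flush(bq);      drain the queue, e.g. on a guest flush
+ *     blkqueue_destroy(bq);
+ */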
+
+#endif
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index f562b16..eeae173 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -64,7 +64,8 @@  int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
     BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
     for(i = 0; i < s->l1_size; i++)
         new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
-    ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
+    ret = blkqueue_pwrite(&s->bq_context, new_l1_table_offset, new_l1_table,
+        new_l1_size2);
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0)
         goto fail;
     for(i = 0; i < s->l1_size; i++)
@@ -74,7 +75,8 @@  int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
     BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
     cpu_to_be32w((uint32_t*)data, new_l1_size);
     cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
-    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
+    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, l1_size),
+        data, sizeof(data));
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         goto fail;
     }
@@ -177,8 +179,7 @@  static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
     *l2_table = s->l2_cache + (min_index << s->l2_bits);
 
     BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
-    ret = bdrv_pread(bs->file, l2_offset, *l2_table,
-        s->l2_size * sizeof(uint64_t));
+    ret = blkqueue_pread(&s->bq_context, l2_offset, *l2_table,
+        s->l2_size * sizeof(uint64_t));
     if (ret < 0) {
         return ret;
     }
@@ -207,8 +208,8 @@  static int write_l1_entry(BlockDriverState *bs, int l1_index)
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
-    ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
-        buf, sizeof(buf));
+    ret = blkqueue_pwrite(&s->bq_context,
+        s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         return ret;
     }
@@ -255,16 +256,15 @@  static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
     } else {
         /* if there was an old l2 table, read it from the disk */
         BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
-        ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
-            s->l2_size * sizeof(uint64_t));
+        ret = blkqueue_pread(&s->bq_context, old_l2_offset, l2_table,
+            s->l2_size * sizeof(uint64_t));
         if (ret < 0) {
             goto fail;
         }
     }
     /* write the l2 table to the file */
     BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
-    ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
-        s->l2_size * sizeof(uint64_t));
+    ret = blkqueue_pwrite(&s->bq_context, l2_offset, l2_table,
+        s->l2_size * sizeof(uint64_t));
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         goto fail;
     }
@@ -378,7 +378,7 @@  static int qcow_read(BlockDriverState *bs, int64_t sector_num,
             memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
         } else {
             BLKDBG_EVENT(bs->file, BLKDBG_READ);
-            ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
+            ret = blkqueue_pread(&s->bq_context,
+                cluster_offset + index_in_cluster * 512, buf, n * 512);
             if (ret != n * 512)
                 return -1;
             if (s->crypt_method) {
@@ -648,6 +648,7 @@  uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
 static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
     uint64_t l2_offset, int l2_index, int num)
 {
+    BDRVQcowState *s = bs->opaque;
     int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
     int start_offset = (8 * l2_index) & ~511;
     int end_offset = (8 * (l2_index + num) + 511) & ~511;
@@ -655,8 +656,7 @@  static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
     int ret;
 
     BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
-    ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
-        &l2_table[l2_start_index], len);
+    ret = blkqueue_pwrite(&s->bq_context, l2_offset + start_offset,
+        &l2_table[l2_start_index], len);
     if (ret < 0) {
         return ret;
     }
@@ -723,7 +723,7 @@  int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
      * Also flush bs->file to get the right order for L2 and refcount update.
      */
     if (j != 0) {
-        bdrv_flush(bs->file);
+        blkqueue_barrier(&s->bq_context);
         for (i = 0; i < j; i++) {
             qcow2_free_any_clusters(bs,
                 be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
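
All conversions in this file and in qcow2-refcount.c below follow the same
pattern: a synchronous metadata write (bdrv_pwrite_sync) becomes a queued
write plus a barrier in the request's context, leaving the actual sync to
the queue. Schematically (offset/buf/len stand for the arguments of the
respective call site):

    /* before: every call forces an immediate flush */
    ret = bdrv_pwrite_sync(bs->file, offset, buf, len);

    /* after: the flush is deferred until the queue is processed */
    ret = blkqueue_pwrite(&s->bq_context, offset, buf, len);
    blkqueue_barrier(&s->bq_context);
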
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index 4c19e7e..0d21d1f 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -44,7 +44,7 @@  static int write_refcount_block(BlockDriverState *bs)
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
-    if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
+    if (blkqueue_pwrite(&s->bq_context, s->refcount_block_cache_offset,
             s->refcount_block_cache, size) < 0)
     {
         return -EIO;
@@ -66,8 +66,7 @@  int qcow2_refcount_init(BlockDriverState *bs)
     s->refcount_table = qemu_malloc(refcount_table_size2);
     if (s->refcount_table_size > 0) {
         BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
-        ret = bdrv_pread(bs->file, s->refcount_table_offset,
-                         s->refcount_table, refcount_table_size2);
+        ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
         if (ret != refcount_table_size2)
             goto fail;
         for(i = 0; i < s->refcount_table_size; i++)
@@ -100,8 +99,7 @@  static int load_refcount_block(BlockDriverState *bs,
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
-    ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
-                     s->cluster_size);
+    ret = blkqueue_pread(&s->bq_context, refcount_block_offset,
+        s->refcount_block_cache, s->cluster_size);
     if (ret < 0) {
         return ret;
     }
@@ -269,8 +267,8 @@  static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
 
     /* Now the new refcount block needs to be written to disk */
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
-    ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
-        s->cluster_size);
+    ret = blkqueue_pwrite(&s->bq_context, new_block, s->refcount_block_cache,
+        s->cluster_size);
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         goto fail_block;
     }
@@ -279,9 +277,8 @@  static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
     if (refcount_table_index < s->refcount_table_size) {
         uint64_t data64 = cpu_to_be64(new_block);
         BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
-        ret = bdrv_pwrite_sync(bs->file,
-            s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
-            &data64, sizeof(data64));
+        ret = blkqueue_pwrite(&s->bq_context,
+            s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
+            &data64, sizeof(data64));
+        blkqueue_barrier(&s->bq_context);
         if (ret < 0) {
             goto fail_block;
         }
@@ -359,8 +356,8 @@  static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
 
     /* Write refcount blocks to disk */
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
-    ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
-        blocks_clusters * s->cluster_size);
+    ret = blkqueue_pwrite(&s->bq_context, meta_offset, new_blocks,
+        blocks_clusters * s->cluster_size);
+    blkqueue_barrier(&s->bq_context);
     qemu_free(new_blocks);
     if (ret < 0) {
         goto fail_table;
@@ -372,8 +369,8 @@  static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
-    ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
-        table_size * sizeof(uint64_t));
+    ret = blkqueue_pwrite(&s->bq_context, table_offset, new_table,
+        table_size * sizeof(uint64_t));
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         goto fail_table;
     }
@@ -387,8 +384,8 @@  static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
     cpu_to_be64w((uint64_t*)data, table_offset);
     cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
-    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
-        data, sizeof(data));
+    ret = blkqueue_pwrite(&s->bq_context,
+        offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         goto fail_table;
     }
@@ -444,9 +441,8 @@  static int write_refcount_block_entries(BlockDriverState *bs,
     size = (last_index - first_index) << REFCOUNT_SHIFT;
 
     BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
-    ret = bdrv_pwrite_sync(bs->file,
-        refcount_block_offset + (first_index << REFCOUNT_SHIFT),
-        &s->refcount_block_cache[first_index], size);
+    ret = blkqueue_pwrite(&s->bq_context,
+        refcount_block_offset + (first_index << REFCOUNT_SHIFT),
+        &s->refcount_block_cache[first_index], size);
+    blkqueue_barrier(&s->bq_context);
     if (ret < 0) {
         return ret;
     }
@@ -763,8 +759,7 @@  int qcow2_update_snapshot_refcount(BlockDriverState *bs,
             l1_table = NULL;
         }
         l1_allocated = 1;
-        if (bdrv_pread(bs->file, l1_table_offset,
-                       l1_table, l1_size2) != l1_size2)
+        if (blkqueue_pread(&s->bq_context, l1_table_offset,
+                           l1_table, l1_size2) != l1_size2)
             goto fail;
         for(i = 0;i < l1_size; i++)
             be64_to_cpus(&l1_table[i]);
@@ -783,7 +778,7 @@  int qcow2_update_snapshot_refcount(BlockDriverState *bs,
             old_l2_offset = l2_offset;
             l2_offset &= ~QCOW_OFLAG_COPIED;
             l2_modified = 0;
-            if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+            if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size)
+                    != l2_size)
                 goto fail;
             for(j = 0; j < s->l2_size; j++) {
                 offset = be64_to_cpu(l2_table[j]);
@@ -943,7 +938,7 @@  static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
     l2_size = s->l2_size * sizeof(uint64_t);
     l2_table = qemu_malloc(l2_size);
 
-    if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+    if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
         goto fail;
 
     /* Do the actual checks */
@@ -1038,8 +1033,7 @@  static int check_refcounts_l1(BlockDriverState *bs,
         l1_table = NULL;
     } else {
         l1_table = qemu_malloc(l1_size2);
-        if (bdrv_pread(bs->file, l1_table_offset,
-                       l1_table, l1_size2) != l1_size2)
+        if (blkqueue_pread(&s->bq_context, l1_table_offset,
+                           l1_table, l1_size2) != l1_size2)
             goto fail;
         for(i = 0;i < l1_size; i++)
             be64_to_cpus(&l1_table[i]);
diff --git a/block/qcow2.c b/block/qcow2.c
index f2b1b1c..9b1cd78 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -237,6 +237,10 @@  static int qcow_open(BlockDriverState *bs, int flags)
     if (qcow2_read_snapshots(bs) < 0)
         goto fail;
 
+    /* Block queue */
+    s->bq = blkqueue_create(bs->file);
+    blkqueue_init_context(&s->bq_context, s->bq);
+
 #ifdef DEBUG_ALLOC
     qcow2_check_refcounts(bs);
 #endif
@@ -494,6 +498,7 @@  static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int is_write)
 {
+    BDRVQcowState *s = bs->opaque;
     QCowAIOCB *acb;
 
     acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
@@ -514,6 +519,10 @@  static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
     acb->cluster_offset = 0;
     acb->l2meta.nb_clusters = 0;
     QLIST_INIT(&acb->l2meta.dependent_requests);
+
+    /* TODO Push the context into l2meta */
+    blkqueue_init_context(&s->bq_context, s->bq);
+
     return acb;
 }
 
@@ -667,6 +676,7 @@  static void qcow_close(BlockDriverState *bs)
     qemu_free(s->cluster_cache);
     qemu_free(s->cluster_data);
     qcow2_refcount_close(bs);
+    blkqueue_destroy(s->bq);
 }
 
 /*
@@ -1123,12 +1133,16 @@  static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,
 
 static void qcow_flush(BlockDriverState *bs)
 {
+    BDRVQcowState *s = bs->opaque;
+    blkqueue_flush(s->bq);
     bdrv_flush(bs->file);
 }
 
 static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
          BlockDriverCompletionFunc *cb, void *opaque)
 {
+    BDRVQcowState *s = bs->opaque;
+    blkqueue_flush(s->bq);
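+    /* TODO blkqueue_flush() drains the queue synchronously even on this
+     * aio path; blkqueue_aio_flush() is declared in block-queue.h for the
+     * asynchronous case */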
     return bdrv_aio_flush(bs->file, cb, opaque);
 }
 
diff --git a/block/qcow2.h b/block/qcow2.h
index 3ff162e..361f1ba 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -26,6 +26,7 @@ 
 #define BLOCK_QCOW2_H
 
 #include "aes.h"
+#include "block-queue.h"
 
 //#define DEBUG_ALLOC
 //#define DEBUG_ALLOC2
@@ -108,6 +109,9 @@  typedef struct BDRVQcowState {
     int64_t free_cluster_index;
     int64_t free_byte_offset;
 
+    BlockQueue *bq;
+    BlockQueueContext bq_context;
+
     uint32_t crypt_method; /* current crypt method, 0 if no key yet */
     uint32_t crypt_method_header;
     AES_KEY aes_encrypt_key;
diff --git a/qemu-thread.c b/qemu-thread.c
index fbc78fe..abcb7a6 100644
--- a/qemu-thread.c
+++ b/qemu-thread.c
@@ -167,6 +167,19 @@  void qemu_thread_create(QemuThread *thread,
     pthread_sigmask(SIG_SETMASK, &oldset, NULL);
 }
 
+void *qemu_thread_join(QemuThread *thread)
+{
+    int err;
+    void *ret;
+
+    err = pthread_join(thread->thread, &ret);
+    if (err) {
+        error_exit(err, __func__);
+    }
+
+    return ret;
+}
+
 void qemu_thread_signal(QemuThread *thread, int sig)
 {
     int err;
diff --git a/qemu-thread.h b/qemu-thread.h
index 19bb30c..2b6f218 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -36,6 +36,7 @@  int qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex, uint64_t msecs);
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg);
+void *qemu_thread_join(QemuThread *thread);
 void qemu_thread_signal(QemuThread *thread, int sig);
 void qemu_thread_self(QemuThread *thread);
 int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
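
For reference, qemu_thread_join() wraps pthread_join() and returns the
joined thread's exit value. A minimal shutdown sketch (worker() and the
stop signalling are illustrative, not part of this patch):

    static void *worker(void *arg)
    {
        /* pop and execute queued requests until asked to stop */
        return NULL;
    }

    QemuThread thread;
    qemu_thread_create(&thread, worker, bq);
    /* ... tell the worker to stop ... */
    void *retval = qemu_thread_join(&thread);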