Patchwork [v5,2/4] block: add the block queue support

login
register
mail settings
Submitter Zhi Yong Wu
Date Aug. 9, 2011, 4:17 a.m.
Message ID <1312863472-6901-3-git-send-email-wuzhy@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/109124/
State New
Headers show

Comments

Zhi Yong Wu - Aug. 9, 2011, 4:17 a.m.
The patch introduce one block queue for QEMU block layer.

Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
---
 block/blk-queue.c |  141 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/blk-queue.h |   73 +++++++++++++++++++++++++++
 2 files changed, 214 insertions(+), 0 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h
Ram Pai - Aug. 9, 2011, 8:46 a.m.
On Tue, Aug 09, 2011 at 12:17:50PM +0800, Zhi Yong Wu wrote:
> The patch introduce one block queue for QEMU block layer.
> 
> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
> ---
>  block/blk-queue.c |  141 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/blk-queue.h |   73 +++++++++++++++++++++++++++
>  2 files changed, 214 insertions(+), 0 deletions(-)
>  create mode 100644 block/blk-queue.c
>  create mode 100644 block/blk-queue.h
> 
> diff --git a/block/blk-queue.c b/block/blk-queue.c
> new file mode 100644
> index 0000000..f36f3e2
> --- /dev/null
> +++ b/block/blk-queue.c
> @@ -0,0 +1,141 @@
> +/*
> + * QEMU System Emulator queue definition for block layer
> + *
> + * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include "block_int.h"
> +#include "qemu-queue.h"
> +#include "block/blk-queue.h"
> +
> +/* The APIs for block request queue on qemu block layer.
> + */
> +
> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
> +{
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool block_queue_pool = {
> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
> +    .cancel             = qemu_block_queue_cancel,
> +};
> +
> +static void qemu_block_queue_callback(void *opaque, int ret)
> +{
> +    BlockDriverAIOCB *acb = opaque;
> +
> +    qemu_aio_release(acb);
> +}
> +
> +BlockQueue *qemu_new_block_queue(void)
> +{
> +    BlockQueue *queue;
> +
> +    queue = qemu_mallocz(sizeof(BlockQueue));
> +
> +    QTAILQ_INIT(&queue->requests);
> +
> +    return queue;
> +}
> +
> +void qemu_del_block_queue(BlockQueue *queue)
> +{
> +    BlockIORequest *request, *next;
> +
> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +        qemu_free(request);
> +    }
> +
> +    qemu_free(queue);
> +}
> +
> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                        BlockDriverState *bs,
> +                        BlockRequestHandler *handler,
> +                        int64_t sector_num,
> +                        QEMUIOVector *qiov,
> +                        int nb_sectors,
> +                        BlockDriverCompletionFunc *cb,
> +                        void *opaque)
> +{
> +    BlockIORequest *request;
> +    BlockDriverAIOCB *acb;
> +
> +    request = qemu_malloc(sizeof(BlockIORequest));
> +    request->bs = bs;
> +    request->handler = handler;
> +    request->sector_num = sector_num;
> +    request->qiov = qiov;
> +    request->nb_sectors = nb_sectors;
> +    request->cb = cb;
> +    request->opaque = opaque;
> +
> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +
> +    acb = qemu_aio_get(&block_queue_pool, bs,
> +                       qemu_block_queue_callback, opaque);
> +
> +    request->acb = acb;
> +
> +    return acb;
> +}
> +
> +int qemu_block_queue_handler(BlockIORequest *request)
> +{
> +    int ret;
> +    BlockDriverAIOCB *res;
> +
> +    /* indicate this req is from block queue */
> +    request->bs->req_from_queue = true;
> +
> +    res = request->handler(request->bs, request->sector_num,
> +                           request->qiov, request->nb_sectors,
> +                           request->cb, request->opaque);


should'nt you return with a failure here if res == NULL?
Otherwise  qemu_block_queue_callback() gets called which 
will release the acb.


> +
> +    if (request->acb) {
> +        qemu_block_queue_callback(request->acb, 0);
> +    }
> +
> +    ret = (res == NULL) ? 0 : 1;
> +
> +    return ret;
> +}


RP
Zhiyong Wu - Aug. 9, 2011, 8:53 a.m.
On Tue, Aug 9, 2011 at 4:46 PM, Ram Pai <linuxram@us.ibm.com> wrote:
> On Tue, Aug 09, 2011 at 12:17:50PM +0800, Zhi Yong Wu wrote:
>> The patch introduce one block queue for QEMU block layer.
>>
>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>> ---
>>  block/blk-queue.c |  141 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block/blk-queue.h |   73 +++++++++++++++++++++++++++
>>  2 files changed, 214 insertions(+), 0 deletions(-)
>>  create mode 100644 block/blk-queue.c
>>  create mode 100644 block/blk-queue.h
>>
>> diff --git a/block/blk-queue.c b/block/blk-queue.c
>> new file mode 100644
>> index 0000000..f36f3e2
>> --- /dev/null
>> +++ b/block/blk-queue.c
>> @@ -0,0 +1,141 @@
>> +/*
>> + * QEMU System Emulator queue definition for block layer
>> + *
>> + * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>> + * of this software and associated documentation files (the "Software"), to deal
>> + * in the Software without restriction, including without limitation the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>> + * copies of the Software, and to permit persons to whom the Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#include "block_int.h"
>> +#include "qemu-queue.h"
>> +#include "block/blk-queue.h"
>> +
>> +/* The APIs for block request queue on qemu block layer.
>> + */
>> +
>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>> +{
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static AIOPool block_queue_pool = {
>> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
>> +    .cancel             = qemu_block_queue_cancel,
>> +};
>> +
>> +static void qemu_block_queue_callback(void *opaque, int ret)
>> +{
>> +    BlockDriverAIOCB *acb = opaque;
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +BlockQueue *qemu_new_block_queue(void)
>> +{
>> +    BlockQueue *queue;
>> +
>> +    queue = qemu_mallocz(sizeof(BlockQueue));
>> +
>> +    QTAILQ_INIT(&queue->requests);
>> +
>> +    return queue;
>> +}
>> +
>> +void qemu_del_block_queue(BlockQueue *queue)
>> +{
>> +    BlockIORequest *request, *next;
>> +
>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +        qemu_free(request);
>> +    }
>> +
>> +    qemu_free(queue);
>> +}
>> +
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +    acb = qemu_aio_get(&block_queue_pool, bs,
>> +                       qemu_block_queue_callback, opaque);
>> +
>> +    request->acb = acb;
>> +
>> +    return acb;
>> +}
>> +
>> +int qemu_block_queue_handler(BlockIORequest *request)
>> +{
>> +    int ret;
>> +    BlockDriverAIOCB *res;
>> +
>> +    /* indicate this req is from block queue */
>> +    request->bs->req_from_queue = true;
>> +
>> +    res = request->handler(request->bs, request->sector_num,
>> +                           request->qiov, request->nb_sectors,
>> +                           request->cb, request->opaque);
>
>
> should'nt you return with a failure here if res == NULL?
> Otherwise  qemu_block_queue_callback() gets called which
> will release the acb.
Thanks, Pai. When req handler fails, acb should not be released. A
condition determination should be done before callback function is
invoked. It should be like as below:
    res = request->handler(request->bs, request->sector_num,
                           request->qiov, request->nb_sectors,
                           request->cb, request->opaque);

    ret = (res == NULL) ? 0 : 1;

    if ( ret && request->acb) {
        qemu_block_queue_callback(request->acb, 0);
    }

>
>
>> +
>> +    if (request->acb) {
>> +        qemu_block_queue_callback(request->acb, 0);
>> +    }
>> +
>> +    ret = (res == NULL) ? 0 : 1;
>> +
>> +    return ret;
>> +}
>
>
> RP
>
>
Stefan Hajnoczi - Aug. 9, 2011, 12:49 p.m.
On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> +/* The APIs for block request queue on qemu block layer.
> + */
> +
> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
> +{
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool block_queue_pool = {
> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
> +    .cancel             = qemu_block_queue_cancel,
> +};

The lifecycle of block_queue_pool acbs should be like this:

When a request is queued we need a BlockQueue acb because we have no
real acb yet.  So we get an acb from block_queue_pool.

If the acb is cancelled before being dispatched we need to release the
acb and remove the request from the queue.  (This patch does not
remove the request from the queue on cancel.)

When the acb is dispatched we need to keep track of the real acb (e.g.
from qcow2).  The caller will only ever see our acb because there is
no way to give them the pointer to the new acb after dispatch.  That
means we need to keep the our acb alive for the entire lifetime of the
request.  (This patch currently releases our acb when the request is
dispatched.)

If the acb is cancelled after being dispatched we need to first cancel
the real acb and then release our acb.

When the acb is dispatched we need to pass qemu_block_queue_callback
as the cb function to handler.  Inside qemu_block_queue_callback we
invoke the request cb and then release our acb.  This is how our acb
should be freed.

> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                        BlockDriverState *bs,
> +                        BlockRequestHandler *handler,
> +                        int64_t sector_num,
> +                        QEMUIOVector *qiov,
> +                        int nb_sectors,
> +                        BlockDriverCompletionFunc *cb,
> +                        void *opaque)
> +{
> +    BlockIORequest *request;
> +    BlockDriverAIOCB *acb;
> +
> +    request = qemu_malloc(sizeof(BlockIORequest));
> +    request->bs = bs;
> +    request->handler = handler;
> +    request->sector_num = sector_num;
> +    request->qiov = qiov;
> +    request->nb_sectors = nb_sectors;
> +    request->cb = cb;
> +    request->opaque = opaque;
> +
> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);

It would be simpler to define BlockQueueAIOCB and using it as our acb
instead of managing an extra BlockIORequest structure.  That way you
don't need to worry about extra mallocs and frees.

> +
> +    acb = qemu_aio_get(&block_queue_pool, bs,
> +                       qemu_block_queue_callback, opaque);
> +
> +    request->acb = acb;
> +
> +    return acb;
> +}
> +
> +int qemu_block_queue_handler(BlockIORequest *request)

This function can be static.

Stefan
Zhiyong Wu - Aug. 10, 2011, 5:54 a.m.
On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +/* The APIs for block request queue on qemu block layer.
>> + */
>> +
>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>> +{
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static AIOPool block_queue_pool = {
>> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
>> +    .cancel             = qemu_block_queue_cancel,
>> +};
>
> The lifecycle of block_queue_pool acbs should be like this:
>
> When a request is queued we need a BlockQueue acb because we have no
> real acb yet.  So we get an acb from block_queue_pool.
>
> If the acb is cancelled before being dispatched we need to release the
> acb and remove the request from the queue.  (This patch does not
> remove the request from the queue on cancel.)
>
> When the acb is dispatched we need to keep track of the real acb (e.g.
> from qcow2).  The caller will only ever see our acb because there is
> no way to give them the pointer to the new acb after dispatch.  That
> means we need to keep the our acb alive for the entire lifetime of the
> request.  (This patch currently releases our acb when the request is
> dispatched.)
>
> If the acb is cancelled after being dispatched we need to first cancel
> the real acb and then release our acb.
>
> When the acb is dispatched we need to pass qemu_block_queue_callback
> as the cb function to handler.  Inside qemu_block_queue_callback we
> invoke the request cb and then release our acb.  This is how our acb
> should be freed.
To the point, very good.

>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>
> It would be simpler to define BlockQueueAIOCB and using it as our acb
> instead of managing an extra BlockIORequest structure.  That way you
> don't need to worry about extra mallocs and frees.
Sorry, i don't get what you mean. how to define it? Can you elaborate?
>
>> +
>> +    acb = qemu_aio_get(&block_queue_pool, bs,
>> +                       qemu_block_queue_callback, opaque);
>> +
>> +    request->acb = acb;
>> +
>> +    return acb;
>> +}
>> +
>> +int qemu_block_queue_handler(BlockIORequest *request)
>
> This function can be static.
Sure.
>
> Stefan
>
Stefan Hajnoczi - Aug. 10, 2011, 9:37 a.m.
On Wed, Aug 10, 2011 at 01:54:33PM +0800, Zhi Yong Wu wrote:
> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> > On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> >> +                        BlockDriverState *bs,
> >> +                        BlockRequestHandler *handler,
> >> +                        int64_t sector_num,
> >> +                        QEMUIOVector *qiov,
> >> +                        int nb_sectors,
> >> +                        BlockDriverCompletionFunc *cb,
> >> +                        void *opaque)
> >> +{
> >> +    BlockIORequest *request;
> >> +    BlockDriverAIOCB *acb;
> >> +
> >> +    request = qemu_malloc(sizeof(BlockIORequest));
> >> +    request->bs = bs;
> >> +    request->handler = handler;
> >> +    request->sector_num = sector_num;
> >> +    request->qiov = qiov;
> >> +    request->nb_sectors = nb_sectors;
> >> +    request->cb = cb;
> >> +    request->opaque = opaque;
> >> +
> >> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> >
> > It would be simpler to define BlockQueueAIOCB and using it as our acb
> > instead of managing an extra BlockIORequest structure.  That way you
> > don't need to worry about extra mallocs and frees.
> Sorry, i don't get what you mean. how to define it? Can you elaborate?

BlockDriverAIOCB is designed to be embedded inside a bigger struct.  For
example, QEDAIOCB is a larger struct that contains BlockDriverAIOCB as
its first field:

typedef struct QEDAIOCB {
    BlockDriverAIOCB common;
    ...
} QEDAIOCB;

And the QED AIOPool contains the size of QEDAIOCB so that qemu_aio_get() can
allocate the full QEDAIOCB struct:

static AIOPool qed_aio_pool = {
    .aiocb_size         = sizeof(QEDAIOCB),
    .cancel             = qed_aio_cancel,
};

This allows QED to store per-request state in QEDAIOCB for the lifetime of a
request:

QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);

acb->is_write = is_write;
acb->finished = NULL;
acb->qiov = qiov;
...

I suggest creating a BlockQueueAIOCB that contains the fields from
BlockIORequest (which is no longer needed as a separate struct):

typedef struct BlockQueueAIOCB {
    BlockDriverAIOCB common;
    BlockRequestHandler *handler;
    int64_t sector_num;
    QEMUIOVector *qiov;
    int nb_sectors;
    QTAILQ_ENTRY(BlockQueueAIOCB) entry; /* pending request queue */
} BlockQueueAIOCB;

Now you can drop the malloc and simply qemu_aio_get() a new
BlockQueueAIOCB.

Stefan
Zhiyong Wu - Aug. 11, 2011, 4:59 a.m.
On Wed, Aug 10, 2011 at 5:37 PM, Stefan Hajnoczi
<stefanha@linux.vnet.ibm.com> wrote:
> On Wed, Aug 10, 2011 at 01:54:33PM +0800, Zhi Yong Wu wrote:
>> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> > On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> >> +                        BlockDriverState *bs,
>> >> +                        BlockRequestHandler *handler,
>> >> +                        int64_t sector_num,
>> >> +                        QEMUIOVector *qiov,
>> >> +                        int nb_sectors,
>> >> +                        BlockDriverCompletionFunc *cb,
>> >> +                        void *opaque)
>> >> +{
>> >> +    BlockIORequest *request;
>> >> +    BlockDriverAIOCB *acb;
>> >> +
>> >> +    request = qemu_malloc(sizeof(BlockIORequest));
>> >> +    request->bs = bs;
>> >> +    request->handler = handler;
>> >> +    request->sector_num = sector_num;
>> >> +    request->qiov = qiov;
>> >> +    request->nb_sectors = nb_sectors;
>> >> +    request->cb = cb;
>> >> +    request->opaque = opaque;
>> >> +
>> >> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> >
>> > It would be simpler to define BlockQueueAIOCB and using it as our acb
>> > instead of managing an extra BlockIORequest structure.  That way you
>> > don't need to worry about extra mallocs and frees.
>> Sorry, i don't get what you mean. how to define it? Can you elaborate?
>
> BlockDriverAIOCB is designed to be embedded inside a bigger struct.  For
> example, QEDAIOCB is a larger struct that contains BlockDriverAIOCB as
> its first field:
>
> typedef struct QEDAIOCB {
>    BlockDriverAIOCB common;
>    ...
> } QEDAIOCB;
>
> And the QED AIOPool contains the size of QEDAIOCB so that qemu_aio_get() can
> allocate the full QEDAIOCB struct:
>
> static AIOPool qed_aio_pool = {
>    .aiocb_size         = sizeof(QEDAIOCB),
>    .cancel             = qed_aio_cancel,
> };
>
> This allows QED to store per-request state in QEDAIOCB for the lifetime of a
> request:
>
> QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
>
> acb->is_write = is_write;
> acb->finished = NULL;
> acb->qiov = qiov;
> ...
>
> I suggest creating a BlockQueueAIOCB that contains the fields from
> BlockIORequest (which is no longer needed as a separate struct):
>
> typedef struct BlockQueueAIOCB {
>    BlockDriverAIOCB common;
>    BlockRequestHandler *handler;
>    int64_t sector_num;
>    QEMUIOVector *qiov;
>    int nb_sectors;
>    QTAILQ_ENTRY(BlockQueueAIOCB) entry; /* pending request queue */
> } BlockQueueAIOCB;
>
> Now you can drop the malloc and simply qemu_aio_get() a new
> BlockQueueAIOCB.
Yesterday, i had actually done as this:). thanks.
>
> Stefan
>
Zhiyong Wu - Aug. 11, 2011, 5:36 a.m.
On Wed, Aug 10, 2011 at 5:37 PM, Stefan Hajnoczi
<stefanha@linux.vnet.ibm.com> wrote:
> On Wed, Aug 10, 2011 at 01:54:33PM +0800, Zhi Yong Wu wrote:
>> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> > On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> >> +                        BlockDriverState *bs,
>> >> +                        BlockRequestHandler *handler,
>> >> +                        int64_t sector_num,
>> >> +                        QEMUIOVector *qiov,
>> >> +                        int nb_sectors,
>> >> +                        BlockDriverCompletionFunc *cb,
>> >> +                        void *opaque)
>> >> +{
>> >> +    BlockIORequest *request;
>> >> +    BlockDriverAIOCB *acb;
>> >> +
>> >> +    request = qemu_malloc(sizeof(BlockIORequest));
>> >> +    request->bs = bs;
>> >> +    request->handler = handler;
>> >> +    request->sector_num = sector_num;
>> >> +    request->qiov = qiov;
>> >> +    request->nb_sectors = nb_sectors;
>> >> +    request->cb = cb;
>> >> +    request->opaque = opaque;
>> >> +
>> >> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> >
>> > It would be simpler to define BlockQueueAIOCB and using it as our acb
>> > instead of managing an extra BlockIORequest structure.  That way you
>> > don't need to worry about extra mallocs and frees.
>> Sorry, i don't get what you mean. how to define it? Can you elaborate?
>
> BlockDriverAIOCB is designed to be embedded inside a bigger struct.  For
> example, QEDAIOCB is a larger struct that contains BlockDriverAIOCB as
> its first field:
>
> typedef struct QEDAIOCB {
>    BlockDriverAIOCB common;
>    ...
> } QEDAIOCB;
>
> And the QED AIOPool contains the size of QEDAIOCB so that qemu_aio_get() can
> allocate the full QEDAIOCB struct:
>
> static AIOPool qed_aio_pool = {
>    .aiocb_size         = sizeof(QEDAIOCB),
>    .cancel             = qed_aio_cancel,
> };
>
> This allows QED to store per-request state in QEDAIOCB for the lifetime of a
> request:
>
> QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
>
> acb->is_write = is_write;
> acb->finished = NULL;
> acb->qiov = qiov;
> ...
>
> I suggest creating a BlockQueueAIOCB that contains the fields from
> BlockIORequest (which is no longer needed as a separate struct):
>
> typedef struct BlockQueueAIOCB {
>    BlockDriverAIOCB common;
>    BlockRequestHandler *handler;
>    int64_t sector_num;
>    QEMUIOVector *qiov;
>    int nb_sectors;
>    QTAILQ_ENTRY(BlockQueueAIOCB) entry; /* pending request queue */
> } BlockQueueAIOCB;
Its name is a bit confusing. One ACB is inserted into block queue.
>
> Now you can drop the malloc and simply qemu_aio_get() a new
> BlockQueueAIOCB.
>
> Stefan
>
Zhiyong Wu - Aug. 12, 2011, 4:40 a.m.
On Wed, Aug 10, 2011 at 5:37 PM, Stefan Hajnoczi
<stefanha@linux.vnet.ibm.com> wrote:
> On Wed, Aug 10, 2011 at 01:54:33PM +0800, Zhi Yong Wu wrote:
>> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> > On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> >> +                        BlockDriverState *bs,
>> >> +                        BlockRequestHandler *handler,
>> >> +                        int64_t sector_num,
>> >> +                        QEMUIOVector *qiov,
>> >> +                        int nb_sectors,
>> >> +                        BlockDriverCompletionFunc *cb,
>> >> +                        void *opaque)
>> >> +{
>> >> +    BlockIORequest *request;
>> >> +    BlockDriverAIOCB *acb;
>> >> +
>> >> +    request = qemu_malloc(sizeof(BlockIORequest));
>> >> +    request->bs = bs;
>> >> +    request->handler = handler;
>> >> +    request->sector_num = sector_num;
>> >> +    request->qiov = qiov;
>> >> +    request->nb_sectors = nb_sectors;
>> >> +    request->cb = cb;
>> >> +    request->opaque = opaque;
>> >> +
>> >> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> >
>> > It would be simpler to define BlockQueueAIOCB and using it as our acb
>> > instead of managing an extra BlockIORequest structure.  That way you
>> > don't need to worry about extra mallocs and frees.
>> Sorry, i don't get what you mean. how to define it? Can you elaborate?
>
> BlockDriverAIOCB is designed to be embedded inside a bigger struct.  For
> example, QEDAIOCB is a larger struct that contains BlockDriverAIOCB as
> its first field:
>
> typedef struct QEDAIOCB {
>    BlockDriverAIOCB common;
>    ...
> } QEDAIOCB;
>
> And the QED AIOPool contains the size of QEDAIOCB so that qemu_aio_get() can
> allocate the full QEDAIOCB struct:
>
> static AIOPool qed_aio_pool = {
>    .aiocb_size         = sizeof(QEDAIOCB),
>    .cancel             = qed_aio_cancel,
> };
>
> This allows QED to store per-request state in QEDAIOCB for the lifetime of a
> request:
>
> QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
>
> acb->is_write = is_write;
> acb->finished = NULL;
> acb->qiov = qiov;
> ...
>
> I suggest creating a BlockQueueAIOCB that contains the fields from
> BlockIORequest (which is no longer needed as a separate struct):
>
> typedef struct BlockQueueAIOCB {
>    BlockDriverAIOCB common;
>    BlockRequestHandler *handler;
>    int64_t sector_num;
>    QEMUIOVector *qiov;
>    int nb_sectors;
>    QTAILQ_ENTRY(BlockQueueAIOCB) entry; /* pending request queue */
> } BlockQueueAIOCB;
>
> Now you can drop the malloc and simply qemu_aio_get() a new
> BlockQueueAIOCB.
Yesterday, i tried this. To be honest, i do not like this solution.:).
It results in the code logic to be a bit messy, not clear.
A AIOCB struct is treated as a block request and enqueued into block
queue. It is a bit weird. So i would like to retain BlockIORequest and
define BlockQueueAIOCB pool. This struct is used to store some useful
info:
struct BlockQueueAIOCB {
    BlockDriverAIOCB common;
    bool dispatched;
    BlockDriverAIOCB *real_acb;
    BlockIORequest *request;
};

What do you think of this?

>
> Stefan
>
Stefan Hajnoczi - Aug. 12, 2011, 4:50 a.m.
On Fri, Aug 12, 2011 at 5:40 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
> On Wed, Aug 10, 2011 at 5:37 PM, Stefan Hajnoczi
> <stefanha@linux.vnet.ibm.com> wrote:
>> On Wed, Aug 10, 2011 at 01:54:33PM +0800, Zhi Yong Wu wrote:
>>> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>>> > On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>> >> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>> >> +                        BlockDriverState *bs,
>>> >> +                        BlockRequestHandler *handler,
>>> >> +                        int64_t sector_num,
>>> >> +                        QEMUIOVector *qiov,
>>> >> +                        int nb_sectors,
>>> >> +                        BlockDriverCompletionFunc *cb,
>>> >> +                        void *opaque)
>>> >> +{
>>> >> +    BlockIORequest *request;
>>> >> +    BlockDriverAIOCB *acb;
>>> >> +
>>> >> +    request = qemu_malloc(sizeof(BlockIORequest));
>>> >> +    request->bs = bs;
>>> >> +    request->handler = handler;
>>> >> +    request->sector_num = sector_num;
>>> >> +    request->qiov = qiov;
>>> >> +    request->nb_sectors = nb_sectors;
>>> >> +    request->cb = cb;
>>> >> +    request->opaque = opaque;
>>> >> +
>>> >> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>> >
>>> > It would be simpler to define BlockQueueAIOCB and using it as our acb
>>> > instead of managing an extra BlockIORequest structure.  That way you
>>> > don't need to worry about extra mallocs and frees.
>>> Sorry, i don't get what you mean. how to define it? Can you elaborate?
>>
>> BlockDriverAIOCB is designed to be embedded inside a bigger struct.  For
>> example, QEDAIOCB is a larger struct that contains BlockDriverAIOCB as
>> its first field:
>>
>> typedef struct QEDAIOCB {
>>    BlockDriverAIOCB common;
>>    ...
>> } QEDAIOCB;
>>
>> And the QED AIOPool contains the size of QEDAIOCB so that qemu_aio_get() can
>> allocate the full QEDAIOCB struct:
>>
>> static AIOPool qed_aio_pool = {
>>    .aiocb_size         = sizeof(QEDAIOCB),
>>    .cancel             = qed_aio_cancel,
>> };
>>
>> This allows QED to store per-request state in QEDAIOCB for the lifetime of a
>> request:
>>
>> QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
>>
>> acb->is_write = is_write;
>> acb->finished = NULL;
>> acb->qiov = qiov;
>> ...
>>
>> I suggest creating a BlockQueueAIOCB that contains the fields from
>> BlockIORequest (which is no longer needed as a separate struct):
>>
>> typedef struct BlockQueueAIOCB {
>>    BlockDriverAIOCB common;
>>    BlockRequestHandler *handler;
>>    int64_t sector_num;
>>    QEMUIOVector *qiov;
>>    int nb_sectors;
>>    QTAILQ_ENTRY(BlockQueueAIOCB) entry; /* pending request queue */
>> } BlockQueueAIOCB;
>>
>> Now you can drop the malloc and simply qemu_aio_get() a new
>> BlockQueueAIOCB.
> Yesterday, i tried this. To be honest, i do not like this solution.:).
> It results in the code logic to be a bit messy, not clear.
> A AIOCB struct is treated as a block request and enqueued into block
> queue. It is a bit weird. So i would like to retain BlockIORequest and
> define BlockQueueAIOCB pool. This struct is used to store some useful
> info:
> struct BlockQueueAIOCB {
>    BlockDriverAIOCB common;
>    bool dispatched;
>    BlockDriverAIOCB *real_acb;
>    BlockIORequest *request;
> };
>
> What do you think of this?

Try it out and let's see the patch :).

Stefan
Zhiyong Wu - Aug. 12, 2011, 8:10 a.m.
On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +/* The APIs for block request queue on qemu block layer.
>> + */
>> +
>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>> +{
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +static AIOPool block_queue_pool = {
>> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
>> +    .cancel             = qemu_block_queue_cancel,
>> +};
>
> The lifecycle of block_queue_pool acbs should be like this:
>
> When a request is queued we need a BlockQueue acb because we have no
> real acb yet.  So we get an acb from block_queue_pool.
>
> If the acb is cancelled before being dispatched we need to release the
> acb and remove the request from the queue.  (This patch does not
> remove the request from the queue on cancel.)
>
> When the acb is dispatched we need to keep track of the real acb (e.g.
> from qcow2).  The caller will only ever see our acb because there is
> no way to give them the pointer to the new acb after dispatch.  That
> means we need to keep the our acb alive for the entire lifetime of the
> request.  (This patch currently releases our acb when the request is
> dispatched.)
>
> If the acb is cancelled after being dispatched we need to first cancel
> the real acb and then release our acb.
>
> When the acb is dispatched we need to pass qemu_block_queue_callback
> as the cb function to handler.  Inside qemu_block_queue_callback we
When one block request have been reenqueued multiple times, it will be
difficult to store original cb function.

> invoke the request cb and then release our acb.  This is how our acb
> should be freed.
>
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +
>> +    request = qemu_malloc(sizeof(BlockIORequest));
>> +    request->bs = bs;
>> +    request->handler = handler;
>> +    request->sector_num = sector_num;
>> +    request->qiov = qiov;
>> +    request->nb_sectors = nb_sectors;
>> +    request->cb = cb;
>> +    request->opaque = opaque;
>> +
>> +    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>
> It would be simpler to define BlockQueueAIOCB and using it as our acb
> instead of managing an extra BlockIORequest structure.  That way you
> don't need to worry about extra mallocs and frees.
>
>> +
>> +    acb = qemu_aio_get(&block_queue_pool, bs,
>> +                       qemu_block_queue_callback, opaque);
>> +
>> +    request->acb = acb;
>> +
>> +    return acb;
>> +}
>> +
>> +int qemu_block_queue_handler(BlockIORequest *request)
>
> This function can be static.
>
> Stefan
>
Stefan Hajnoczi - Aug. 12, 2011, 8:42 a.m.
On Fri, Aug 12, 2011 at 9:10 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>> +/* The APIs for block request queue on qemu block layer.
>>> + */
>>> +
>>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>>> +{
>>> +    qemu_aio_release(acb);
>>> +}
>>> +
>>> +static AIOPool block_queue_pool = {
>>> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
>>> +    .cancel             = qemu_block_queue_cancel,
>>> +};
>>
>> The lifecycle of block_queue_pool acbs should be like this:
>>
>> When a request is queued we need a BlockQueue acb because we have no
>> real acb yet.  So we get an acb from block_queue_pool.
>>
>> If the acb is cancelled before being dispatched we need to release the
>> acb and remove the request from the queue.  (This patch does not
>> remove the request from the queue on cancel.)
>>
>> When the acb is dispatched we need to keep track of the real acb (e.g.
>> from qcow2).  The caller will only ever see our acb because there is
>> no way to give them the pointer to the new acb after dispatch.  That
>> means we need to keep the our acb alive for the entire lifetime of the
>> request.  (This patch currently releases our acb when the request is
>> dispatched.)
>>
>> If the acb is cancelled after being dispatched we need to first cancel
>> the real acb and then release our acb.
>>
>> When the acb is dispatched we need to pass qemu_block_queue_callback
>> as the cb function to handler.  Inside qemu_block_queue_callback we
> When one block request have been reenqueued multiple times, it will be
> difficult to store original cb function.

The challenge is that blkqueue creates an acb and returns it to the
user when a request is enqueued.  Because the user now has the acb we
must keep this acb around for the lifetime of the request.

The current code will create a new acb if the request is enqueued
again.  We should avoid this.

The code needs to be structured so that a queued acb can be dispatched
directly - without creating a new BlockQueueAIOCB.

Stefan
Zhiyong Wu - Aug. 12, 2011, 9:11 a.m.
On Fri, Aug 12, 2011 at 4:42 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Fri, Aug 12, 2011 at 9:10 AM, Zhi Yong Wu <zwu.kernel@gmail.com> wrote:
>> On Tue, Aug 9, 2011 at 8:49 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>>> On Tue, Aug 9, 2011 at 5:17 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>>> +/* The APIs for block request queue on qemu block layer.
>>>> + */
>>>> +
>>>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>>>> +{
>>>> +    qemu_aio_release(acb);
>>>> +}
>>>> +
>>>> +static AIOPool block_queue_pool = {
>>>> +    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
>>>> +    .cancel             = qemu_block_queue_cancel,
>>>> +};
>>>
>>> The lifecycle of block_queue_pool acbs should be like this:
>>>
>>> When a request is queued we need a BlockQueue acb because we have no
>>> real acb yet.  So we get an acb from block_queue_pool.
>>>
>>> If the acb is cancelled before being dispatched we need to release the
>>> acb and remove the request from the queue.  (This patch does not
>>> remove the request from the queue on cancel.)
>>>
>>> When the acb is dispatched we need to keep track of the real acb (e.g.
>>> from qcow2).  The caller will only ever see our acb because there is
>>> no way to give them the pointer to the new acb after dispatch.  That
>>> means we need to keep the our acb alive for the entire lifetime of the
>>> request.  (This patch currently releases our acb when the request is
>>> dispatched.)
>>>
>>> If the acb is cancelled after being dispatched we need to first cancel
>>> the real acb and then release our acb.
>>>
>>> When the acb is dispatched we need to pass qemu_block_queue_callback
>>> as the cb function to handler.  Inside qemu_block_queue_callback we
>> When one block request have been reenqueued multiple times, it will be
>> difficult to store original cb function.
>
> The challenge is that blkqueue creates an acb and returns it to the
> user when a request is enqueued.  Because the user now has the acb we
> must keep this acb around for the lifetime of the request.
>
> The current code will create a new acb if the request is enqueued
> again.  We should avoid this.
>
> The code needs to be structured so that a queued acb can be dispatched
> directly - without creating a new BlockQueueAIOCB.
Maybe we can workaround this issue by temp stashing our acb to one
field of block queue.
pls see my public git tree.:)

>
> Stefan
>

Patch

diff --git a/block/blk-queue.c b/block/blk-queue.c
new file mode 100644
index 0000000..f36f3e2
--- /dev/null
+++ b/block/blk-queue.c
@@ -0,0 +1,141 @@ 
+/*
+ * QEMU System Emulator queue definition for block layer
+ *
+ * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "block_int.h"
+#include "qemu-queue.h"
+#include "block/blk-queue.h"
+
+/* The APIs for block request queue on qemu block layer.
+ */
+
+static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
+{
+    qemu_aio_release(acb);
+}
+
+static AIOPool block_queue_pool = {
+    .aiocb_size         = sizeof(struct BlockDriverAIOCB),
+    .cancel             = qemu_block_queue_cancel,
+};
+
+static void qemu_block_queue_callback(void *opaque, int ret)
+{
+    BlockDriverAIOCB *acb = opaque;
+
+    qemu_aio_release(acb);
+}
+
+BlockQueue *qemu_new_block_queue(void)
+{
+    BlockQueue *queue;
+
+    queue = qemu_mallocz(sizeof(BlockQueue));
+
+    QTAILQ_INIT(&queue->requests);
+
+    return queue;
+}
+
+void qemu_del_block_queue(BlockQueue *queue)
+{
+    BlockIORequest *request, *next;
+
+    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        qemu_free(request);
+    }
+
+    qemu_free(queue);
+}
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque)
+{
+    BlockIORequest *request;
+    BlockDriverAIOCB *acb;
+
+    request = qemu_malloc(sizeof(BlockIORequest));
+    request->bs = bs;
+    request->handler = handler;
+    request->sector_num = sector_num;
+    request->qiov = qiov;
+    request->nb_sectors = nb_sectors;
+    request->cb = cb;
+    request->opaque = opaque;
+
+    QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+
+    acb = qemu_aio_get(&block_queue_pool, bs,
+                       qemu_block_queue_callback, opaque);
+
+    request->acb = acb;
+
+    return acb;
+}
+
+int qemu_block_queue_handler(BlockIORequest *request)
+{
+    int ret;
+    BlockDriverAIOCB *res;
+
+    /* indicate this req is from block queue */
+    request->bs->req_from_queue = true;
+
+    res = request->handler(request->bs, request->sector_num,
+                           request->qiov, request->nb_sectors,
+                           request->cb, request->opaque);
+
+    if (request->acb) {
+        qemu_block_queue_callback(request->acb, 0);
+    }
+
+    ret = (res == NULL) ? 0 : 1;
+
+    return ret;
+}
+
+void qemu_block_queue_flush(BlockQueue *queue)
+{
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        BlockIORequest *request = NULL;
+        int ret = 0;
+
+        request = QTAILQ_FIRST(&queue->requests);
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+
+        ret = qemu_block_queue_handler(request);
+        if (ret == 0) {
+            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
+            break;
+        }
+
+        qemu_free(request);
+    }
+}
diff --git a/block/blk-queue.h b/block/blk-queue.h
new file mode 100644
index 0000000..281b679
--- /dev/null
+++ b/block/blk-queue.h
@@ -0,0 +1,73 @@ 
+/*
+ * QEMU System Emulator queue declaration for block layer
+ *
+ * Copyright (c) 2011 Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_BLOCK_QUEUE_H
+#define QEMU_BLOCK_QUEUE_H
+
+#include "block.h"
+#include "qemu-queue.h"
+#include "qemu-common.h"
+
+typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
+                                int64_t sector_num, QEMUIOVector *qiov,
+                                int nb_sectors, BlockDriverCompletionFunc *cb,
+                                void *opaque);
+
+struct BlockIORequest {
+    QTAILQ_ENTRY(BlockIORequest) entry;
+    BlockDriverState *bs;
+    BlockRequestHandler *handler;
+    int64_t sector_num;
+    QEMUIOVector *qiov;
+    int nb_sectors;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+    BlockDriverAIOCB *acb;
+};
+
+typedef struct BlockIORequest BlockIORequest;
+
+struct BlockQueue {
+    QTAILQ_HEAD(requests, BlockIORequest) requests;
+};
+
+typedef struct BlockQueue BlockQueue;
+
+BlockQueue *qemu_new_block_queue(void);
+
+void qemu_del_block_queue(BlockQueue *queue);
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque);
+
+int qemu_block_queue_handler(BlockIORequest *request);
+
+void qemu_block_queue_flush(BlockQueue *queue);
+#endif /* QEMU_BLOCK_QUEUE_H */