diff mbox

[v6,2/4] block: add the block queue support

Message ID 1314877456-19521-3-git-send-email-wuzhy@linux.vnet.ibm.com
State New
Headers show

Commit Message

Zhi Yong Wu Sept. 1, 2011, 11:44 a.m. UTC
Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
---
 Makefile.objs     |    2 +-
 block/blk-queue.c |  226 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/blk-queue.h |   63 +++++++++++++++
 3 files changed, 290 insertions(+), 1 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h

Comments

Stefan Hajnoczi Sept. 1, 2011, 1:02 p.m. UTC | #1
On Thu, Sep 1, 2011 at 12:44 PM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> +struct BlockIORequest {
> +    QTAILQ_ENTRY(BlockIORequest) entry;
> +    BlockDriverState *bs;
> +    BlockRequestHandler *handler;
> +    int64_t sector_num;
> +    QEMUIOVector *qiov;
> +    int nb_sectors;
> +    BlockDriverCompletionFunc *cb;
> +    void *opaque;
> +    BlockDriverAIOCB *acb;
> +};
> +
> +struct BlockQueue {
> +    QTAILQ_HEAD(requests, BlockIORequest) requests;
> +    BlockIORequest *request;
> +    bool flushing;
> +};
> +
> +struct BlockQueueAIOCB {
> +    BlockDriverAIOCB common;
> +    BlockDriverCompletionFunc *real_cb;
> +    BlockDriverAIOCB *real_acb;
> +    void *opaque;
> +    BlockIORequest *request;
> +};

Now it's pretty easy to merge BlockIORequest and BlockQueueAIOCB into
one struct.  That way we don't need to duplicate fields or link
structures together:

typedef struct BlockQueueAIOCB {
    BlockDriverAIOCB common;
    QTAILQ_ENTRY(BlockQueueAIOCB) entry;          /* held requests */
    BlockRequestHandler *handler;                 /* dispatch function */
    BlockDriverAIOCB *real_acb;                   /* dispatched aiocb */

    /* Request parameters */
    int64_t sector_num;
    QEMUIOVector *qiov;
    int nb_sectors;
} BlockQueueAIOCB;

This struct is kept for the lifetime of a request (both during
queueing and after dispatch).

> +
> +static void qemu_block_queue_dequeue(BlockQueue *queue, BlockIORequest *request)
> +{
> +    BlockIORequest *req;
> +
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        req = QTAILQ_FIRST(&queue->requests);
> +        if (req == request) {
> +            QTAILQ_REMOVE(&queue->requests, req, entry);
> +            break;
> +        }
> +    }
> +}
> +
> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
> +{
> +    BlockQueueAIOCB *blkacb = container_of(acb, BlockQueueAIOCB, common);
> +    if (blkacb->real_acb) {
> +        bdrv_aio_cancel(blkacb->real_acb);
> +    } else {
> +        assert(blkacb->common.bs->block_queue);
> +        qemu_block_queue_dequeue(blkacb->common.bs->block_queue,
> +                                 blkacb->request);

Can we use QTAILQ_REMOVE() and eliminate qemu_block_queue_dequeue()?
If the aiocb exists and is not dispatched (real_acb != NULL) then it
must be queued.

> +    }
> +
> +    qemu_aio_release(blkacb);
> +}
> +
> +static AIOPool block_queue_pool = {
> +    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
> +    .cancel             = qemu_block_queue_cancel,
> +};
> +
> +static void qemu_block_queue_callback(void *opaque, int ret)
> +{
> +    BlockQueueAIOCB *acb = opaque;
> +
> +    if (acb->real_cb) {
> +        acb->real_cb(acb->opaque, ret);
> +    }
> +
> +    qemu_aio_release(acb);
> +}
> +
> +BlockQueue *qemu_new_block_queue(void)
> +{
> +    BlockQueue *queue;
> +
> +    queue = g_malloc0(sizeof(BlockQueue));
> +
> +    QTAILQ_INIT(&queue->requests);
> +
> +    queue->flushing = false;
> +    queue->request  = NULL;
> +
> +    return queue;
> +}
> +
> +void qemu_del_block_queue(BlockQueue *queue)
> +{
> +    BlockIORequest *request, *next;
> +
> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +        g_free(request);

What about the acbs?  There needs to be a strategy for cleanly
shutting down completely.  In fact we should probably use cancellation
here or assert that the queue is already empty.

> +    }
> +
> +    g_free(queue);
> +}
> +
> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                        BlockDriverState *bs,
> +                        BlockRequestHandler *handler,
> +                        int64_t sector_num,
> +                        QEMUIOVector *qiov,
> +                        int nb_sectors,
> +                        BlockDriverCompletionFunc *cb,
> +                        void *opaque)
> +{
> +    BlockIORequest *request;
> +    BlockDriverAIOCB *acb;
> +    BlockQueueAIOCB *blkacb;
> +
> +    if (queue->request) {
> +        request             = queue->request;
> +        assert(request->acb);
> +        acb                 = request->acb;
> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +        queue->request      = NULL;

I don't think we need this behavior.  There is already requeuing logic
in the flush function: if request->handler() fails then the request is
put back onto the queue and we stop flushing.

So all we need here is:

if (queue->flushing) {
    return NULL;
}

This results in the request->handler() failing (returning NULL) and
the flush function then requeues this request.

In other words, during flushing we do not allow any new requests to be enqueued.

> +    } else {
> +        request             = g_malloc0(sizeof(BlockIORequest));
> +        request->bs         = bs;
> +        request->handler    = handler;
> +        request->sector_num = sector_num;
> +        request->qiov       = qiov;
> +        request->nb_sectors = nb_sectors;
> +        request->cb         = qemu_block_queue_callback;
> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +
> +        acb = qemu_aio_get(&block_queue_pool, bs,
> +                           qemu_block_queue_callback, opaque);
> +        blkacb = container_of(acb, BlockQueueAIOCB, common);
> +        blkacb->real_cb     = cb;
> +        blkacb->real_acb    = NULL;
> +        blkacb->opaque      = opaque;
> +        blkacb->request     = request;
> +
> +        request->acb        = acb;
> +        request->opaque = (void *)blkacb;

Here's what this initialization code looks like when BlockIORequest
and BlockQueueAIOCB are merged:

acb = qemu_aio_get(&block_queue_pool, bs, cb, opaque);
acb->handler    = handler;
acb->sector_num = sector_num;
acb->qiov       = qiov;
acb->nb_sectors = nb_sectors;
acb->real_acb   = NULL;
QTAILQ_INSERT_TAIL(&queue->requests, acb, entry);

> +    }
> +
> +    return acb;
> +}
> +
> +static int qemu_block_queue_handler(BlockIORequest *request)
> +{
> +    int ret;
> +    BlockDriverAIOCB *res;
> +
> +    res = request->handler(request->bs, request->sector_num,
> +                           request->qiov, request->nb_sectors,
> +                           request->cb, request->opaque);

Please pass qemu_block_queue_callback and request->acb directly here
instead of using request->cb and request->opaque.  Those fields aren't
needed and just add indirection.

> +    if (res) {
> +        BlockQueueAIOCB *blkacb =
> +                container_of(request->acb, BlockQueueAIOCB, common);
> +        blkacb->real_acb = res;
> +    }
> +
> +    ret = (res == NULL) ? 0 : 1;
> +
> +    return ret;
> +}
> +
> +void qemu_block_queue_flush(BlockQueue *queue)
> +{
> +    queue->flushing = true;
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        BlockIORequest *request = NULL;
> +        int ret = 0;
> +
> +        request = QTAILQ_FIRST(&queue->requests);
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +
> +        queue->request = request;
> +        ret = qemu_block_queue_handler(request);
> +
> +        if (ret == 0) {
> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
> +            queue->request = NULL;
> +            break;
> +        }
> +
> +        if (queue->request) {
> +            g_free(request);
> +        }
> +
> +        queue->request = NULL;
> +    }
> +
> +    queue->flushing = false;
> +}
> +
> +bool qemu_block_queue_has_pending(BlockQueue *queue)
> +{
> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
> +}
> diff --git a/block/blk-queue.h b/block/blk-queue.h
> new file mode 100644
> index 0000000..84c2407
> --- /dev/null
> +++ b/block/blk-queue.h
> @@ -0,0 +1,63 @@
> +/*
> + * QEMU System Emulator queue declaration for block layer
> + *
> + * Copyright (c) IBM, Corp. 2011
> + *
> + * Authors:
> + *  Zhi Yong Wu  <zwu.kernel@gmail.com>
> + *  Stefan Hajnoczi <stefanha@gmail.com>

Please use linux.vnet.ibm.com addresses and not GMail.

> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#ifndef QEMU_BLOCK_QUEUE_H
> +#define QEMU_BLOCK_QUEUE_H
> +
> +#include "block.h"
> +#include "qemu-queue.h"
> +
> +typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
> +                                int64_t sector_num, QEMUIOVector *qiov,
> +                                int nb_sectors, BlockDriverCompletionFunc *cb,
> +                                void *opaque);
> +
> +typedef struct BlockIORequest BlockIORequest;

BlockIORequest does not need a forward declaration because only
blk-queue.c uses it.  It's a private type that the rest of QEMU should
not know about.

> +
> +typedef struct BlockQueue BlockQueue;
> +
> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;

This is also a private type, callers only know about BlockDriverAIOCB.
 Please move to blk-queue.c.

Stefan
Zhi Yong Wu Sept. 5, 2011, 8:34 a.m. UTC | #2
On Thu, Sep 01, 2011 at 02:02:41PM +0100, Stefan Hajnoczi wrote:
> 01 Sep 2011 06:02:41 -0700 (PDT)
>Received: by 10.220.200.77 with HTTP; Thu, 1 Sep 2011 06:02:41 -0700 (PDT)
>In-Reply-To: <1314877456-19521-3-git-send-email-wuzhy@linux.vnet.ibm.com>
>References: <1314877456-19521-1-git-send-email-wuzhy@linux.vnet.ibm.com>
> <1314877456-19521-3-git-send-email-wuzhy@linux.vnet.ibm.com>
>Date: Thu, 1 Sep 2011 14:02:41 +0100
>Message-ID: <CAJSP0QXxRTb74w1kkpwAVP2BM=9G7Q3k6_dcbevKHrec21mHww@mail.gmail.com>
>From: Stefan Hajnoczi <stefanha@gmail.com>
>To: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>Content-Type: text/plain; charset=ISO-8859-1
>Content-Transfer-Encoding: quoted-printable
>X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6 (newer, 2)
>X-Received-From: 209.85.220.173
>Cc: kwolf@redhat.com, aliguori@us.ibm.com, stefanha@linux.vnet.ibm.com,
> kvm@vger.kernel.org, mtosatti@redhat.com, qemu-devel@nongnu.org,
> pair@us.ibm.com, zwu.kernel@gmail.com, ryanh@us.ibm.com
>Subject: Re: [Qemu-devel] [PATCH v6 2/4] block: add the block queue support
>X-BeenThere: qemu-devel@nongnu.org
>X-Mailman-Version: 2.1.14
>Precedence: list
>List-Id: <qemu-devel.nongnu.org>
>List-Unsubscribe: <https://lists.nongnu.org/mailman/options/qemu-devel>,
> <mailto:qemu-devel-request@nongnu.org?subject=unsubscribe>
>List-Archive: </archive/html/qemu-devel>
>List-Post: <mailto:qemu-devel@nongnu.org>
>List-Help: <mailto:qemu-devel-request@nongnu.org?subject=help>
>List-Subscribe: <https://lists.nongnu.org/mailman/listinfo/qemu-devel>,
> <mailto:qemu-devel-request@nongnu.org?subject=subscribe>
>X-Mailman-Copy: yes
>Errors-To: qemu-devel-bounces+wuzhy=linux.vnet.ibm.com@nongnu.org
>Sender: qemu-devel-bounces+wuzhy=linux.vnet.ibm.com@nongnu.org
>X-Brightmail-Tracker: AAAAAA==
>X-Xagent-From: stefanha@gmail.com
>X-Xagent-To: wuzhy@linux.vnet.ibm.com
>X-Xagent-Gateway: vmsdvma.vnet.ibm.com (XAGENTU4 at VMSDVMA)
>
>On Thu, Sep 1, 2011 at 12:44 PM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +struct BlockIORequest {
>> +    QTAILQ_ENTRY(BlockIORequest) entry;
>> +    BlockDriverState *bs;
>> +    BlockRequestHandler *handler;
>> +    int64_t sector_num;
>> +    QEMUIOVector *qiov;
>> +    int nb_sectors;
>> +    BlockDriverCompletionFunc *cb;
>> +    void *opaque;
>> +    BlockDriverAIOCB *acb;
>> +};
>> +
>> +struct BlockQueue {
>> +    QTAILQ_HEAD(requests, BlockIORequest) requests;
>> +    BlockIORequest *request;
>> +    bool flushing;
>> +};
>> +
>> +struct BlockQueueAIOCB {
>> +    BlockDriverAIOCB common;
>> +    BlockDriverCompletionFunc *real_cb;
>> +    BlockDriverAIOCB *real_acb;
>> +    void *opaque;
>> +    BlockIORequest *request;
>> +};
>
>Now it's pretty easy to merge BlockIORequest and BlockQueueAIOCB into
>one struct.  That way we don't need to duplicate fields or link
>structures together:
Must we merge them? I think that it will cause the logic is not clear than current way. It is weird that some BlockQueueAIOCB are linked to block queue although it reduce the LOC to some extent.
Moreover, those block-queue functions need to be rewritten.

>
>typedef struct BlockQueueAIOCB {
>    BlockDriverAIOCB common;
>    QTAILQ_ENTRY(BlockQueueAIOCB) entry;          /* held requests */
>    BlockRequestHandler *handler;                 /* dispatch function */
>    BlockDriverAIOCB *real_acb;                   /* dispatched aiocb */
>
>    /* Request parameters */
>    int64_t sector_num;
>    QEMUIOVector *qiov;
>    int nb_sectors;
>} BlockQueueAIOCB;
>
>This struct is kept for the lifetime of a request (both during
>queueing and after dispatch).
ditto.
>
>> +
>> +static void qemu_block_queue_dequeue(BlockQueue *queue, BlockIORequest *request)
>> +{
>> +    BlockIORequest *req;
>> +
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        req = QTAILQ_FIRST(&queue->requests);
>> +        if (req == request) {
>> +            QTAILQ_REMOVE(&queue->requests, req, entry);
>> +            break;
>> +        }
>> +    }
>> +}
>> +
>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>> +{
>> +    BlockQueueAIOCB *blkacb = container_of(acb, BlockQueueAIOCB, common);
>> +    if (blkacb->real_acb) {
>> +        bdrv_aio_cancel(blkacb->real_acb);
>> +    } else {
>> +        assert(blkacb->common.bs->block_queue);
>> +        qemu_block_queue_dequeue(blkacb->common.bs->block_queue,
>> +                                 blkacb->request);
>
>Can we use QTAILQ_REMOVE() and eliminate qemu_block_queue_dequeue()?
why need to replace dequeue function with QTAILQ_REMOVE()?
>If the aiocb exists and is not dispatched (real_acb != NULL) then it
>must be queued.
Can you explain clearlier?
>
>> +    }
>> +
>> +    qemu_aio_release(blkacb);
>> +}
>> +
>> +static AIOPool block_queue_pool = {
>> +    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
>> +    .cancel             = qemu_block_queue_cancel,
>> +};
>> +
>> +static void qemu_block_queue_callback(void *opaque, int ret)
>> +{
>> +    BlockQueueAIOCB *acb = opaque;
>> +
>> +    if (acb->real_cb) {
>> +        acb->real_cb(acb->opaque, ret);
>> +    }
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +BlockQueue *qemu_new_block_queue(void)
>> +{
>> +    BlockQueue *queue;
>> +
>> +    queue = g_malloc0(sizeof(BlockQueue));
>> +
>> +    QTAILQ_INIT(&queue->requests);
>> +
>> +    queue->flushing = false;
>> +    queue->request  = NULL;
>> +
>> +    return queue;
>> +}
>> +
>> +void qemu_del_block_queue(BlockQueue *queue)
>> +{
>> +    BlockIORequest *request, *next;
>> +
>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +        g_free(request);
>
>What about the acbs?  There needs to be a strategy for cleanly
what strategy?
>shutting down completely.  In fact we should probably use cancellation
>here or assert that the queue is already empty.
You mean if the queue has been empty, we need to assert(queue)?
>
>> +    }
>> +
>> +    g_free(queue);
>> +}
>> +
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque)
>> +{
>> +    BlockIORequest *request;
>> +    BlockDriverAIOCB *acb;
>> +    BlockQueueAIOCB *blkacb;
>> +
>> +    if (queue->request) {
>> +        request             = queue->request;
>> +        assert(request->acb);
>> +        acb                 = request->acb;
>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +        queue->request      = NULL;
>
>I don't think we need this behavior.  There is already requeuing logic
>in the flush function: if request->handler() fails then the request is
>put back onto the queue and we stop flushing.
>
>So all we need here is:
>
>if (queue->flushing) {
>    return NULL;
>}
very nice, thanks.
>
>This results in the request->handler() failing (returning NULL) and
>the flush function then requeues this request.
>
>In other words, during flushing we do not allow any new requests to be enqueued.
>
>> +    } else {
>> +        request             = g_malloc0(sizeof(BlockIORequest));
>> +        request->bs         = bs;
>> +        request->handler    = handler;
>> +        request->sector_num = sector_num;
>> +        request->qiov       = qiov;
>> +        request->nb_sectors = nb_sectors;
>> +        request->cb         = qemu_block_queue_callback;
>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +
>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>> +                           qemu_block_queue_callback, opaque);
>> +        blkacb = container_of(acb, BlockQueueAIOCB, common);
>> +        blkacb->real_cb     = cb;
>> +        blkacb->real_acb    = NULL;
>> +        blkacb->opaque      = opaque;
>> +        blkacb->request     = request;
>> +
>> +        request->acb        = acb;
>> +        request->opaque = (void *)blkacb;
>
>Here's what this initialization code looks like when BlockIORequest
>and BlockQueueAIOCB are merged:
>
>acb = qemu_aio_get(&block_queue_pool, bs, cb, opaque);
>acb->handler    = handler;
>acb->sector_num = sector_num;
>acb->qiov       = qiov;
>acb->nb_sectors = nb_sectors;
>acb->real_acb   = NULL;
>QTAILQ_INSERT_TAIL(&queue->requests, acb, entry);
pls see comment #1.
>
>> +    }
>> +
>> +    return acb;
>> +}
>> +
>> +static int qemu_block_queue_handler(BlockIORequest *request)
>> +{
>> +    int ret;
>> +    BlockDriverAIOCB *res;
>> +
>> +    res = request->handler(request->bs, request->sector_num,
>> +                           request->qiov, request->nb_sectors,
>> +                           request->cb, request->opaque);
>
>Please pass qemu_block_queue_callback and request->acb directly here
>instead of using request->cb and request->opaque.  Those fields aren't
>needed and just add indirection.
If later the other guy want to reuse qemu_block_queue_handler, and use different callback function, then this function can not be used. Your way lower this function's reusability.
>
>> +    if (res) {
>> +        BlockQueueAIOCB *blkacb =
>> +                container_of(request->acb, BlockQueueAIOCB, common);
>> +        blkacb->real_acb = res;
>> +    }
>> +
>> +    ret = (res == NULL) ? 0 : 1;
>> +
>> +    return ret;
>> +}
>> +
>> +void qemu_block_queue_flush(BlockQueue *queue)
>> +{
>> +    queue->flushing = true;
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockIORequest *request = NULL;
>> +        int ret = 0;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        queue->request = request;
>> +        ret = qemu_block_queue_handler(request);
>> +
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            queue->request = NULL;
>> +            break;
>> +        }
>> +
>> +        if (queue->request) {
>> +            g_free(request);
>> +        }
>> +
>> +        queue->request = NULL;
>> +    }
>> +
>> +    queue->flushing = false;
>> +}
>> +
>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>> +{
>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>> +}
>> diff --git a/block/blk-queue.h b/block/blk-queue.h
>> new file mode 100644
>> index 0000000..84c2407
>> --- /dev/null
>> +++ b/block/blk-queue.h
>> @@ -0,0 +1,63 @@
>> +/*
>> + * QEMU System Emulator queue declaration for block layer
>> + *
>> + * Copyright (c) IBM, Corp. 2011
>> + *
>> + * Authors:
>> + *  Zhi Yong Wu  <zwu.kernel@gmail.com>
>> + *  Stefan Hajnoczi <stefanha@gmail.com>
>
>Please use linux.vnet.ibm.com addresses and not GMail.
>
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>> + * of this software and associated documentation files (the "Software"), to deal
>> + * in the Software without restriction, including without limitation the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>> + * copies of the Software, and to permit persons to whom the Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#ifndef QEMU_BLOCK_QUEUE_H
>> +#define QEMU_BLOCK_QUEUE_H
>> +
>> +#include "block.h"
>> +#include "qemu-queue.h"
>> +
>> +typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
>> +                                int64_t sector_num, QEMUIOVector *qiov,
>> +                                int nb_sectors, BlockDriverCompletionFunc *cb,
>> +                                void *opaque);
>> +
>> +typedef struct BlockIORequest BlockIORequest;
>
>BlockIORequest does not need a forward declaration because only
>blk-queue.c uses it.  It's a private type that the rest of QEMU should
>not know about.
OK.
>
>> +
>> +typedef struct BlockQueue BlockQueue;
>> +
>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>
>This is also a private type, callers only know about BlockDriverAIOCB.
> Please move to blk-queue.c.
ditto
>
>Stefan
>
Stefan Hajnoczi Sept. 7, 2011, 11:13 a.m. UTC | #3
On Mon, Sep 5, 2011 at 9:34 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> On Thu, Sep 01, 2011 at 02:02:41PM +0100, Stefan Hajnoczi wrote:
>> 01 Sep 2011 06:02:41 -0700 (PDT)
>>On Thu, Sep 1, 2011 at 12:44 PM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>>> +static void qemu_block_queue_dequeue(BlockQueue *queue, BlockIORequest *request)
>>> +{
>>> +    BlockIORequest *req;
>>> +
>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>> +        req = QTAILQ_FIRST(&queue->requests);
>>> +        if (req == request) {
>>> +            QTAILQ_REMOVE(&queue->requests, req, entry);
>>> +            break;
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>>> +{
>>> +    BlockQueueAIOCB *blkacb = container_of(acb, BlockQueueAIOCB, common);
>>> +    if (blkacb->real_acb) {
>>> +        bdrv_aio_cancel(blkacb->real_acb);
>>> +    } else {
>>> +        assert(blkacb->common.bs->block_queue);
>>> +        qemu_block_queue_dequeue(blkacb->common.bs->block_queue,
>>> +                                 blkacb->request);
>>
>>Can we use QTAILQ_REMOVE() and eliminate qemu_block_queue_dequeue()?
> why need to replace dequeue function with QTAILQ_REMOVE()?

Because the existing QTAILQ_REMOVE() macro already does what is needed.

>>> +void qemu_del_block_queue(BlockQueue *queue)
>>> +{
>>> +    BlockIORequest *request, *next;
>>> +
>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>> +        g_free(request);
>>
>>What about the acbs?  There needs to be a strategy for cleanly
> what strategy?
>>shutting down completely.  In fact we should probably use cancellation
>>here or assert that the queue is already empty.
> You mean if the queue has been empty, we need to assert(queue)?

This patch silently deletes queued requests, which leaks the
BlockQueueAIOCBs.  The queued requests will never be issued or
completed.

qemu_del_block_queue() must cleanly destroy the queue.  This function
could require the queue to be empty, then we do not need to worry
about queued requests.  The caller can do this by flushing it before
deleting the queue.

>>> +static int qemu_block_queue_handler(BlockIORequest *request)
>>> +{
>>> +    int ret;
>>> +    BlockDriverAIOCB *res;
>>> +
>>> +    res = request->handler(request->bs, request->sector_num,
>>> +                           request->qiov, request->nb_sectors,
>>> +                           request->cb, request->opaque);
>>
>>Please pass qemu_block_queue_callback and request->acb directly here
>>instead of using request->cb and request->opaque.  Those fields aren't
>>needed and just add indirection.
> If later the other guy want to reuse qemu_block_queue_handler, and use different callback function, then this function can not be used. Your way lower this function's reusability.

Code can be changed so it's best to do things the simple way first and
extend the code if necessary later.  Trying to think ahead results in
half-finished code where the reader has to guess the author's
intention.

Stefan
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index d1f3e5d..96a7323 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -33,7 +33,7 @@  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
-block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block/blk-queue.c b/block/blk-queue.c
new file mode 100644
index 0000000..eac824c
--- /dev/null
+++ b/block/blk-queue.c
@@ -0,0 +1,226 @@ 
+/*
+ * QEMU System Emulator queue definition for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *  Stefan Hajnoczi <stefanha@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "block_int.h"
+#include "block/blk-queue.h"
+#include "qemu-common.h"
+
+/* The APIs for block request queue on qemu block layer.
+ */
+
+struct BlockIORequest {
+    QTAILQ_ENTRY(BlockIORequest) entry;
+    BlockDriverState *bs;
+    BlockRequestHandler *handler;
+    int64_t sector_num;
+    QEMUIOVector *qiov;
+    int nb_sectors;
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+    BlockDriverAIOCB *acb;
+};
+
+struct BlockQueue {
+    QTAILQ_HEAD(requests, BlockIORequest) requests;
+    BlockIORequest *request;
+    bool flushing;
+};
+
+struct BlockQueueAIOCB {
+    BlockDriverAIOCB common;
+    BlockDriverCompletionFunc *real_cb;
+    BlockDriverAIOCB *real_acb;
+    void *opaque;
+    BlockIORequest *request;
+};
+
+static void qemu_block_queue_dequeue(BlockQueue *queue, BlockIORequest *request)
+{
+    BlockIORequest *req;
+
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        req = QTAILQ_FIRST(&queue->requests);
+        if (req == request) {
+            QTAILQ_REMOVE(&queue->requests, req, entry);
+            break;
+        }
+    }
+}
+
+static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
+{
+    BlockQueueAIOCB *blkacb = container_of(acb, BlockQueueAIOCB, common);
+    if (blkacb->real_acb) {
+        bdrv_aio_cancel(blkacb->real_acb);
+    } else {
+        assert(blkacb->common.bs->block_queue);
+        qemu_block_queue_dequeue(blkacb->common.bs->block_queue,
+                                 blkacb->request);
+    }
+
+    qemu_aio_release(blkacb);
+}
+
+static AIOPool block_queue_pool = {
+    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
+    .cancel             = qemu_block_queue_cancel,
+};
+
+static void qemu_block_queue_callback(void *opaque, int ret)
+{
+    BlockQueueAIOCB *acb = opaque;
+
+    if (acb->real_cb) {
+        acb->real_cb(acb->opaque, ret);
+    }
+
+    qemu_aio_release(acb);
+}
+
+BlockQueue *qemu_new_block_queue(void)
+{
+    BlockQueue *queue;
+
+    queue = g_malloc0(sizeof(BlockQueue));
+
+    QTAILQ_INIT(&queue->requests);
+
+    queue->flushing = false;
+    queue->request  = NULL;
+
+    return queue;
+}
+
+void qemu_del_block_queue(BlockQueue *queue)
+{
+    BlockIORequest *request, *next;
+
+    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        g_free(request);
+    }
+
+    g_free(queue);
+}
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque)
+{
+    BlockIORequest *request;
+    BlockDriverAIOCB *acb;
+    BlockQueueAIOCB *blkacb;
+
+    if (queue->request) {
+        request             = queue->request;
+        assert(request->acb);
+        acb                 = request->acb;
+        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+        queue->request      = NULL;
+    } else {
+        request             = g_malloc0(sizeof(BlockIORequest));
+        request->bs         = bs;
+        request->handler    = handler;
+        request->sector_num = sector_num;
+        request->qiov       = qiov;
+        request->nb_sectors = nb_sectors;
+        request->cb         = qemu_block_queue_callback;
+        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+
+        acb = qemu_aio_get(&block_queue_pool, bs,
+                           qemu_block_queue_callback, opaque);
+        blkacb = container_of(acb, BlockQueueAIOCB, common);
+        blkacb->real_cb     = cb;
+        blkacb->real_acb    = NULL;
+        blkacb->opaque      = opaque;
+        blkacb->request     = request;
+
+        request->acb        = acb;
+        request->opaque = (void *)blkacb;
+    }
+
+    return acb;
+}
+
+static int qemu_block_queue_handler(BlockIORequest *request)
+{
+    int ret;
+    BlockDriverAIOCB *res;
+
+    res = request->handler(request->bs, request->sector_num,
+                           request->qiov, request->nb_sectors,
+                           request->cb, request->opaque);
+    if (res) {
+        BlockQueueAIOCB *blkacb =
+                container_of(request->acb, BlockQueueAIOCB, common);
+        blkacb->real_acb = res;
+    }
+
+    ret = (res == NULL) ? 0 : 1;
+
+    return ret;
+}
+
+void qemu_block_queue_flush(BlockQueue *queue)
+{
+    queue->flushing = true;
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        BlockIORequest *request = NULL;
+        int ret = 0;
+
+        request = QTAILQ_FIRST(&queue->requests);
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+
+        queue->request = request;
+        ret = qemu_block_queue_handler(request);
+
+        if (ret == 0) {
+            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
+            queue->request = NULL;
+            break;
+        }
+
+        if (queue->request) {
+            g_free(request);
+        }
+
+        queue->request = NULL;
+    }
+
+    queue->flushing = false;
+}
+
+bool qemu_block_queue_has_pending(BlockQueue *queue)
+{
+    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
+}
diff --git a/block/blk-queue.h b/block/blk-queue.h
new file mode 100644
index 0000000..84c2407
--- /dev/null
+++ b/block/blk-queue.h
@@ -0,0 +1,63 @@ 
+/*
+ * QEMU System Emulator queue declaration for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <zwu.kernel@gmail.com>
+ *  Stefan Hajnoczi <stefanha@gmail.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_BLOCK_QUEUE_H
+#define QEMU_BLOCK_QUEUE_H
+
+#include "block.h"
+#include "qemu-queue.h"
+
+typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
+                                int64_t sector_num, QEMUIOVector *qiov,
+                                int nb_sectors, BlockDriverCompletionFunc *cb,
+                                void *opaque);
+
+typedef struct BlockIORequest BlockIORequest;
+
+typedef struct BlockQueue BlockQueue;
+
+typedef struct BlockQueueAIOCB BlockQueueAIOCB;
+
+BlockQueue *qemu_new_block_queue(void);
+
+void qemu_del_block_queue(BlockQueue *queue);
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque);
+
+void qemu_block_queue_flush(BlockQueue *queue);
+
+bool qemu_block_queue_has_pending(BlockQueue *queue);
+
+#endif /* QEMU_BLOCK_QUEUE_H */