Patchwork [v8,1/4] block: add the block queue support

login
register
mail settings
Submitter Zhi Yong Wu
Date Sept. 8, 2011, 10:11 a.m.
Message ID <1315476668-19812-2-git-send-email-wuzhy@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/113881/
State New
Headers show

Comments

Zhi Yong Wu - Sept. 8, 2011, 10:11 a.m.
Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
---
 Makefile.objs     |    2 +-
 block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/blk-queue.h |   59 ++++++++++++++++
 block_int.h       |   27 +++++++
 4 files changed, 288 insertions(+), 1 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h
Kevin Wolf - Sept. 23, 2011, 3:32 p.m.
Am 08.09.2011 12:11, schrieb Zhi Yong Wu:
> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
> ---
>  Makefile.objs     |    2 +-
>  block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/blk-queue.h |   59 ++++++++++++++++
>  block_int.h       |   27 +++++++
>  4 files changed, 288 insertions(+), 1 deletions(-)
>  create mode 100644 block/blk-queue.c
>  create mode 100644 block/blk-queue.h
> 
> diff --git a/Makefile.objs b/Makefile.objs
> index 26b885b..5dcf456 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -33,7 +33,7 @@ block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
>  block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-nested-y += qed-check.o
> -block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> diff --git a/block/blk-queue.c b/block/blk-queue.c
> new file mode 100644
> index 0000000..adef497
> --- /dev/null
> +++ b/block/blk-queue.c
> @@ -0,0 +1,201 @@
> +/*
> + * QEMU System Emulator queue definition for block layer
> + *
> + * Copyright (c) IBM, Corp. 2011
> + *
> + * Authors:
> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include "block_int.h"
> +#include "block/blk-queue.h"
> +#include "qemu-common.h"
> +
> +/* The APIs for block request queue on qemu block layer.
> + */
> +
> +struct BlockQueueAIOCB {
> +    BlockDriverAIOCB common;
> +    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
> +    BlockRequestHandler *handler;
> +    BlockDriverAIOCB *real_acb;
> +
> +    int64_t sector_num;
> +    QEMUIOVector *qiov;
> +    int nb_sectors;
> +};

The idea is that each request is first queued on the QTAILQ, and at some
point it's removed from the queue and gets a real_acb. But it never has
both at the same time. Correct?

Can we have the basic principle of operation spelled out as a comment
somewhere near the top of the file?

> +
> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
> +
> +struct BlockQueue {
> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
> +    bool req_failed;
> +    bool flushing;
> +};

I find req_failed pretty confusing. Needs documentation at least, but
most probably also a better name.

> +
> +static void qemu_block_queue_dequeue(BlockQueue *queue,
> +                                     BlockQueueAIOCB *request)
> +{
> +    BlockQueueAIOCB *req;
> +
> +    assert(queue);
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        req = QTAILQ_FIRST(&queue->requests);
> +        if (req == request) {
> +            QTAILQ_REMOVE(&queue->requests, req, entry);
> +            break;
> +        }
> +    }
> +}

Is it just me or is this an endless loop if the request isn't the first
element in the list?

> +
> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
> +{
> +    BlockQueueAIOCB *request = container_of(acb, BlockQueueAIOCB, common);
> +    if (request->real_acb) {
> +        bdrv_aio_cancel(request->real_acb);
> +    } else {
> +        assert(request->common.bs->block_queue);
> +        qemu_block_queue_dequeue(request->common.bs->block_queue,
> +                                 request);
> +    }
> +
> +    qemu_aio_release(request);
> +}
> +
> +static AIOPool block_queue_pool = {
> +    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
> +    .cancel             = qemu_block_queue_cancel,
> +};
> +
> +static void qemu_block_queue_callback(void *opaque, int ret)
> +{
> +    BlockQueueAIOCB *acb = opaque;
> +
> +    if (acb->common.cb) {
> +        acb->common.cb(acb->common.opaque, ret);
> +    }
> +
> +    qemu_aio_release(acb);
> +}
> +
> +BlockQueue *qemu_new_block_queue(void)
> +{
> +    BlockQueue *queue;
> +
> +    queue = g_malloc0(sizeof(BlockQueue));
> +
> +    QTAILQ_INIT(&queue->requests);
> +
> +    queue->req_failed = true;
> +    queue->flushing   = false;
> +
> +    return queue;
> +}
> +
> +void qemu_del_block_queue(BlockQueue *queue)
> +{
> +    BlockQueueAIOCB *request, *next;
> +
> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +        qemu_aio_release(request);
> +    }
> +
> +    g_free(queue);
> +}

Can we be sure that no AIO requests are in flight that still use the now
released AIOCB?

> +
> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                        BlockDriverState *bs,
> +                        BlockRequestHandler *handler,
> +                        int64_t sector_num,
> +                        QEMUIOVector *qiov,
> +                        int nb_sectors,
> +                        BlockDriverCompletionFunc *cb,
> +                        void *opaque)
> +{
> +    BlockDriverAIOCB *acb;
> +    BlockQueueAIOCB *request;
> +
> +    if (queue->flushing) {
> +        queue->req_failed = false;
> +        return NULL;
> +    } else {
> +        acb = qemu_aio_get(&block_queue_pool, bs,
> +                           cb, opaque);
> +        request = container_of(acb, BlockQueueAIOCB, common);
> +        request->handler       = handler;
> +        request->sector_num    = sector_num;
> +        request->qiov          = qiov;
> +        request->nb_sectors    = nb_sectors;
> +        request->real_acb      = NULL;
> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
> +    }
> +
> +    return acb;
> +}
> +
> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
> +{
> +    int ret;
> +    BlockDriverAIOCB *res;
> +
> +    res = request->handler(request->common.bs, request->sector_num,
> +                           request->qiov, request->nb_sectors,
> +                           qemu_block_queue_callback, request);
> +    if (res) {
> +        request->real_acb = res;
> +    }
> +
> +    ret = (res == NULL) ? 0 : 1;
> +
> +    return ret;

You mean return (res != NULL); and want to have bool as the return value
of this function.

> +}
> +
> +void qemu_block_queue_flush(BlockQueue *queue)
> +{
> +    queue->flushing = true;
> +    while (!QTAILQ_EMPTY(&queue->requests)) {
> +        BlockQueueAIOCB *request = NULL;
> +        int ret = 0;
> +
> +        request = QTAILQ_FIRST(&queue->requests);
> +        QTAILQ_REMOVE(&queue->requests, request, entry);
> +
> +        queue->req_failed = true;
> +        ret = qemu_block_queue_handler(request);
> +        if (ret == 0) {
> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
> +            if (queue->req_failed) {
> +                qemu_block_queue_callback(request, -EIO);
> +                break;
> +            }
> +        }
> +    }
> +
> +    queue->req_failed = true;
> +    queue->flushing   = false;
> +}
> +
> +bool qemu_block_queue_has_pending(BlockQueue *queue)
> +{
> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
> +}

Why doesn't the queue have pending requests in the middle of a flush
operation? (That is, the flush hasn't completed yet)

> diff --git a/block/blk-queue.h b/block/blk-queue.h
> new file mode 100644
> index 0000000..c1529f7
> --- /dev/null
> +++ b/block/blk-queue.h
> @@ -0,0 +1,59 @@
> +/*
> + * QEMU System Emulator queue declaration for block layer
> + *
> + * Copyright (c) IBM, Corp. 2011
> + *
> + * Authors:
> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#ifndef QEMU_BLOCK_QUEUE_H
> +#define QEMU_BLOCK_QUEUE_H
> +
> +#include "block.h"
> +#include "qemu-queue.h"
> +
> +typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
> +                                int64_t sector_num, QEMUIOVector *qiov,
> +                                int nb_sectors, BlockDriverCompletionFunc *cb,
> +                                void *opaque);
> +
> +typedef struct BlockQueue BlockQueue;
> +
> +BlockQueue *qemu_new_block_queue(void);
> +
> +void qemu_del_block_queue(BlockQueue *queue);
> +
> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
> +                        BlockDriverState *bs,
> +                        BlockRequestHandler *handler,
> +                        int64_t sector_num,
> +                        QEMUIOVector *qiov,
> +                        int nb_sectors,
> +                        BlockDriverCompletionFunc *cb,
> +                        void *opaque);
> +
> +void qemu_block_queue_flush(BlockQueue *queue);
> +
> +bool qemu_block_queue_has_pending(BlockQueue *queue);
> +
> +#endif /* QEMU_BLOCK_QUEUE_H */
> diff --git a/block_int.h b/block_int.h
> index 8a72b80..201e635 100644
> --- a/block_int.h
> +++ b/block_int.h
> @@ -29,10 +29,18 @@
>  #include "qemu-queue.h"
>  #include "qemu-coroutine.h"
>  #include "qemu-timer.h"
> +#include "block/blk-queue.h"
>  
>  #define BLOCK_FLAG_ENCRYPT	1
>  #define BLOCK_FLAG_COMPAT6	4
>  
> +#define BLOCK_IO_LIMIT_READ     0
> +#define BLOCK_IO_LIMIT_WRITE    1
> +#define BLOCK_IO_LIMIT_TOTAL    2
> +
> +#define BLOCK_IO_SLICE_TIME     100000000
> +#define NANOSECONDS_PER_SECOND  1000000000.0
> +
>  #define BLOCK_OPT_SIZE          "size"
>  #define BLOCK_OPT_ENCRYPT       "encryption"
>  #define BLOCK_OPT_COMPAT6       "compat6"
> @@ -49,6 +57,16 @@ typedef struct AIOPool {
>      BlockDriverAIOCB *free_aiocb;
>  } AIOPool;
>  
> +typedef struct BlockIOLimit {
> +    uint64_t bps[3];
> +    uint64_t iops[3];
> +} BlockIOLimit;
> +
> +typedef struct BlockIODisp {
> +    uint64_t bytes[2];
> +    uint64_t ios[2];
> +} BlockIODisp;
> +
>  struct BlockDriver {
>      const char *format_name;
>      int instance_size;
> @@ -184,6 +202,15 @@ struct BlockDriverState {
>  
>      void *sync_aiocb;
>  
> +    /* the time for latest disk I/O */
> +    int64_t slice_start;
> +    int64_t slice_end;
> +    BlockIOLimit io_limits;
> +    BlockIODisp  io_disps;
> +    BlockQueue   *block_queue;
> +    QEMUTimer    *block_timer;
> +    bool         io_limits_enabled;
> +
>      /* I/O stats (display with "info blockstats"). */
>      uint64_t nr_bytes[BDRV_MAX_IOTYPE];
>      uint64_t nr_ops[BDRV_MAX_IOTYPE];

The changes to block_int.h look unrelated to this patch. Maybe they
should come later in the series.

Kevin
Zhiyong Wu - Sept. 26, 2011, 8:01 a.m.
On Fri, Sep 23, 2011 at 11:32 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 08.09.2011 12:11, schrieb Zhi Yong Wu:
>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>> ---
>>  Makefile.objs     |    2 +-
>>  block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  block/blk-queue.h |   59 ++++++++++++++++
>>  block_int.h       |   27 +++++++
>>  4 files changed, 288 insertions(+), 1 deletions(-)
>>  create mode 100644 block/blk-queue.c
>>  create mode 100644 block/blk-queue.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index 26b885b..5dcf456 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -33,7 +33,7 @@ block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
>>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
>>  block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>>  block-nested-y += qed-check.o
>> -block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
>>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>>  block-nested-$(CONFIG_CURL) += curl.o
>> diff --git a/block/blk-queue.c b/block/blk-queue.c
>> new file mode 100644
>> index 0000000..adef497
>> --- /dev/null
>> +++ b/block/blk-queue.c
>> @@ -0,0 +1,201 @@
>> +/*
>> + * QEMU System Emulator queue definition for block layer
>> + *
>> + * Copyright (c) IBM, Corp. 2011
>> + *
>> + * Authors:
>> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
>> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>> + * of this software and associated documentation files (the "Software"), to deal
>> + * in the Software without restriction, including without limitation the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>> + * copies of the Software, and to permit persons to whom the Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#include "block_int.h"
>> +#include "block/blk-queue.h"
>> +#include "qemu-common.h"
>> +
>> +/* The APIs for block request queue on qemu block layer.
>> + */
>> +
>> +struct BlockQueueAIOCB {
>> +    BlockDriverAIOCB common;
>> +    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
>> +    BlockRequestHandler *handler;
>> +    BlockDriverAIOCB *real_acb;
>> +
>> +    int64_t sector_num;
>> +    QEMUIOVector *qiov;
>> +    int nb_sectors;
>> +};
>
> The idea is that each request is first queued on the QTAILQ, and at some
> point it's removed from the queue and gets a real_acb. But it never has
> both at the same time. Correct?
NO. if block I/O throttling is enabled and I/O rate at runtime exceed
this limits, this request will be enqueued.
It represents the whole lifecycle of one enqueued request.

>
> Can we have the basic principle of operation spelled out as a comment
> somewhere near the top of the file?
OK, i will.
>
>> +
>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>> +
>> +struct BlockQueue {
>> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
>> +    bool req_failed;
>> +    bool flushing;
>> +};
>
> I find req_failed pretty confusing. Needs documentation at least, but
> most probably also a better name.
OK. request_has_failed?
>
>> +
>> +static void qemu_block_queue_dequeue(BlockQueue *queue,
>> +                                     BlockQueueAIOCB *request)
>> +{
>> +    BlockQueueAIOCB *req;
>> +
>> +    assert(queue);
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        req = QTAILQ_FIRST(&queue->requests);
>> +        if (req == request) {
>> +            QTAILQ_REMOVE(&queue->requests, req, entry);
>> +            break;
>> +        }
>> +    }
>> +}
>
> Is it just me or is this an endless loop if the request isn't the first
> element in the list?
queue->requests is only used to store requests which exceed the limits.
Why is the request not the first evlement?
>
>> +
>> +static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
>> +{
>> +    BlockQueueAIOCB *request = container_of(acb, BlockQueueAIOCB, common);
>> +    if (request->real_acb) {
>> +        bdrv_aio_cancel(request->real_acb);
>> +    } else {
>> +        assert(request->common.bs->block_queue);
>> +        qemu_block_queue_dequeue(request->common.bs->block_queue,
>> +                                 request);
>> +    }
>> +
>> +    qemu_aio_release(request);
>> +}
>> +
>> +static AIOPool block_queue_pool = {
>> +    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
>> +    .cancel             = qemu_block_queue_cancel,
>> +};
>> +
>> +static void qemu_block_queue_callback(void *opaque, int ret)
>> +{
>> +    BlockQueueAIOCB *acb = opaque;
>> +
>> +    if (acb->common.cb) {
>> +        acb->common.cb(acb->common.opaque, ret);
>> +    }
>> +
>> +    qemu_aio_release(acb);
>> +}
>> +
>> +BlockQueue *qemu_new_block_queue(void)
>> +{
>> +    BlockQueue *queue;
>> +
>> +    queue = g_malloc0(sizeof(BlockQueue));
>> +
>> +    QTAILQ_INIT(&queue->requests);
>> +
>> +    queue->req_failed = true;
>> +    queue->flushing   = false;
>> +
>> +    return queue;
>> +}
>> +
>> +void qemu_del_block_queue(BlockQueue *queue)
>> +{
>> +    BlockQueueAIOCB *request, *next;
>> +
>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +        qemu_aio_release(request);
>> +    }
>> +
>> +    g_free(queue);
>> +}
>
> Can we be sure that no AIO requests are in flight that still use the now
> released AIOCB?
Yeah, since qemu core code is serially performed, i think that when
qemu_del_block_queue is performed, no requests are in flight. Right?

>
>> +
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque)
>> +{
>> +    BlockDriverAIOCB *acb;
>> +    BlockQueueAIOCB *request;
>> +
>> +    if (queue->flushing) {
>> +        queue->req_failed = false;
>> +        return NULL;
>> +    } else {
>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>> +                           cb, opaque);
>> +        request = container_of(acb, BlockQueueAIOCB, common);
>> +        request->handler       = handler;
>> +        request->sector_num    = sector_num;
>> +        request->qiov          = qiov;
>> +        request->nb_sectors    = nb_sectors;
>> +        request->real_acb      = NULL;
>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>> +    }
>> +
>> +    return acb;
>> +}
>> +
>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>> +{
>> +    int ret;
>> +    BlockDriverAIOCB *res;
>> +
>> +    res = request->handler(request->common.bs, request->sector_num,
>> +                           request->qiov, request->nb_sectors,
>> +                           qemu_block_queue_callback, request);
>> +    if (res) {
>> +        request->real_acb = res;
>> +    }
>> +
>> +    ret = (res == NULL) ? 0 : 1;
>> +
>> +    return ret;
>
> You mean return (res != NULL); and want to have bool as the return value
> of this function.
Yeah, thanks. i will modify as below:
ret = (res == NULL) ? false : true;
and
static bool qemu_block_queue_handler()

>
>> +}
>> +
>> +void qemu_block_queue_flush(BlockQueue *queue)
>> +{
>> +    queue->flushing = true;
>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>> +        BlockQueueAIOCB *request = NULL;
>> +        int ret = 0;
>> +
>> +        request = QTAILQ_FIRST(&queue->requests);
>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>> +
>> +        queue->req_failed = true;
>> +        ret = qemu_block_queue_handler(request);
>> +        if (ret == 0) {
>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>> +            if (queue->req_failed) {
>> +                qemu_block_queue_callback(request, -EIO);
>> +                break;
>> +            }
>> +        }
>> +    }
>> +
>> +    queue->req_failed = true;
>> +    queue->flushing   = false;
>> +}
>> +
>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>> +{
>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>> +}
>
> Why doesn't the queue have pending requests in the middle of a flush
> operation? (That is, the flush hasn't completed yet)
It is possible for the queue to have pending requests. if yes, how about?
>
>> diff --git a/block/blk-queue.h b/block/blk-queue.h
>> new file mode 100644
>> index 0000000..c1529f7
>> --- /dev/null
>> +++ b/block/blk-queue.h
>> @@ -0,0 +1,59 @@
>> +/*
>> + * QEMU System Emulator queue declaration for block layer
>> + *
>> + * Copyright (c) IBM, Corp. 2011
>> + *
>> + * Authors:
>> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
>> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>> + * of this software and associated documentation files (the "Software"), to deal
>> + * in the Software without restriction, including without limitation the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>> + * copies of the Software, and to permit persons to whom the Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#ifndef QEMU_BLOCK_QUEUE_H
>> +#define QEMU_BLOCK_QUEUE_H
>> +
>> +#include "block.h"
>> +#include "qemu-queue.h"
>> +
>> +typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
>> +                                int64_t sector_num, QEMUIOVector *qiov,
>> +                                int nb_sectors, BlockDriverCompletionFunc *cb,
>> +                                void *opaque);
>> +
>> +typedef struct BlockQueue BlockQueue;
>> +
>> +BlockQueue *qemu_new_block_queue(void);
>> +
>> +void qemu_del_block_queue(BlockQueue *queue);
>> +
>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>> +                        BlockDriverState *bs,
>> +                        BlockRequestHandler *handler,
>> +                        int64_t sector_num,
>> +                        QEMUIOVector *qiov,
>> +                        int nb_sectors,
>> +                        BlockDriverCompletionFunc *cb,
>> +                        void *opaque);
>> +
>> +void qemu_block_queue_flush(BlockQueue *queue);
>> +
>> +bool qemu_block_queue_has_pending(BlockQueue *queue);
>> +
>> +#endif /* QEMU_BLOCK_QUEUE_H */
>> diff --git a/block_int.h b/block_int.h
>> index 8a72b80..201e635 100644
>> --- a/block_int.h
>> +++ b/block_int.h
>> @@ -29,10 +29,18 @@
>>  #include "qemu-queue.h"
>>  #include "qemu-coroutine.h"
>>  #include "qemu-timer.h"
>> +#include "block/blk-queue.h"
>>
>>  #define BLOCK_FLAG_ENCRYPT   1
>>  #define BLOCK_FLAG_COMPAT6   4
>>
>> +#define BLOCK_IO_LIMIT_READ     0
>> +#define BLOCK_IO_LIMIT_WRITE    1
>> +#define BLOCK_IO_LIMIT_TOTAL    2
>> +
>> +#define BLOCK_IO_SLICE_TIME     100000000
>> +#define NANOSECONDS_PER_SECOND  1000000000.0
>> +
>>  #define BLOCK_OPT_SIZE          "size"
>>  #define BLOCK_OPT_ENCRYPT       "encryption"
>>  #define BLOCK_OPT_COMPAT6       "compat6"
>> @@ -49,6 +57,16 @@ typedef struct AIOPool {
>>      BlockDriverAIOCB *free_aiocb;
>>  } AIOPool;
>>
>> +typedef struct BlockIOLimit {
>> +    uint64_t bps[3];
>> +    uint64_t iops[3];
>> +} BlockIOLimit;
>> +
>> +typedef struct BlockIODisp {
>> +    uint64_t bytes[2];
>> +    uint64_t ios[2];
>> +} BlockIODisp;
>> +
>>  struct BlockDriver {
>>      const char *format_name;
>>      int instance_size;
>> @@ -184,6 +202,15 @@ struct BlockDriverState {
>>
>>      void *sync_aiocb;
>>
>> +    /* the time for latest disk I/O */
>> +    int64_t slice_start;
>> +    int64_t slice_end;
>> +    BlockIOLimit io_limits;
>> +    BlockIODisp  io_disps;
>> +    BlockQueue   *block_queue;
>> +    QEMUTimer    *block_timer;
>> +    bool         io_limits_enabled;
>> +
>>      /* I/O stats (display with "info blockstats"). */
>>      uint64_t nr_bytes[BDRV_MAX_IOTYPE];
>>      uint64_t nr_ops[BDRV_MAX_IOTYPE];
>
> The changes to block_int.h look unrelated to this patch. Maybe they
> should come later in the series.
OK, i move them to related series. thanks.

>
> Kevin
>
>
Kevin Wolf - Oct. 17, 2011, 10:17 a.m.
Am 26.09.2011 10:01, schrieb Zhi Yong Wu:
> On Fri, Sep 23, 2011 at 11:32 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> Am 08.09.2011 12:11, schrieb Zhi Yong Wu:
>>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>>> ---
>>>  Makefile.objs     |    2 +-
>>>  block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>  block/blk-queue.h |   59 ++++++++++++++++
>>>  block_int.h       |   27 +++++++
>>>  4 files changed, 288 insertions(+), 1 deletions(-)
>>>  create mode 100644 block/blk-queue.c
>>>  create mode 100644 block/blk-queue.h
>>>
>>> diff --git a/Makefile.objs b/Makefile.objs
>>> index 26b885b..5dcf456 100644
>>> --- a/Makefile.objs
>>> +++ b/Makefile.objs
>>> @@ -33,7 +33,7 @@ block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
>>>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
>>>  block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>>>  block-nested-y += qed-check.o
>>> -block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>>> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
>>>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>>>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>>>  block-nested-$(CONFIG_CURL) += curl.o
>>> diff --git a/block/blk-queue.c b/block/blk-queue.c
>>> new file mode 100644
>>> index 0000000..adef497
>>> --- /dev/null
>>> +++ b/block/blk-queue.c
>>> @@ -0,0 +1,201 @@
>>> +/*
>>> + * QEMU System Emulator queue definition for block layer
>>> + *
>>> + * Copyright (c) IBM, Corp. 2011
>>> + *
>>> + * Authors:
>>> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
>>> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>>> + *
>>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>>> + * of this software and associated documentation files (the "Software"), to deal
>>> + * in the Software without restriction, including without limitation the rights
>>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>>> + * copies of the Software, and to permit persons to whom the Software is
>>> + * furnished to do so, subject to the following conditions:
>>> + *
>>> + * The above copyright notice and this permission notice shall be included in
>>> + * all copies or substantial portions of the Software.
>>> + *
>>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>>> + * THE SOFTWARE.
>>> + */
>>> +
>>> +#include "block_int.h"
>>> +#include "block/blk-queue.h"
>>> +#include "qemu-common.h"
>>> +
>>> +/* The APIs for block request queue on qemu block layer.
>>> + */
>>> +
>>> +struct BlockQueueAIOCB {
>>> +    BlockDriverAIOCB common;
>>> +    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
>>> +    BlockRequestHandler *handler;
>>> +    BlockDriverAIOCB *real_acb;
>>> +
>>> +    int64_t sector_num;
>>> +    QEMUIOVector *qiov;
>>> +    int nb_sectors;
>>> +};
>>
>> The idea is that each request is first queued on the QTAILQ, and at some
>> point it's removed from the queue and gets a real_acb. But it never has
>> both at the same time. Correct?
> NO. if block I/O throttling is enabled and I/O rate at runtime exceed
> this limits, this request will be enqueued.
> It represents the whole lifecycle of one enqueued request.

What are the conditions under which the request will still be enqueued,
but has a real_acb at the same time?

>>> +
>>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>>> +
>>> +struct BlockQueue {
>>> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
>>> +    bool req_failed;
>>> +    bool flushing;
>>> +};
>>
>> I find req_failed pretty confusing. Needs documentation at least, but
>> most probably also a better name.
> OK. request_has_failed?

No, that doesn't describe what it's really doing.

You set req_failed = true by default and then on some obscure condition
clear it or not. It's tracking something, but I'm not sure what meaning
it has during the whole process.

>>> +
>>> +static void qemu_block_queue_dequeue(BlockQueue *queue,
>>> +                                     BlockQueueAIOCB *request)
>>> +{
>>> +    BlockQueueAIOCB *req;
>>> +
>>> +    assert(queue);
>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>> +        req = QTAILQ_FIRST(&queue->requests);
>>> +        if (req == request) {
>>> +            QTAILQ_REMOVE(&queue->requests, req, entry);
>>> +            break;
>>> +        }
>>> +    }
>>> +}
>>
>> Is it just me or is this an endless loop if the request isn't the first
>> element in the list?
> queue->requests is only used to store requests which exceed the limits.
> Why is the request not the first evlement?

Why do you have a loop if it's always the first element?

>>> +void qemu_del_block_queue(BlockQueue *queue)
>>> +{
>>> +    BlockQueueAIOCB *request, *next;
>>> +
>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>> +        qemu_aio_release(request);
>>> +    }
>>> +
>>> +    g_free(queue);
>>> +}
>>
>> Can we be sure that no AIO requests are in flight that still use the now
>> released AIOCB?
> Yeah, since qemu core code is serially performed, i think that when
> qemu_del_block_queue is performed, no requests are in flight. Right?

Patch 2 has this code:

+void bdrv_io_limits_disable(BlockDriverState *bs)
+{
+    bs->io_limits_enabled = false;
+
+    if (bs->block_queue) {
+        qemu_block_queue_flush(bs->block_queue);
+        qemu_del_block_queue(bs->block_queue);
+        bs->block_queue = NULL;
+    }

Does this mean that you can't disable I/O limits while the VM is running?

>>> +
>>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>> +                        BlockDriverState *bs,
>>> +                        BlockRequestHandler *handler,
>>> +                        int64_t sector_num,
>>> +                        QEMUIOVector *qiov,
>>> +                        int nb_sectors,
>>> +                        BlockDriverCompletionFunc *cb,
>>> +                        void *opaque)
>>> +{
>>> +    BlockDriverAIOCB *acb;
>>> +    BlockQueueAIOCB *request;
>>> +
>>> +    if (queue->flushing) {
>>> +        queue->req_failed = false;
>>> +        return NULL;
>>> +    } else {
>>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>>> +                           cb, opaque);
>>> +        request = container_of(acb, BlockQueueAIOCB, common);
>>> +        request->handler       = handler;
>>> +        request->sector_num    = sector_num;
>>> +        request->qiov          = qiov;
>>> +        request->nb_sectors    = nb_sectors;
>>> +        request->real_acb      = NULL;
>>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>> +    }
>>> +
>>> +    return acb;
>>> +}
>>> +
>>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>>> +{
>>> +    int ret;
>>> +    BlockDriverAIOCB *res;
>>> +
>>> +    res = request->handler(request->common.bs, request->sector_num,
>>> +                           request->qiov, request->nb_sectors,
>>> +                           qemu_block_queue_callback, request);
>>> +    if (res) {
>>> +        request->real_acb = res;
>>> +    }
>>> +
>>> +    ret = (res == NULL) ? 0 : 1;
>>> +
>>> +    return ret;
>>
>> You mean return (res != NULL); and want to have bool as the return value
>> of this function.
> Yeah, thanks. i will modify as below:
> ret = (res == NULL) ? false : true;

ret = (res != NULL) is really more readable.

> and
> static bool qemu_block_queue_handler()
> 
>>
>>> +}
>>> +
>>> +void qemu_block_queue_flush(BlockQueue *queue)
>>> +{
>>> +    queue->flushing = true;
>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>> +        BlockQueueAIOCB *request = NULL;
>>> +        int ret = 0;
>>> +
>>> +        request = QTAILQ_FIRST(&queue->requests);
>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>> +
>>> +        queue->req_failed = true;
>>> +        ret = qemu_block_queue_handler(request);
>>> +        if (ret == 0) {
>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>> +            if (queue->req_failed) {
>>> +                qemu_block_queue_callback(request, -EIO);
>>> +                break;
>>> +            }
>>> +        }
>>> +    }
>>> +
>>> +    queue->req_failed = true;
>>> +    queue->flushing   = false;
>>> +}
>>> +
>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>> +{
>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>> +}
>>
>> Why doesn't the queue have pending requests in the middle of a flush
>> operation? (That is, the flush hasn't completed yet)
> It is possible for the queue to have pending requests. if yes, how about?

Sorry, can't parse this.

I don't understand why the !queue->flushing part is correct.

Kevin
Zhiyong Wu - Oct. 18, 2011, 7 a.m.
On Mon, Oct 17, 2011 at 6:17 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 10/17/2011 12:17 PM, Kevin Wolf wrote:
>>
>> > > >  +
>> > > >  +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>> > > >  +{
>> > > >  +    int ret;
>> > > >  +    BlockDriverAIOCB *res;
>> > > >  +
>> > > >  +    res = request->handler(request->common.bs,
>> > > > request->sector_num,
>> > > >  +                           request->qiov, request->nb_sectors,
>> > > >  +                           qemu_block_queue_callback, request);
>> > > >  +    if (res) {
>> > > >  +        request->real_acb = res;
>> > > >  +    }
>> > > >  +
>> > > >  +    ret = (res == NULL) ? 0 : 1;
>> > > >  +
>> > > >  +    return ret;
>> > >
>> > >  You mean return (res != NULL); and want to have bool as the return
>> > > value
>> > >  of this function.
>> >
>> >  Yeah, thanks. i will modify as below:
>> >  ret = (res == NULL) ? false : true;
>>
>> ret = (res != NULL) is really more readable.
>
> "return (res != NULL);" is even nicer! :)
Great, thanks
>
> Paolo
>
> --
> To unsubscribe from this list: send the line "unsubscribe kvm" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
Zhiyong Wu - Oct. 18, 2011, 8:07 a.m.
On Mon, Oct 17, 2011 at 6:17 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 26.09.2011 10:01, schrieb Zhi Yong Wu:
>> On Fri, Sep 23, 2011 at 11:32 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>>> Am 08.09.2011 12:11, schrieb Zhi Yong Wu:
>>>> Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
>>>> ---
>>>>  Makefile.objs     |    2 +-
>>>>  block/blk-queue.c |  201 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>  block/blk-queue.h |   59 ++++++++++++++++
>>>>  block_int.h       |   27 +++++++
>>>>  4 files changed, 288 insertions(+), 1 deletions(-)
>>>>  create mode 100644 block/blk-queue.c
>>>>  create mode 100644 block/blk-queue.h
>>>>
>>>> diff --git a/Makefile.objs b/Makefile.objs
>>>> index 26b885b..5dcf456 100644
>>>> --- a/Makefile.objs
>>>> +++ b/Makefile.objs
>>>> @@ -33,7 +33,7 @@ block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
>>>>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
>>>>  block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>>>>  block-nested-y += qed-check.o
>>>> -block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>>>> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
>>>>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>>>>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>>>>  block-nested-$(CONFIG_CURL) += curl.o
>>>> diff --git a/block/blk-queue.c b/block/blk-queue.c
>>>> new file mode 100644
>>>> index 0000000..adef497
>>>> --- /dev/null
>>>> +++ b/block/blk-queue.c
>>>> @@ -0,0 +1,201 @@
>>>> +/*
>>>> + * QEMU System Emulator queue definition for block layer
>>>> + *
>>>> + * Copyright (c) IBM, Corp. 2011
>>>> + *
>>>> + * Authors:
>>>> + *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
>>>> + *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
>>>> + *
>>>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>>>> + * of this software and associated documentation files (the "Software"), to deal
>>>> + * in the Software without restriction, including without limitation the rights
>>>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>>>> + * copies of the Software, and to permit persons to whom the Software is
>>>> + * furnished to do so, subject to the following conditions:
>>>> + *
>>>> + * The above copyright notice and this permission notice shall be included in
>>>> + * all copies or substantial portions of the Software.
>>>> + *
>>>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>>>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>>>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>>>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>>>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>>>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>>>> + * THE SOFTWARE.
>>>> + */
>>>> +
>>>> +#include "block_int.h"
>>>> +#include "block/blk-queue.h"
>>>> +#include "qemu-common.h"
>>>> +
>>>> +/* The APIs for block request queue on qemu block layer.
>>>> + */
>>>> +
>>>> +struct BlockQueueAIOCB {
>>>> +    BlockDriverAIOCB common;
>>>> +    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
>>>> +    BlockRequestHandler *handler;
>>>> +    BlockDriverAIOCB *real_acb;
>>>> +
>>>> +    int64_t sector_num;
>>>> +    QEMUIOVector *qiov;
>>>> +    int nb_sectors;
>>>> +};
>>>
>>> The idea is that each request is first queued on the QTAILQ, and at some
>>> point it's removed from the queue and gets a real_acb. But it never has
>>> both at the same time. Correct?
>> NO. if block I/O throttling is enabled and I/O rate at runtime exceed
>> this limits, this request will be enqueued.
>> It represents the whole lifecycle of one enqueued request.
>
> What are the conditions under which the request will still be enqueued,
> but has a real_acb at the same time?
When the request is not serviced and still need to be enqueued, one
real_acb will be existable at the same time.
thanks for your good catch.:)
When one request will be mallocated at the first time, we should save
the got acb to real_acb
qemu_block_queue_enqueue():
              request->real_acb      = acb;

>
>>>> +
>>>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>>>> +
>>>> +struct BlockQueue {
>>>> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
>>>> +    bool req_failed;
>>>> +    bool flushing;
>>>> +};
>>>
>>> I find req_failed pretty confusing. Needs documentation at least, but
>>> most probably also a better name.
>> OK. request_has_failed?
>
> No, that doesn't describe what it's really doing.
>
> You set req_failed = true by default and then on some obscure condition
> clear it or not. It's tracking something, but I'm not sure what meaning
> it has during the whole process.
In qemu_block_queue_flush,
When bdrv_aio_readv/writev return NULL, if req_failed is changed to
false, it indicates that the request exceeds the limits again; if
req_failed is still true, it indicates that one I/O error takes place,
at the monent, qemu_block_queue_callback(request, -EIO) need to be
called to report this to upper layer.

>
>>>> +
>>>> +static void qemu_block_queue_dequeue(BlockQueue *queue,
>>>> +                                     BlockQueueAIOCB *request)
>>>> +{
>>>> +    BlockQueueAIOCB *req;
>>>> +
>>>> +    assert(queue);
>>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>>> +        req = QTAILQ_FIRST(&queue->requests);
>>>> +        if (req == request) {
>>>> +            QTAILQ_REMOVE(&queue->requests, req, entry);
>>>> +            break;
>>>> +        }
>>>> +    }
>>>> +}
>>>
>>> Is it just me or is this an endless loop if the request isn't the first
>>> element in the list?
>> queue->requests is only used to store requests which exceed the limits.
>> Why is the request not the first evlement?
>
> Why do you have a loop if it's always the first element?
Ah, it can cause dead loop, and QTAILQ_FOREACH_SAFE should be adopted
here. thanks.
    QTAILQ_FOREACH_SAFE(req, &queue->requests, entry, next) {
        if (*req == *request) {
            QTAILQ_REMOVE(&queue->requests, req, entry);
            break;
        }
    }

>
>>>> +void qemu_del_block_queue(BlockQueue *queue)
>>>> +{
>>>> +    BlockQueueAIOCB *request, *next;
>>>> +
>>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>> +        qemu_aio_release(request);
>>>> +    }
>>>> +
>>>> +    g_free(queue);
>>>> +}
>>>
>>> Can we be sure that no AIO requests are in flight that still use the now
>>> released AIOCB?
>> Yeah, since qemu core code is serially performed, i think that when
>> qemu_del_block_queue is performed, no requests are in flight. Right?
>
> Patch 2 has this code:
>
> +void bdrv_io_limits_disable(BlockDriverState *bs)
> +{
> +    bs->io_limits_enabled = false;
> +
> +    if (bs->block_queue) {
> +        qemu_block_queue_flush(bs->block_queue);
> +        qemu_del_block_queue(bs->block_queue);
> +        bs->block_queue = NULL;
> +    }
>
> Does this mean that you can't disable I/O limits while the VM is running?
NO, you can even though VM is running.
>
>>>> +
>>>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>>> +                        BlockDriverState *bs,
>>>> +                        BlockRequestHandler *handler,
>>>> +                        int64_t sector_num,
>>>> +                        QEMUIOVector *qiov,
>>>> +                        int nb_sectors,
>>>> +                        BlockDriverCompletionFunc *cb,
>>>> +                        void *opaque)
>>>> +{
>>>> +    BlockDriverAIOCB *acb;
>>>> +    BlockQueueAIOCB *request;
>>>> +
>>>> +    if (queue->flushing) {
>>>> +        queue->req_failed = false;
>>>> +        return NULL;
>>>> +    } else {
>>>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>>>> +                           cb, opaque);
>>>> +        request = container_of(acb, BlockQueueAIOCB, common);
>>>> +        request->handler       = handler;
>>>> +        request->sector_num    = sector_num;
>>>> +        request->qiov          = qiov;
>>>> +        request->nb_sectors    = nb_sectors;
>>>> +        request->real_acb      = NULL;
>>>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>>> +    }
>>>> +
>>>> +    return acb;
>>>> +}
>>>> +
>>>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>>>> +{
>>>> +    int ret;
>>>> +    BlockDriverAIOCB *res;
>>>> +
>>>> +    res = request->handler(request->common.bs, request->sector_num,
>>>> +                           request->qiov, request->nb_sectors,
>>>> +                           qemu_block_queue_callback, request);
>>>> +    if (res) {
>>>> +        request->real_acb = res;
>>>> +    }
>>>> +
>>>> +    ret = (res == NULL) ? 0 : 1;
>>>> +
>>>> +    return ret;
>>>
>>> You mean return (res != NULL); and want to have bool as the return value
>>> of this function.
>> Yeah, thanks. i will modify as below:
>> ret = (res == NULL) ? false : true;
>
> ret = (res != NULL) is really more readable.
I have adopted Paolo's suggestion.

>
>> and
>> static bool qemu_block_queue_handler()
>>
>>>
>>>> +}
>>>> +
>>>> +void qemu_block_queue_flush(BlockQueue *queue)
>>>> +{
>>>> +    queue->flushing = true;
>>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>>> +        BlockQueueAIOCB *request = NULL;
>>>> +        int ret = 0;
>>>> +
>>>> +        request = QTAILQ_FIRST(&queue->requests);
>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>> +
>>>> +        queue->req_failed = true;
>>>> +        ret = qemu_block_queue_handler(request);
>>>> +        if (ret == 0) {
>>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>>> +            if (queue->req_failed) {
>>>> +                qemu_block_queue_callback(request, -EIO);
>>>> +                break;
>>>> +            }
>>>> +        }
>>>> +    }
>>>> +
>>>> +    queue->req_failed = true;
>>>> +    queue->flushing   = false;
>>>> +}
>>>> +
>>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>>> +{
>>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>>> +}
>>>
>>> Why doesn't the queue have pending requests in the middle of a flush
>>> operation? (That is, the flush hasn't completed yet)
>> It is possible for the queue to have pending requests. if yes, how about?
>
> Sorry, can't parse this.
>
> I don't understand why the !queue->flushing part is correct.
>
> Kevin
>
Kevin Wolf - Oct. 18, 2011, 8:36 a.m.
Am 18.10.2011 10:07, schrieb Zhi Yong Wu:
> On Mon, Oct 17, 2011 at 6:17 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>>>>> +
>>>>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>>>>> +
>>>>> +struct BlockQueue {
>>>>> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
>>>>> +    bool req_failed;
>>>>> +    bool flushing;
>>>>> +};
>>>>
>>>> I find req_failed pretty confusing. Needs documentation at least, but
>>>> most probably also a better name.
>>> OK. request_has_failed?
>>
>> No, that doesn't describe what it's really doing.
>>
>> You set req_failed = true by default and then on some obscure condition
>> clear it or not. It's tracking something, but I'm not sure what meaning
>> it has during the whole process.
> In qemu_block_queue_flush,
> When bdrv_aio_readv/writev return NULL, if req_failed is changed to
> false, it indicates that the request exceeds the limits again; if
> req_failed is still true, it indicates that one I/O error takes place,
> at the monent, qemu_block_queue_callback(request, -EIO) need to be
> called to report this to upper layer.

Okay, this makes some more sense now.

How about reversing the logic and maintaining a limit_exceeded flag
instead of a req_failed? I think this would be clearer.

>>>>> +void qemu_del_block_queue(BlockQueue *queue)
>>>>> +{
>>>>> +    BlockQueueAIOCB *request, *next;
>>>>> +
>>>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>> +        qemu_aio_release(request);
>>>>> +    }
>>>>> +
>>>>> +    g_free(queue);
>>>>> +}
>>>>
>>>> Can we be sure that no AIO requests are in flight that still use the now
>>>> released AIOCB?
>>> Yeah, since qemu core code is serially performed, i think that when
>>> qemu_del_block_queue is performed, no requests are in flight. Right?
>>
>> Patch 2 has this code:
>>
>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>> +{
>> +    bs->io_limits_enabled = false;
>> +
>> +    if (bs->block_queue) {
>> +        qemu_block_queue_flush(bs->block_queue);
>> +        qemu_del_block_queue(bs->block_queue);
>> +        bs->block_queue = NULL;
>> +    }
>>
>> Does this mean that you can't disable I/O limits while the VM is running?
> NO, you can even though VM is running.

Okay, in general qemu_block_queue_flush() empties the queue so that
there are no requests left that qemu_del_block_queue() could drop from
the queue. So in the common case it doesn't even enter the FOREACH loop.

I think problems start when requests have failed or exceeded the limit
again, then you have requests queued even after
qemu_block_queue_flush(). You must be aware of this, otherwise the code
in qemu_del_block_queue() wouldn't exist.

But you can't release the ACBs without having called their callback,
otherwise the caller would still assume that its ACB pointer is valid.
Maybe calling the callback before releasing the ACB would be enough.

However, for failed requests see below.

>>
>>>>> +
>>>>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>>>> +                        BlockDriverState *bs,
>>>>> +                        BlockRequestHandler *handler,
>>>>> +                        int64_t sector_num,
>>>>> +                        QEMUIOVector *qiov,
>>>>> +                        int nb_sectors,
>>>>> +                        BlockDriverCompletionFunc *cb,
>>>>> +                        void *opaque)
>>>>> +{
>>>>> +    BlockDriverAIOCB *acb;
>>>>> +    BlockQueueAIOCB *request;
>>>>> +
>>>>> +    if (queue->flushing) {
>>>>> +        queue->req_failed = false;
>>>>> +        return NULL;
>>>>> +    } else {
>>>>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>>>>> +                           cb, opaque);
>>>>> +        request = container_of(acb, BlockQueueAIOCB, common);
>>>>> +        request->handler       = handler;
>>>>> +        request->sector_num    = sector_num;
>>>>> +        request->qiov          = qiov;
>>>>> +        request->nb_sectors    = nb_sectors;
>>>>> +        request->real_acb      = NULL;
>>>>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>>>> +    }
>>>>> +
>>>>> +    return acb;
>>>>> +}
>>>>> +
>>>>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>>>>> +{
>>>>> +    int ret;
>>>>> +    BlockDriverAIOCB *res;
>>>>> +
>>>>> +    res = request->handler(request->common.bs, request->sector_num,
>>>>> +                           request->qiov, request->nb_sectors,
>>>>> +                           qemu_block_queue_callback, request);
>>>>> +    if (res) {
>>>>> +        request->real_acb = res;
>>>>> +    }
>>>>> +
>>>>> +    ret = (res == NULL) ? 0 : 1;
>>>>> +
>>>>> +    return ret;
>>>>
>>>> You mean return (res != NULL); and want to have bool as the return value
>>>> of this function.
>>> Yeah, thanks. i will modify as below:
>>> ret = (res == NULL) ? false : true;
>>
>> ret = (res != NULL) is really more readable.
> I have adopted Paolo's suggestion.
> 
>>
>>> and
>>> static bool qemu_block_queue_handler()
>>>
>>>>
>>>>> +}
>>>>> +
>>>>> +void qemu_block_queue_flush(BlockQueue *queue)
>>>>> +{
>>>>> +    queue->flushing = true;
>>>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>>>> +        BlockQueueAIOCB *request = NULL;
>>>>> +        int ret = 0;
>>>>> +
>>>>> +        request = QTAILQ_FIRST(&queue->requests);
>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>> +
>>>>> +        queue->req_failed = true;
>>>>> +        ret = qemu_block_queue_handler(request);
>>>>> +        if (ret == 0) {
>>>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>>>> +            if (queue->req_failed) {
>>>>> +                qemu_block_queue_callback(request, -EIO);
>>>>> +                break;

When a request has failed, you call its callback, but still leave it
queued. I think this is wrong.

>>>>> +            }
>>>>> +        }
>>>>> +    }
>>>>> +
>>>>> +    queue->req_failed = true;
>>>>> +    queue->flushing   = false;
>>>>> +}
>>>>> +
>>>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>>>> +{
>>>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>>>> +}
>>>>
>>>> Why doesn't the queue have pending requests in the middle of a flush
>>>> operation? (That is, the flush hasn't completed yet)
>>> It is possible for the queue to have pending requests. if yes, how about?
>>
>> Sorry, can't parse this.
>>
>> I don't understand why the !queue->flushing part is correct.

What about this?

Kevin
Zhiyong Wu - Oct. 18, 2011, 9:29 a.m.
On Tue, Oct 18, 2011 at 4:36 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 18.10.2011 10:07, schrieb Zhi Yong Wu:
>> On Mon, Oct 17, 2011 at 6:17 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>>>>>> +
>>>>>> +typedef struct BlockQueueAIOCB BlockQueueAIOCB;
>>>>>> +
>>>>>> +struct BlockQueue {
>>>>>> +    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
>>>>>> +    bool req_failed;
>>>>>> +    bool flushing;
>>>>>> +};
>>>>>
>>>>> I find req_failed pretty confusing. Needs documentation at least, but
>>>>> most probably also a better name.
>>>> OK. request_has_failed?
>>>
>>> No, that doesn't describe what it's really doing.
>>>
>>> You set req_failed = true by default and then on some obscure condition
>>> clear it or not. It's tracking something, but I'm not sure what meaning
>>> it has during the whole process.
>> In qemu_block_queue_flush,
>> When bdrv_aio_readv/writev return NULL, if req_failed is changed to
>> false, it indicates that the request exceeds the limits again; if
>> req_failed is still true, it indicates that one I/O error takes place,
>> at the monent, qemu_block_queue_callback(request, -EIO) need to be
>> called to report this to upper layer.
>
> Okay, this makes some more sense now.
>
> How about reversing the logic and maintaining a limit_exceeded flag
> instead of a req_failed? I think this would be clearer.
OK
>
>>>>>> +void qemu_del_block_queue(BlockQueue *queue)
>>>>>> +{
>>>>>> +    BlockQueueAIOCB *request, *next;
>>>>>> +
>>>>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>>> +        qemu_aio_release(request);
>>>>>> +    }
>>>>>> +
>>>>>> +    g_free(queue);
>>>>>> +}
>>>>>
>>>>> Can we be sure that no AIO requests are in flight that still use the now
>>>>> released AIOCB?
>>>> Yeah, since qemu core code is serially performed, i think that when
>>>> qemu_del_block_queue is performed, no requests are in flight. Right?
>>>
>>> Patch 2 has this code:
>>>
>>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>>> +{
>>> +    bs->io_limits_enabled = false;
>>> +
>>> +    if (bs->block_queue) {
>>> +        qemu_block_queue_flush(bs->block_queue);
>>> +        qemu_del_block_queue(bs->block_queue);
>>> +        bs->block_queue = NULL;
>>> +    }
>>>
>>> Does this mean that you can't disable I/O limits while the VM is running?
>> NO, you can even though VM is running.
>
> Okay, in general qemu_block_queue_flush() empties the queue so that
> there are no requests left that qemu_del_block_queue() could drop from
> the queue. So in the common case it doesn't even enter the FOREACH loop.
I think that we should adopt !QTAILQ_EMPTY(&queue->requests), not
QTAILQ_FOREACH_SAFE in qemu_del_block_queue(),
right?
>
> I think problems start when requests have failed or exceeded the limit
> again, then you have requests queued even after
> qemu_block_queue_flush(). You must be aware of this, otherwise the code
> in qemu_del_block_queue() wouldn't exist.
>
> But you can't release the ACBs without having called their callback,
> otherwise the caller would still assume that its ACB pointer is valid.
> Maybe calling the callback before releasing the ACB would be enough.
Good, thanks.
>
> However, for failed requests see below.
>
>>>
>>>>>> +
>>>>>> +BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
>>>>>> +                        BlockDriverState *bs,
>>>>>> +                        BlockRequestHandler *handler,
>>>>>> +                        int64_t sector_num,
>>>>>> +                        QEMUIOVector *qiov,
>>>>>> +                        int nb_sectors,
>>>>>> +                        BlockDriverCompletionFunc *cb,
>>>>>> +                        void *opaque)
>>>>>> +{
>>>>>> +    BlockDriverAIOCB *acb;
>>>>>> +    BlockQueueAIOCB *request;
>>>>>> +
>>>>>> +    if (queue->flushing) {
>>>>>> +        queue->req_failed = false;
>>>>>> +        return NULL;
>>>>>> +    } else {
>>>>>> +        acb = qemu_aio_get(&block_queue_pool, bs,
>>>>>> +                           cb, opaque);
>>>>>> +        request = container_of(acb, BlockQueueAIOCB, common);
>>>>>> +        request->handler       = handler;
>>>>>> +        request->sector_num    = sector_num;
>>>>>> +        request->qiov          = qiov;
>>>>>> +        request->nb_sectors    = nb_sectors;
>>>>>> +        request->real_acb      = NULL;
>>>>>> +        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
>>>>>> +    }
>>>>>> +
>>>>>> +    return acb;
>>>>>> +}
>>>>>> +
>>>>>> +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
>>>>>> +{
>>>>>> +    int ret;
>>>>>> +    BlockDriverAIOCB *res;
>>>>>> +
>>>>>> +    res = request->handler(request->common.bs, request->sector_num,
>>>>>> +                           request->qiov, request->nb_sectors,
>>>>>> +                           qemu_block_queue_callback, request);
>>>>>> +    if (res) {
>>>>>> +        request->real_acb = res;
>>>>>> +    }
>>>>>> +
>>>>>> +    ret = (res == NULL) ? 0 : 1;
>>>>>> +
>>>>>> +    return ret;
>>>>>
>>>>> You mean return (res != NULL); and want to have bool as the return value
>>>>> of this function.
>>>> Yeah, thanks. i will modify as below:
>>>> ret = (res == NULL) ? false : true;
>>>
>>> ret = (res != NULL) is really more readable.
>> I have adopted Paolo's suggestion.
>>
>>>
>>>> and
>>>> static bool qemu_block_queue_handler()
>>>>
>>>>>
>>>>>> +}
>>>>>> +
>>>>>> +void qemu_block_queue_flush(BlockQueue *queue)
>>>>>> +{
>>>>>> +    queue->flushing = true;
>>>>>> +    while (!QTAILQ_EMPTY(&queue->requests)) {
>>>>>> +        BlockQueueAIOCB *request = NULL;
>>>>>> +        int ret = 0;
>>>>>> +
>>>>>> +        request = QTAILQ_FIRST(&queue->requests);
>>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>>> +
>>>>>> +        queue->req_failed = true;
>>>>>> +        ret = qemu_block_queue_handler(request);
>>>>>> +        if (ret == 0) {
>>>>>> +            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
>>>>>> +            if (queue->req_failed) {
>>>>>> +                qemu_block_queue_callback(request, -EIO);
>>>>>> +                break;
>
> When a request has failed, you call its callback, but still leave it
> queued. I think this is wrong.
Right, we should not insert the failed request back to block queue.
>
>>>>>> +            }
>>>>>> +        }
>>>>>> +    }
>>>>>> +
>>>>>> +    queue->req_failed = true;
>>>>>> +    queue->flushing   = false;
>>>>>> +}
>>>>>> +
>>>>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>>>>> +{
>>>>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>>>>> +}
>>>>>
>>>>> Why doesn't the queue have pending requests in the middle of a flush
>>>>> operation? (That is, the flush hasn't completed yet)
>>>> It is possible for the queue to have pending requests. if yes, how about?
>>>
>>> Sorry, can't parse this.
>>>
>>> I don't understand why the !queue->flushing part is correct.
>
> What about this?
When bdrv_aio_readv/writev handle one request, it will determine if
block queue is not being flushed and isn't NULL; if yes, It assume
that this request is one new request from upper layer, so it won't
determine if the I/O rate at runtime has exceeded the limits, but
immediately insert it into block queue.

>
> Kevin
>
Kevin Wolf - Oct. 18, 2011, 9:56 a.m.
Am 18.10.2011 11:29, schrieb Zhi Yong Wu:
>>>>>>> +void qemu_del_block_queue(BlockQueue *queue)
>>>>>>> +{
>>>>>>> +    BlockQueueAIOCB *request, *next;
>>>>>>> +
>>>>>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>>>> +        qemu_aio_release(request);
>>>>>>> +    }
>>>>>>> +
>>>>>>> +    g_free(queue);
>>>>>>> +}
>>>>>>
>>>>>> Can we be sure that no AIO requests are in flight that still use the now
>>>>>> released AIOCB?
>>>>> Yeah, since qemu core code is serially performed, i think that when
>>>>> qemu_del_block_queue is performed, no requests are in flight. Right?
>>>>
>>>> Patch 2 has this code:
>>>>
>>>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>>>> +{
>>>> +    bs->io_limits_enabled = false;
>>>> +
>>>> +    if (bs->block_queue) {
>>>> +        qemu_block_queue_flush(bs->block_queue);
>>>> +        qemu_del_block_queue(bs->block_queue);
>>>> +        bs->block_queue = NULL;
>>>> +    }
>>>>
>>>> Does this mean that you can't disable I/O limits while the VM is running?
>>> NO, you can even though VM is running.
>>
>> Okay, in general qemu_block_queue_flush() empties the queue so that
>> there are no requests left that qemu_del_block_queue() could drop from
>> the queue. So in the common case it doesn't even enter the FOREACH loop.
> I think that we should adopt !QTAILQ_EMPTY(&queue->requests), not
> QTAILQ_FOREACH_SAFE in qemu_del_block_queue(),
> right?

I think QTAILQ_FOREACH_SAFE is fine.

>>
>> I think problems start when requests have failed or exceeded the limit
>> again, then you have requests queued even after
>> qemu_block_queue_flush(). You must be aware of this, otherwise the code
>> in qemu_del_block_queue() wouldn't exist.
>>
>> But you can't release the ACBs without having called their callback,
>> otherwise the caller would still assume that its ACB pointer is valid.
>> Maybe calling the callback before releasing the ACB would be enough.
> Good, thanks.
>>>>
>>>>>>> +            }
>>>>>>> +        }
>>>>>>> +    }
>>>>>>> +
>>>>>>> +    queue->req_failed = true;
>>>>>>> +    queue->flushing   = false;
>>>>>>> +}
>>>>>>> +
>>>>>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>>>>>> +{
>>>>>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>>>>>> +}
>>>>>>
>>>>>> Why doesn't the queue have pending requests in the middle of a flush
>>>>>> operation? (That is, the flush hasn't completed yet)
>>>>> It is possible for the queue to have pending requests. if yes, how about?
>>>>
>>>> Sorry, can't parse this.
>>>>
>>>> I don't understand why the !queue->flushing part is correct.
>>
>> What about this?
> When bdrv_aio_readv/writev handle one request, it will determine if
> block queue is not being flushed and isn't NULL; if yes, It assume
> that this request is one new request from upper layer, so it won't
> determine if the I/O rate at runtime has exceeded the limits, but
> immediately insert it into block queue.

Hm, I think I understand what you're saying, but only after looking at
patch 3. This is not really implementing a has_pending(), but
has_pending_and_caller_wasnt_called_during_flush(). I think it would be
better to handle the queue->flushing condition in the caller where its
use is more obvious.

Kevin
Zhiyong Wu - Oct. 18, 2011, 1:29 p.m.
On Tue, Oct 18, 2011 at 5:56 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 18.10.2011 11:29, schrieb Zhi Yong Wu:
>>>>>>>> +void qemu_del_block_queue(BlockQueue *queue)
>>>>>>>> +{
>>>>>>>> +    BlockQueueAIOCB *request, *next;
>>>>>>>> +
>>>>>>>> +    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
>>>>>>>> +        QTAILQ_REMOVE(&queue->requests, request, entry);
>>>>>>>> +        qemu_aio_release(request);
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    g_free(queue);
>>>>>>>> +}
>>>>>>>
>>>>>>> Can we be sure that no AIO requests are in flight that still use the now
>>>>>>> released AIOCB?
>>>>>> Yeah, since qemu core code is serially performed, i think that when
>>>>>> qemu_del_block_queue is performed, no requests are in flight. Right?
>>>>>
>>>>> Patch 2 has this code:
>>>>>
>>>>> +void bdrv_io_limits_disable(BlockDriverState *bs)
>>>>> +{
>>>>> +    bs->io_limits_enabled = false;
>>>>> +
>>>>> +    if (bs->block_queue) {
>>>>> +        qemu_block_queue_flush(bs->block_queue);
>>>>> +        qemu_del_block_queue(bs->block_queue);
>>>>> +        bs->block_queue = NULL;
>>>>> +    }
>>>>>
>>>>> Does this mean that you can't disable I/O limits while the VM is running?
>>>> NO, you can even though VM is running.
>>>
>>> Okay, in general qemu_block_queue_flush() empties the queue so that
>>> there are no requests left that qemu_del_block_queue() could drop from
>>> the queue. So in the common case it doesn't even enter the FOREACH loop.
>> I think that we should adopt !QTAILQ_EMPTY(&queue->requests), not
>> QTAILQ_FOREACH_SAFE in qemu_del_block_queue(),
>> right?
>
> I think QTAILQ_FOREACH_SAFE is fine.
>
>>>
>>> I think problems start when requests have failed or exceeded the limit
>>> again, then you have requests queued even after
>>> qemu_block_queue_flush(). You must be aware of this, otherwise the code
>>> in qemu_del_block_queue() wouldn't exist.
>>>
>>> But you can't release the ACBs without having called their callback,
>>> otherwise the caller would still assume that its ACB pointer is valid.
>>> Maybe calling the callback before releasing the ACB would be enough.
>> Good, thanks.
>>>>>
>>>>>>>> +            }
>>>>>>>> +        }
>>>>>>>> +    }
>>>>>>>> +
>>>>>>>> +    queue->req_failed = true;
>>>>>>>> +    queue->flushing   = false;
>>>>>>>> +}
>>>>>>>> +
>>>>>>>> +bool qemu_block_queue_has_pending(BlockQueue *queue)
>>>>>>>> +{
>>>>>>>> +    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
>>>>>>>> +}
>>>>>>>
>>>>>>> Why doesn't the queue have pending requests in the middle of a flush
>>>>>>> operation? (That is, the flush hasn't completed yet)
>>>>>> It is possible for the queue to have pending requests. if yes, how about?
>>>>>
>>>>> Sorry, can't parse this.
>>>>>
>>>>> I don't understand why the !queue->flushing part is correct.
>>>
>>> What about this?
>> When bdrv_aio_readv/writev handle one request, it will determine if
>> block queue is not being flushed and isn't NULL; if yes, It assume
>> that this request is one new request from upper layer, so it won't
>> determine if the I/O rate at runtime has exceeded the limits, but
>> immediately insert it into block queue.
>
> Hm, I think I understand what you're saying, but only after looking at
> patch 3. This is not really implementing a has_pending(), but
> has_pending_and_caller_wasnt_called_during_flush(). I think it would be
Correct.
> better to handle the queue->flushing condition in the caller where its
> use is more obvious.
OK. i will do as this.
>
> Kevin
>

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 26b885b..5dcf456 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -33,7 +33,7 @@  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vv
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
-block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o blk-queue.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block/blk-queue.c b/block/blk-queue.c
new file mode 100644
index 0000000..adef497
--- /dev/null
+++ b/block/blk-queue.c
@@ -0,0 +1,201 @@ 
+/*
+ * QEMU System Emulator queue definition for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
+ *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "block_int.h"
+#include "block/blk-queue.h"
+#include "qemu-common.h"
+
+/* The APIs for block request queue on qemu block layer.
+ */
+
+struct BlockQueueAIOCB {
+    BlockDriverAIOCB common;
+    QTAILQ_ENTRY(BlockQueueAIOCB) entry;
+    BlockRequestHandler *handler;
+    BlockDriverAIOCB *real_acb;
+
+    int64_t sector_num;
+    QEMUIOVector *qiov;
+    int nb_sectors;
+};
+
+typedef struct BlockQueueAIOCB BlockQueueAIOCB;
+
+struct BlockQueue {
+    QTAILQ_HEAD(requests, BlockQueueAIOCB) requests;
+    bool req_failed;
+    bool flushing;
+};
+
+static void qemu_block_queue_dequeue(BlockQueue *queue,
+                                     BlockQueueAIOCB *request)
+{
+    BlockQueueAIOCB *req;
+
+    assert(queue);
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        req = QTAILQ_FIRST(&queue->requests);
+        if (req == request) {
+            QTAILQ_REMOVE(&queue->requests, req, entry);
+            break;
+        }
+    }
+}
+
+static void qemu_block_queue_cancel(BlockDriverAIOCB *acb)
+{
+    BlockQueueAIOCB *request = container_of(acb, BlockQueueAIOCB, common);
+    if (request->real_acb) {
+        bdrv_aio_cancel(request->real_acb);
+    } else {
+        assert(request->common.bs->block_queue);
+        qemu_block_queue_dequeue(request->common.bs->block_queue,
+                                 request);
+    }
+
+    qemu_aio_release(request);
+}
+
+static AIOPool block_queue_pool = {
+    .aiocb_size         = sizeof(struct BlockQueueAIOCB),
+    .cancel             = qemu_block_queue_cancel,
+};
+
+static void qemu_block_queue_callback(void *opaque, int ret)
+{
+    BlockQueueAIOCB *acb = opaque;
+
+    if (acb->common.cb) {
+        acb->common.cb(acb->common.opaque, ret);
+    }
+
+    qemu_aio_release(acb);
+}
+
+BlockQueue *qemu_new_block_queue(void)
+{
+    BlockQueue *queue;
+
+    queue = g_malloc0(sizeof(BlockQueue));
+
+    QTAILQ_INIT(&queue->requests);
+
+    queue->req_failed = true;
+    queue->flushing   = false;
+
+    return queue;
+}
+
+void qemu_del_block_queue(BlockQueue *queue)
+{
+    BlockQueueAIOCB *request, *next;
+
+    QTAILQ_FOREACH_SAFE(request, &queue->requests, entry, next) {
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+        qemu_aio_release(request);
+    }
+
+    g_free(queue);
+}
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque)
+{
+    BlockDriverAIOCB *acb;
+    BlockQueueAIOCB *request;
+
+    if (queue->flushing) {
+        queue->req_failed = false;
+        return NULL;
+    } else {
+        acb = qemu_aio_get(&block_queue_pool, bs,
+                           cb, opaque);
+        request = container_of(acb, BlockQueueAIOCB, common);
+        request->handler       = handler;
+        request->sector_num    = sector_num;
+        request->qiov          = qiov;
+        request->nb_sectors    = nb_sectors;
+        request->real_acb      = NULL;
+        QTAILQ_INSERT_TAIL(&queue->requests, request, entry);
+    }
+
+    return acb;
+}
+
+static int qemu_block_queue_handler(BlockQueueAIOCB *request)
+{
+    int ret;
+    BlockDriverAIOCB *res;
+
+    res = request->handler(request->common.bs, request->sector_num,
+                           request->qiov, request->nb_sectors,
+                           qemu_block_queue_callback, request);
+    if (res) {
+        request->real_acb = res;
+    }
+
+    ret = (res == NULL) ? 0 : 1;
+
+    return ret;
+}
+
+void qemu_block_queue_flush(BlockQueue *queue)
+{
+    queue->flushing = true;
+    while (!QTAILQ_EMPTY(&queue->requests)) {
+        BlockQueueAIOCB *request = NULL;
+        int ret = 0;
+
+        request = QTAILQ_FIRST(&queue->requests);
+        QTAILQ_REMOVE(&queue->requests, request, entry);
+
+        queue->req_failed = true;
+        ret = qemu_block_queue_handler(request);
+        if (ret == 0) {
+            QTAILQ_INSERT_HEAD(&queue->requests, request, entry);
+            if (queue->req_failed) {
+                qemu_block_queue_callback(request, -EIO);
+                break;
+            }
+        }
+    }
+
+    queue->req_failed = true;
+    queue->flushing   = false;
+}
+
+bool qemu_block_queue_has_pending(BlockQueue *queue)
+{
+    return !queue->flushing && !QTAILQ_EMPTY(&queue->requests);
+}
diff --git a/block/blk-queue.h b/block/blk-queue.h
new file mode 100644
index 0000000..c1529f7
--- /dev/null
+++ b/block/blk-queue.h
@@ -0,0 +1,59 @@ 
+/*
+ * QEMU System Emulator queue declaration for block layer
+ *
+ * Copyright (c) IBM, Corp. 2011
+ *
+ * Authors:
+ *  Zhi Yong Wu  <wuzhy@linux.vnet.ibm.com>
+ *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_BLOCK_QUEUE_H
+#define QEMU_BLOCK_QUEUE_H
+
+#include "block.h"
+#include "qemu-queue.h"
+
+typedef BlockDriverAIOCB* (BlockRequestHandler) (BlockDriverState *bs,
+                                int64_t sector_num, QEMUIOVector *qiov,
+                                int nb_sectors, BlockDriverCompletionFunc *cb,
+                                void *opaque);
+
+typedef struct BlockQueue BlockQueue;
+
+BlockQueue *qemu_new_block_queue(void);
+
+void qemu_del_block_queue(BlockQueue *queue);
+
+BlockDriverAIOCB *qemu_block_queue_enqueue(BlockQueue *queue,
+                        BlockDriverState *bs,
+                        BlockRequestHandler *handler,
+                        int64_t sector_num,
+                        QEMUIOVector *qiov,
+                        int nb_sectors,
+                        BlockDriverCompletionFunc *cb,
+                        void *opaque);
+
+void qemu_block_queue_flush(BlockQueue *queue);
+
+bool qemu_block_queue_has_pending(BlockQueue *queue);
+
+#endif /* QEMU_BLOCK_QUEUE_H */
diff --git a/block_int.h b/block_int.h
index 8a72b80..201e635 100644
--- a/block_int.h
+++ b/block_int.h
@@ -29,10 +29,18 @@ 
 #include "qemu-queue.h"
 #include "qemu-coroutine.h"
 #include "qemu-timer.h"
+#include "block/blk-queue.h"
 
 #define BLOCK_FLAG_ENCRYPT	1
 #define BLOCK_FLAG_COMPAT6	4
 
+#define BLOCK_IO_LIMIT_READ     0
+#define BLOCK_IO_LIMIT_WRITE    1
+#define BLOCK_IO_LIMIT_TOTAL    2
+
+#define BLOCK_IO_SLICE_TIME     100000000
+#define NANOSECONDS_PER_SECOND  1000000000.0
+
 #define BLOCK_OPT_SIZE          "size"
 #define BLOCK_OPT_ENCRYPT       "encryption"
 #define BLOCK_OPT_COMPAT6       "compat6"
@@ -49,6 +57,16 @@  typedef struct AIOPool {
     BlockDriverAIOCB *free_aiocb;
 } AIOPool;
 
+typedef struct BlockIOLimit {
+    uint64_t bps[3];
+    uint64_t iops[3];
+} BlockIOLimit;
+
+typedef struct BlockIODisp {
+    uint64_t bytes[2];
+    uint64_t ios[2];
+} BlockIODisp;
+
 struct BlockDriver {
     const char *format_name;
     int instance_size;
@@ -184,6 +202,15 @@  struct BlockDriverState {
 
     void *sync_aiocb;
 
+    /* the time for latest disk I/O */
+    int64_t slice_start;
+    int64_t slice_end;
+    BlockIOLimit io_limits;
+    BlockIODisp  io_disps;
+    BlockQueue   *block_queue;
+    QEMUTimer    *block_timer;
+    bool         io_limits_enabled;
+
     /* I/O stats (display with "info blockstats"). */
     uint64_t nr_bytes[BDRV_MAX_IOTYPE];
     uint64_t nr_ops[BDRV_MAX_IOTYPE];