Patchwork [v10,2/3] block: add I/O throttling algorithm

login
register
mail settings
Submitter Zhi Yong Wu
Date Nov. 1, 2011, 7:40 a.m.
Message ID <1320133259-14219-3-git-send-email-wuzhy@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/123037/
State New
Headers show

Comments

Zhi Yong Wu - Nov. 1, 2011, 7:40 a.m.
Signed-off-by: Zhi Yong Wu <wuzhy@linux.vnet.ibm.com>
---
 block.c               |  228 +++++++++++++++++++++++++++++++++++++++++++++++++
 block.h               |    1 +
 block_int.h           |    1 +
 qemu-coroutine-lock.c |    8 ++
 qemu-coroutine.h      |    6 ++
 5 files changed, 244 insertions(+), 0 deletions(-)
Stefan Hajnoczi - Nov. 1, 2011, 11:33 a.m.
On Tue, Nov 1, 2011 at 7:40 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
> +                                     int nb_sectors)
> +{
> +    int64_t wait_time = -1;
> +
> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
> +        qemu_co_queue_wait(&bs->throttled_reqs);
> +        goto resume;
> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
> +        if (wait_time != -1) {
> +            qemu_mod_timer(bs->block_timer,
> +                           wait_time + qemu_get_clock_ns(vm_clock));
> +        }
> +
> +        qemu_co_queue_wait(&bs->throttled_reqs);
> +
> +resume:
> +        while (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {

is_write needs to be passed in to bdrv_exceed_io_limits().  Currently
this accounts every I/O as a read.

> +            qemu_mod_timer(bs->block_timer,
> +                           wait_time + qemu_get_clock_ns(vm_clock));

Do you need if (wait_time != -1) here?

Stefan
Zhiyong Wu - Nov. 1, 2011, 11:40 a.m.
On Tue, Nov 1, 2011 at 7:33 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Tue, Nov 1, 2011 at 7:40 AM, Zhi Yong Wu <wuzhy@linux.vnet.ibm.com> wrote:
>> +static void bdrv_io_limits_intercept(BlockDriverState *bs,
>> +                                     int nb_sectors)
>> +{
>> +    int64_t wait_time = -1;
>> +
>> +    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>> +        goto resume;
>> +    } else if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>> +        if (wait_time != -1) {
>> +            qemu_mod_timer(bs->block_timer,
>> +                           wait_time + qemu_get_clock_ns(vm_clock));
>> +        }
>> +
>> +        qemu_co_queue_wait(&bs->throttled_reqs);
>> +
>> +resume:
>> +        while (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
>
> is_write needs to be passed in to bdrv_exceed_io_limits().  Currently
> this accounts every I/O as a read.
Sorry, It is one stupid error.

>
>> +            qemu_mod_timer(bs->block_timer,
>> +                           wait_time + qemu_get_clock_ns(vm_clock));
>
> Do you need if (wait_time != -1) here?
Actually i think that we can drop the condition in our code.

>
> Stefan
>

Patch

diff --git a/block.c b/block.c
index 8f08dc5..cb89372 100644
--- a/block.c
+++ b/block.c
@@ -74,6 +74,13 @@  static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs,
                                                bool is_write);
 static void coroutine_fn bdrv_co_do_rw(void *opaque);
 
+static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
+        bool is_write, double elapsed_time, uint64_t *wait);
+static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
+        double elapsed_time, uint64_t *wait);
+static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
+        bool is_write, int64_t *wait);
+
 static QTAILQ_HEAD(, BlockDriverState) bdrv_states =
     QTAILQ_HEAD_INITIALIZER(bdrv_states);
 
@@ -107,6 +114,28 @@  int is_windows_drive(const char *filename)
 #endif
 
 /* throttling disk I/O limits */
+void bdrv_io_limits_disable(BlockDriverState *bs)
+{
+    bs->io_limits_enabled = false;
+
+    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
+        while (qemu_co_queue_next(&bs->throttled_reqs));
+    }
+
+    qemu_co_queue_init(&bs->throttled_reqs);
+
+    if (bs->block_timer) {
+        qemu_del_timer(bs->block_timer);
+        qemu_free_timer(bs->block_timer);
+        bs->block_timer = NULL;
+    }
+
+    bs->slice_start = 0;
+    bs->slice_end   = 0;
+    bs->slice_time  = 0;
+    memset(&bs->io_disps, 0, sizeof(bs->io_disps));
+}
+
 static void bdrv_block_timer(void *opaque)
 {
     BlockDriverState *bs = opaque;
@@ -137,6 +166,33 @@  bool bdrv_io_limits_enabled(BlockDriverState *bs)
          || io_limits->iops[BLOCK_IO_LIMIT_TOTAL];
 }
 
+static void bdrv_io_limits_intercept(BlockDriverState *bs,
+                                     int nb_sectors)
+{
+    int64_t wait_time = -1;
+
+    if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
+        qemu_co_queue_wait(&bs->throttled_reqs);
+        goto resume;
+    } else if (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+        if (wait_time != -1) {
+            qemu_mod_timer(bs->block_timer,
+                           wait_time + qemu_get_clock_ns(vm_clock));
+        }
+
+        qemu_co_queue_wait(&bs->throttled_reqs);
+
+resume:
+        while (bdrv_exceed_io_limits(bs, nb_sectors, false, &wait_time)) {
+            qemu_mod_timer(bs->block_timer,
+                           wait_time + qemu_get_clock_ns(vm_clock));
+            qemu_co_queue_wait_insert_head(&bs->throttled_reqs);
+        }
+
+        qemu_co_queue_next(&bs->throttled_reqs);
+    }
+}
+
 /* check if the path starts with "<protocol>:" */
 static int path_has_protocol(const char *path)
 {
@@ -719,6 +775,11 @@  int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
         bdrv_dev_change_media_cb(bs, true);
     }
 
+    /* throttling disk I/O limits */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_enable(bs);
+    }
+
     return 0;
 
 unlink_and_fail:
@@ -754,6 +815,9 @@  void bdrv_close(BlockDriverState *bs)
 
         bdrv_dev_change_media_cb(bs, false);
     }
+
+    /*throttling disk I/O limits*/
+    bdrv_io_limits_disable(bs);
 }
 
 void bdrv_close_all(void)
@@ -1292,6 +1356,11 @@  static int coroutine_fn bdrv_co_do_readv(BlockDriverState *bs,
         return -EIO;
     }
 
+    /* throttling disk read I/O */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_intercept(bs, nb_sectors);
+    }
+
     return drv->bdrv_co_readv(bs, sector_num, nb_sectors, qiov);
 }
 
@@ -1322,6 +1391,11 @@  static int coroutine_fn bdrv_co_do_writev(BlockDriverState *bs,
         return -EIO;
     }
 
+    /* throttling disk write I/O */
+    if (bs->io_limits_enabled) {
+        bdrv_io_limits_intercept(bs, nb_sectors);
+    }
+
     ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
 
     if (bs->dirty_bitmap) {
@@ -2513,6 +2587,160 @@  void bdrv_aio_cancel(BlockDriverAIOCB *acb)
     acb->pool->cancel(acb);
 }
 
+/* block I/O throttling */
+static bool bdrv_exceed_bps_limits(BlockDriverState *bs, int nb_sectors,
+                 bool is_write, double elapsed_time, uint64_t *wait) {
+    uint64_t bps_limit = 0;
+    double   bytes_limit, bytes_disp, bytes_res;
+    double   slice_time, wait_time;
+
+    if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
+        bps_limit = bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL];
+    } else if (bs->io_limits.bps[is_write]) {
+        bps_limit = bs->io_limits.bps[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    slice_time = bs->slice_end - bs->slice_start;
+    slice_time /= (NANOSECONDS_PER_SECOND);
+    bytes_limit = bps_limit * slice_time;
+    bytes_disp  = bs->nr_bytes[is_write] - bs->io_disps.bytes[is_write];
+    if (bs->io_limits.bps[BLOCK_IO_LIMIT_TOTAL]) {
+        bytes_disp += bs->nr_bytes[!is_write] - bs->io_disps.bytes[!is_write];
+    }
+
+    bytes_res   = (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
+
+    if (bytes_disp + bytes_res <= bytes_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
+    wait_time = (bytes_disp + bytes_res) / bps_limit - elapsed_time;
+
+    bs->slice_time = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    bs->slice_end += bs->slice_time - 3 * BLOCK_IO_SLICE_TIME;
+    if (wait) {
+        *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    }
+
+    return true;
+}
+
+static bool bdrv_exceed_iops_limits(BlockDriverState *bs, bool is_write,
+                             double elapsed_time, uint64_t *wait) {
+    uint64_t iops_limit = 0;
+    double   ios_limit, ios_disp;
+    double   slice_time, wait_time;
+
+    if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
+        iops_limit = bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL];
+    } else if (bs->io_limits.iops[is_write]) {
+        iops_limit = bs->io_limits.iops[is_write];
+    } else {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    slice_time = bs->slice_end - bs->slice_start;
+    slice_time /= (NANOSECONDS_PER_SECOND);
+    ios_limit  = iops_limit * slice_time;
+    ios_disp   = bs->nr_ops[is_write] - bs->io_disps.ios[is_write];
+    if (bs->io_limits.iops[BLOCK_IO_LIMIT_TOTAL]) {
+        ios_disp += bs->nr_ops[!is_write] - bs->io_disps.ios[!is_write];
+    }
+
+    if (ios_disp + 1 <= ios_limit) {
+        if (wait) {
+            *wait = 0;
+        }
+
+        return false;
+    }
+
+    /* Calc approx time to dispatch */
+    wait_time = (ios_disp + 1) / iops_limit;
+    if (wait_time > elapsed_time) {
+        wait_time = wait_time - elapsed_time;
+    } else {
+        wait_time = 0;
+    }
+
+    bs->slice_time = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    bs->slice_end += bs->slice_time - 3 * BLOCK_IO_SLICE_TIME;
+    if (wait) {
+        *wait = wait_time * BLOCK_IO_SLICE_TIME * 10;
+    }
+
+    return true;
+}
+
+static bool bdrv_exceed_io_limits(BlockDriverState *bs, int nb_sectors,
+                           bool is_write, int64_t *wait) {
+    int64_t  now, max_wait;
+    uint64_t bps_wait = 0, iops_wait = 0;
+    double   elapsed_time;
+    int      bps_ret, iops_ret;
+
+    now = qemu_get_clock_ns(vm_clock);
+    if ((bs->slice_start < now)
+        && (bs->slice_end > now)) {
+        bs->slice_end = now + bs->slice_time;
+    } else {
+        bs->slice_time  =  5 * BLOCK_IO_SLICE_TIME;
+        bs->slice_start = now;
+        bs->slice_end   = now + bs->slice_time;
+
+        bs->io_disps.bytes[is_write]  = bs->nr_bytes[is_write];
+        bs->io_disps.bytes[!is_write] = bs->nr_bytes[!is_write];
+
+        bs->io_disps.ios[is_write]    = bs->nr_ops[is_write];
+        bs->io_disps.ios[!is_write]   = bs->nr_ops[!is_write];
+    }
+
+    elapsed_time  = now - bs->slice_start;
+    elapsed_time  /= (NANOSECONDS_PER_SECOND);
+
+    bps_ret  = bdrv_exceed_bps_limits(bs, nb_sectors,
+                                      is_write, elapsed_time, &bps_wait);
+    iops_ret = bdrv_exceed_iops_limits(bs, is_write,
+                                      elapsed_time, &iops_wait);
+    if (bps_ret || iops_ret) {
+        max_wait = bps_wait > iops_wait ? bps_wait : iops_wait;
+        if (!qemu_co_queue_empty(&bs->throttled_reqs)) {
+            max_wait = -1;
+        }
+
+        if (wait) {
+            *wait = max_wait;
+        }
+
+        now = qemu_get_clock_ns(vm_clock);
+        if (bs->slice_end < now + max_wait) {
+            bs->slice_end = now + max_wait;
+        }
+
+        return true;
+    }
+
+    if (wait) {
+        *wait = 0;
+    }
+
+    return false;
+}
 
 /**************************************************************/
 /* async block device emulation */
diff --git a/block.h b/block.h
index bc8315d..9b5b35f 100644
--- a/block.h
+++ b/block.h
@@ -91,6 +91,7 @@  void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
 void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_disable(BlockDriverState *bs);
 bool bdrv_io_limits_enabled(BlockDriverState *bs);
 
 void bdrv_init(void);
diff --git a/block_int.h b/block_int.h
index b835ef6..fc53f5b 100644
--- a/block_int.h
+++ b/block_int.h
@@ -39,6 +39,7 @@ 
 #define BLOCK_IO_LIMIT_TOTAL    2
 
 #define BLOCK_IO_SLICE_TIME     100000000
+#define NANOSECONDS_PER_SECOND  1000000000.0
 
 #define BLOCK_OPT_SIZE          "size"
 #define BLOCK_OPT_ENCRYPT       "encryption"
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 6b58160..9549c07 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -61,6 +61,14 @@  void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
     assert(qemu_in_coroutine());
 }
 
+void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
+{
+    Coroutine *self = qemu_coroutine_self();
+    QTAILQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
+    qemu_coroutine_yield();
+    assert(qemu_in_coroutine());
+}
+
 bool qemu_co_queue_next(CoQueue *queue)
 {
     Coroutine *next;
diff --git a/qemu-coroutine.h b/qemu-coroutine.h
index b8fc4f4..8a2e5d2 100644
--- a/qemu-coroutine.h
+++ b/qemu-coroutine.h
@@ -118,6 +118,12 @@  void qemu_co_queue_init(CoQueue *queue);
 void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
 
 /**
+ * Adds the current coroutine to the head of the CoQueue and transfers control to the
+ * caller of the coroutine.
+ */
+void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue);
+
+/**
  * Restarts the next coroutine in the CoQueue and removes it from the queue.
  *
  * Returns true if a coroutine was restarted, false if the queue is empty.