diff mbox

[v1,7/8] throttle: Add throttle group support

Message ID 1412688279-8312-8-git-send-email-benoit.canet@nodalink.com
State New
Headers show

Commit Message

Benoît Canet Oct. 7, 2014, 1:24 p.m. UTC
The throttle group support use a cooperative round robin scheduling algorithm.

The principle of the algorithm are simple:
- Each BDS of the group is used as a token in a circular way.
- The active BDS compute if a wait must be done and arm the right timer.
- If a wait must be done the token timer will be armed so the token will become
  the next active BDS.

Signed-off-by: Benoit Canet <benoit.canet@nodalink.com>
---
 block.c                   | 191 ++++++++++++++++++++++++++++++++++++++++------
 block/qapi.c              |   7 +-
 block/throttle-groups.c   |   2 +-
 blockdev.c                |  19 ++++-
 hmp.c                     |   4 +-
 include/block/block.h     |   3 +-
 include/block/block_int.h |   9 ++-
 qapi/block-core.json      |   5 +-
 qemu-options.hx           |   1 +
 qmp-commands.hx           |   3 +-
 10 files changed, 209 insertions(+), 35 deletions(-)

Comments

Fam Zheng Oct. 8, 2014, 6:53 a.m. UTC | #1
On Tue, 10/07 15:24, Benoît Canet wrote:
> The throttle group support use a cooperative round robin scheduling algorithm.
> 
> The principle of the algorithm are simple:

s/principle/principles/

> - Each BDS of the group is used as a token in a circular way.
> - The active BDS compute if a wait must be done and arm the right timer.
> - If a wait must be done the token timer will be armed so the token will become
>   the next active BDS.
> 
> Signed-off-by: Benoit Canet <benoit.canet@nodalink.com>
> ---
>  block.c                   | 191 ++++++++++++++++++++++++++++++++++++++++------
>  block/qapi.c              |   7 +-
>  block/throttle-groups.c   |   2 +-
>  blockdev.c                |  19 ++++-
>  hmp.c                     |   4 +-
>  include/block/block.h     |   3 +-
>  include/block/block_int.h |   9 ++-
>  qapi/block-core.json      |   5 +-
>  qemu-options.hx           |   1 +
>  qmp-commands.hx           |   3 +-
>  10 files changed, 209 insertions(+), 35 deletions(-)
> 
> diff --git a/block.c b/block.c
> index 527ea48..e7e5607 100644
> --- a/block.c
> +++ b/block.c
> @@ -36,6 +36,7 @@
>  #include "qmp-commands.h"
>  #include "qemu/timer.h"
>  #include "qapi-event.h"
> +#include "block/throttle-groups.h"
>  
>  #ifdef CONFIG_BSD
>  #include <sys/types.h>
> @@ -129,7 +130,9 @@ void bdrv_set_io_limits(BlockDriverState *bs,
>  {
>      int i;
>  
> -    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
> +    throttle_group_lock(bs->throttle_state);
> +    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
> +    throttle_group_unlock(bs->throttle_state);
>  
>      for (i = 0; i < 2; i++) {
>          qemu_co_enter_next(&bs->throttled_reqs[i]);
> @@ -156,34 +159,99 @@ static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
>      return drained;
>  }
>  
> +static void bdrv_throttle_group_add(BlockDriverState *bs)
> +{
> +    int i;
> +    BlockDriverState *token;
> +
> +    for (i = 0; i < 2; i++) {
> +        /* Get the BlockDriverState having the round robin token */
> +        token = throttle_group_token(bs->throttle_state, i);
> +
> +        /* If the ThrottleGroup is new set the current BlockDriverState as
> +         * token
> +         */
> +        if (!token) {
> +            throttle_group_set_token(bs->throttle_state, bs, i);
> +        }
> +
> +    }
> +
> +    throttle_group_register_bs(bs->throttle_state, bs);
> +}
> +
> +static void bdrv_throttle_group_remove(BlockDriverState *bs)
> +{
> +    BlockDriverState *token;
> +    int i;
> +
> +    for (i = 0; i < 2; i++) {
> +        /* Get the BlockDriverState having the round robin token */
> +        token = throttle_group_token(bs->throttle_state, i);
> +        /* if this bs is the current token set the next bs as token */
> +        if (token == bs) {
> +            token = throttle_group_next_bs(token);
> +            /* take care of the case where bs is the only bs of the group */
> +            if (token == bs) {
> +                token = NULL;
> +            }
> +            throttle_group_set_token(bs->throttle_state, token, i);
> +        }
> +    }
> +
> +    /* remove the current bs from the list */
> +    QLIST_REMOVE(bs, round_robin);
> +}
> +
>  void bdrv_io_limits_disable(BlockDriverState *bs)
>  {
> +
> +    throttle_group_lock(bs->throttle_state);
>      bs->io_limits_enabled = false;
> +    throttle_group_unlock(bs->throttle_state);
>  
>      bdrv_start_throttled_reqs(bs);
>  
> +    throttle_group_lock(bs->throttle_state);
> +    bdrv_throttle_group_remove(bs);
> +    throttle_group_unlock(bs->throttle_state);
> +
> +    throttle_group_unref(bs->throttle_state);
> +    bs->throttle_state = NULL;
> +
>      throttle_timers_destroy(&bs->throttle_timers);
>  }
>  
>  static void bdrv_throttle_read_timer_cb(void *opaque)
>  {
>      BlockDriverState *bs = opaque;
> -    throttle_timer_fired(&bs->throttle_state, false);
> +
> +    throttle_group_lock(bs->throttle_state);
> +    throttle_timer_fired(bs->throttle_state, false);
> +    throttle_group_unlock(bs->throttle_state);
> +
>      qemu_co_enter_next(&bs->throttled_reqs[0]);
>  }
>  
>  static void bdrv_throttle_write_timer_cb(void *opaque)
>  {
>      BlockDriverState *bs = opaque;
> -    throttle_timer_fired(&bs->throttle_state, true);
> +
> +    throttle_group_lock(bs->throttle_state);
> +    throttle_timer_fired(bs->throttle_state, true);
> +    throttle_group_unlock(bs->throttle_state);
> +
>      qemu_co_enter_next(&bs->throttled_reqs[1]);
>  }
>  
>  /* should be called before bdrv_set_io_limits if a limit is set */
> -void bdrv_io_limits_enable(BlockDriverState *bs)
> +void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)

Does this mean that after this series, all the throttle_states must be
contained inside its own throttle group? If so, we could embed ThrottleGroup
fields in ThrottleState.

It's weird when a function called throttle_group_compare takes a parameter of
ThrottleState pointer, and cast it back to ThrottleGroup with container_of.

Fam
Benoît Canet Oct. 8, 2014, 11:05 a.m. UTC | #2
On Wed, Oct 08, 2014 at 02:53:38PM +0800, Fam Zheng wrote:
> 
> Does this mean that after this series, all the throttle_states must be
> contained inside its own throttle group? If so, we could embed ThrottleGroup
> fields in ThrottleState.
> 
> It's weird when a function called throttle_group_compare takes a parameter of
> ThrottleState pointer, and cast it back to ThrottleGroup with container_of.

It's done like this to fullfill a design goal: the throttle should be reusable
without the groups and any reference to block related stuff.
So it's just a way to split the responsabilities.

Best regards

Benoît

Thanks for reviewing.

> 
> Fam
Fam Zheng Oct. 9, 2014, 8:58 a.m. UTC | #3
On Wed, 10/08 11:05, Benoît Canet wrote:
> On Wed, Oct 08, 2014 at 02:53:38PM +0800, Fam Zheng wrote:
> > 
> > Does this mean that after this series, all the throttle_states must be
> > contained inside its own throttle group? If so, we could embed ThrottleGroup
> > fields in ThrottleState.
> > 
> > It's weird when a function called throttle_group_compare takes a parameter of
> > ThrottleState pointer, and cast it back to ThrottleGroup with container_of.
> 
> It's done like this to fullfill a design goal: the throttle should be reusable
> without the groups and any reference to block related stuff.
> So it's just a way to split the responsabilities.

I see.

Having both ThrottleGroup and ThrottleState interfaces is more complicated than
just use ThrottleGroup, where a one-member group is exactly the same as
ThrottleState.

Fam
Benoît Canet Oct. 10, 2014, 11:35 a.m. UTC | #4
On Thu, Oct 09, 2014 at 04:58:22PM +0800, Fam Zheng wrote:
> On Wed, 10/08 11:05, Benoît Canet wrote:
> > On Wed, Oct 08, 2014 at 02:53:38PM +0800, Fam Zheng wrote:
> > > 
> > > Does this mean that after this series, all the throttle_states must be
> > > contained inside its own throttle group? If so, we could embed ThrottleGroup
> > > fields in ThrottleState.
> > > 
> > > It's weird when a function called throttle_group_compare takes a parameter of
> > > ThrottleState pointer, and cast it back to ThrottleGroup with container_of.
> > 
> > It's done like this to fullfill a design goal: the throttle should be reusable
> > without the groups and any reference to block related stuff.
> > So it's just a way to split the responsabilities.
> 
> I see.
> 
> Having both ThrottleGroup and ThrottleState interfaces is more complicated than
> just use ThrottleGroup, where a one-member group is exactly the same as
> ThrottleState.

Here my interest conflict between a short term strategy (comply and getting these
patches merged asap) and the technical advantage of keeping ThrottleState and
ThrottleGroup separated.

orthogonality
-------------

An advantage to keep ThrottleState and ThrottleGroup separated is that
the two roles of implementing the throttle itself and implementing the behavior
of a group are keep separated.

As a result each piece of code is simpler to write, review and test.

genericity
----------

When writing the initial throttle code I carefully designed it to be independant
of BlockDriverState.

As a result the throttle code lives in util/ and is easilly reusable into whatever
we want (a device model or network throttling).

It is mean, designed and written to be as generic as possible.

So yes merging ThrottleState and ThrottleGroup would result in the seemlingly nice
thing that one structure is simpler than two structure.

But it would also imply the fact that I really hate that this pesky
BlockDriverState structure would become tied to ThrottleState.

If we do this ThrottleState would move out of util/ to block/ and we would
have lost the ability to reuse this infrastructure. It would be a short term gain
and a long term loss.

Another fact is that the rationale "1 structure is simpler than 2" is not alway
relevant: look for BlockBackend that we are desesperately trying to move out of
BlockDriverState.

Best regards

Benoît
> 
> Fam
diff mbox

Patch

diff --git a/block.c b/block.c
index 527ea48..e7e5607 100644
--- a/block.c
+++ b/block.c
@@ -36,6 +36,7 @@ 
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "block/throttle-groups.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -129,7 +130,9 @@  void bdrv_set_io_limits(BlockDriverState *bs,
 {
     int i;
 
-    throttle_config(&bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_lock(bs->throttle_state);
+    throttle_config(bs->throttle_state, &bs->throttle_timers, cfg);
+    throttle_group_unlock(bs->throttle_state);
 
     for (i = 0; i < 2; i++) {
         qemu_co_enter_next(&bs->throttled_reqs[i]);
@@ -156,34 +159,99 @@  static bool bdrv_start_throttled_reqs(BlockDriverState *bs)
     return drained;
 }
 
+static void bdrv_throttle_group_add(BlockDriverState *bs)
+{
+    int i;
+    BlockDriverState *token;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+
+        /* If the ThrottleGroup is new set the current BlockDriverState as
+         * token
+         */
+        if (!token) {
+            throttle_group_set_token(bs->throttle_state, bs, i);
+        }
+
+    }
+
+    throttle_group_register_bs(bs->throttle_state, bs);
+}
+
+static void bdrv_throttle_group_remove(BlockDriverState *bs)
+{
+    BlockDriverState *token;
+    int i;
+
+    for (i = 0; i < 2; i++) {
+        /* Get the BlockDriverState having the round robin token */
+        token = throttle_group_token(bs->throttle_state, i);
+        /* if this bs is the current token set the next bs as token */
+        if (token == bs) {
+            token = throttle_group_next_bs(token);
+            /* take care of the case where bs is the only bs of the group */
+            if (token == bs) {
+                token = NULL;
+            }
+            throttle_group_set_token(bs->throttle_state, token, i);
+        }
+    }
+
+    /* remove the current bs from the list */
+    QLIST_REMOVE(bs, round_robin);
+}
+
 void bdrv_io_limits_disable(BlockDriverState *bs)
 {
+
+    throttle_group_lock(bs->throttle_state);
     bs->io_limits_enabled = false;
+    throttle_group_unlock(bs->throttle_state);
 
     bdrv_start_throttled_reqs(bs);
 
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_remove(bs);
+    throttle_group_unlock(bs->throttle_state);
+
+    throttle_group_unref(bs->throttle_state);
+    bs->throttle_state = NULL;
+
     throttle_timers_destroy(&bs->throttle_timers);
 }
 
 static void bdrv_throttle_read_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, false);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, false);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[0]);
 }
 
 static void bdrv_throttle_write_timer_cb(void *opaque)
 {
     BlockDriverState *bs = opaque;
-    throttle_timer_fired(&bs->throttle_state, true);
+
+    throttle_group_lock(bs->throttle_state);
+    throttle_timer_fired(bs->throttle_state, true);
+    throttle_group_unlock(bs->throttle_state);
+
     qemu_co_enter_next(&bs->throttled_reqs[1]);
 }
 
 /* should be called before bdrv_set_io_limits if a limit is set */
-void bdrv_io_limits_enable(BlockDriverState *bs)
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group)
 {
     assert(!bs->io_limits_enabled);
-    throttle_init(&bs->throttle_state);
+    bs->throttle_state = throttle_group_incref(group ? group : bs->device_name);
+
+    throttle_group_lock(bs->throttle_state);
+    bdrv_throttle_group_add(bs);
     throttle_timers_init(&bs->throttle_timers,
                          bdrv_get_aio_context(bs),
                          QEMU_CLOCK_VIRTUAL,
@@ -191,6 +259,53 @@  void bdrv_io_limits_enable(BlockDriverState *bs)
                          bdrv_throttle_write_timer_cb,
                          bs);
     bs->io_limits_enabled = true;
+    throttle_group_unlock(bs->throttle_state);
+}
+
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group)
+{
+    /* this bs is not part of any group */
+    if (!bs->throttle_state) {
+        return;
+    }
+
+    /* this bs is a part of the same group than the one we want */
+    if (throttle_group_compare(bs->throttle_state, group)) {
+        return;
+    }
+
+    /* need to change the group this bs belong to */
+    bdrv_io_limits_disable(bs);
+    bdrv_io_limits_enable(bs, group);
+}
+
+/* This implement the round robin policy and must be called under ThrottleGroup
+ * lock
+ */
+static BlockDriverState *bdrv_next_throttle_token(BlockDriverState *bs,
+                                                  bool is_write)
+{
+    BlockDriverState *token, *start;
+
+    start = token = throttle_group_token(bs->throttle_state, is_write);
+
+    /* get next bs round in round robin style */
+    token = throttle_group_next_bs(token);
+    while (token != start  &&
+           qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = throttle_group_next_bs(token);
+    }
+
+    /* If no IO are queued for scheduling on the next round robin token
+     * then decide the token is the current bs because chances are
+     * the current bs get the current request queued.
+     */
+    if (token == start &&
+        qemu_co_queue_empty(&token->throttled_reqs[is_write])) {
+        token = bs;
+    }
+
+    return token;
 }
 
 /* This function makes an IO wait if needed
@@ -202,32 +317,63 @@  static void bdrv_io_limits_intercept(BlockDriverState *bs,
                                      unsigned int bytes,
                                      bool is_write)
 {
+    bool empty;
     bool armed;
+    bool token_queue_empty;
+    BlockDriverState *token;
 
+    throttle_group_lock(bs->throttle_state);
+    /* get the next bs to schedule */
+    token = bdrv_next_throttle_token(bs, is_write);
     /* does this io must wait */
-    bool must_wait = throttle_schedule_timer(&bs->throttle_state,
-                                             &bs->throttle_timers,
+    bool must_wait = throttle_schedule_timer(bs->throttle_state,
+                                             &token->throttle_timers,
                                              is_write,
                                              &armed);
+    /* the timer got armed -> save the token */
+    if (armed) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
+    empty = qemu_co_queue_empty(&bs->throttled_reqs[is_write]);
+    throttle_group_unlock(bs->throttle_state);
 
     /* if must wait or any request of this type throttled queue the IO */
-    if (must_wait ||
-        !qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+    if (must_wait || !empty) {
         qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
     }
 
-    /* the IO will be executed, do the accounting */
-    throttle_account(&bs->throttle_state, is_write, bytes);
-
+    throttle_group_lock(bs->throttle_state);
+    /* get the next bs to schedule */
+    token = bdrv_next_throttle_token(bs, is_write);
+    /* is there an IO to schedule in the round robin token ? */
+    token_queue_empty = qemu_co_queue_empty(&token->throttled_reqs[is_write]);
+    /* this IO will be executed, do the accounting */
+    throttle_account(bs->throttle_state, is_write, bytes);
+    /* does the next IO queued must wait ? */
+    must_wait = throttle_schedule_timer(bs->throttle_state,
+                                        &token->throttle_timers,
+                                        is_write,
+                                        &armed);
+    /* If a timer was armed or an IO is to be scheduled in the next round robin
+     * token then save the token.
+     */
+    if (armed || !token_queue_empty) {
+        throttle_group_set_token(bs->throttle_state, token, is_write);
+    }
 
     /* if the next request must wait -> do nothing */
-    if (throttle_schedule_timer(&bs->throttle_state, &bs->throttle_timers,
-                                is_write, &armed)) {
+    if (must_wait) {
+        throttle_group_unlock(bs->throttle_state);
         return;
     }
 
-    /* else queue next request for execution */
-    qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    /* else schedule next request for execution */
+    if (!qemu_co_queue_empty(&bs->throttled_reqs[is_write])) {
+        qemu_co_queue_next(&bs->throttled_reqs[is_write]);
+    } else if (!token_queue_empty) {
+        throttle_fire_timer(&token->throttle_timers, is_write);
+    }
+    throttle_group_unlock(bs->throttle_state);
 }
 
 size_t bdrv_opt_mem_align(BlockDriverState *bs)
@@ -1996,15 +2142,16 @@  static void bdrv_move_feature_fields(BlockDriverState *bs_dest,
     bs_dest->enable_write_cache = bs_src->enable_write_cache;
 
     /* i/o throttled req */
-    memcpy(&bs_dest->throttle_state,
-           &bs_src->throttle_state,
-           sizeof(ThrottleState));
+    bs_dest->throttle_state     = bs_src->throttle_state,
+    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
+    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
+    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
+    memcpy(&bs_dest->round_robin,
+           &bs_src->round_robin,
+           sizeof(bs_dest->round_robin));
     memcpy(&bs_dest->throttle_timers,
            &bs_src->throttle_timers,
            sizeof(ThrottleTimers));
-    bs_dest->throttled_reqs[0]  = bs_src->throttled_reqs[0];
-    bs_dest->throttled_reqs[1]  = bs_src->throttled_reqs[1];
-    bs_dest->io_limits_enabled  = bs_src->io_limits_enabled;
 
     /* r/w error */
     bs_dest->on_read_error      = bs_src->on_read_error;
diff --git a/block/qapi.c b/block/qapi.c
index 9733ebd..4086055 100644
--- a/block/qapi.c
+++ b/block/qapi.c
@@ -24,6 +24,7 @@ 
 
 #include "block/qapi.h"
 #include "block/block_int.h"
+#include "block/throttle-groups.h"
 #include "qmp-commands.h"
 #include "qapi-visit.h"
 #include "qapi/qmp-output-visitor.h"
@@ -61,7 +62,11 @@  BlockDeviceInfo *bdrv_block_device_info(BlockDriverState *bs)
 
     if (bs->io_limits_enabled) {
         ThrottleConfig cfg;
-        throttle_get_config(&bs->throttle_state, &cfg);
+
+        throttle_group_lock(bs->throttle_state);
+        throttle_get_config(bs->throttle_state, &cfg);
+        throttle_group_unlock(bs->throttle_state);
+
         info->bps     = cfg.buckets[THROTTLE_BPS_TOTAL].avg;
         info->bps_rd  = cfg.buckets[THROTTLE_BPS_READ].avg;
         info->bps_wr  = cfg.buckets[THROTTLE_BPS_WRITE].avg;
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index ea5baca..399ae5e 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -154,7 +154,7 @@  void throttle_group_register_bs(ThrottleState *ts, BlockDriverState *bs)
  */
 BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
 {
-    ThrottleState *ts = &bs->throttle_state;
+    ThrottleState *ts = bs->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
     BlockDriverState *next = QLIST_NEXT(bs, round_robin);
 
diff --git a/blockdev.c b/blockdev.c
index e595910..415c18d 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -379,6 +379,7 @@  static DriveInfo *blockdev_init(const char *file, QDict *bs_opts,
     bool has_driver_specific_opts;
     BlockdevDetectZeroesOptions detect_zeroes;
     BlockDriver *drv = NULL;
+    const char *throttling_group;
 
     /* Check common options by copying from bs_opts to opts, all other options
      * stay in bs_opts for processing by bdrv_open(). */
@@ -481,6 +482,8 @@  static DriveInfo *blockdev_init(const char *file, QDict *bs_opts,
 
     cfg.op_size = qemu_opt_get_number(opts, "throttling.iops-size", 0);
 
+    throttling_group = qemu_opt_get(opts, "throttling.group");
+
     if (!check_throttle_config(&cfg, &error)) {
         error_propagate(errp, error);
         goto early_err;
@@ -535,7 +538,7 @@  static DriveInfo *blockdev_init(const char *file, QDict *bs_opts,
 
     /* disk I/O throttling */
     if (throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, throttling_group);
         bdrv_set_io_limits(bs, &cfg);
     }
 
@@ -742,6 +745,8 @@  DriveInfo *drive_new(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 
         { "iops_size",      "throttling.iops-size" },
 
+        { "group",          "throttling.group" },
+
         { "readonly",       "read-only" },
     };
 
@@ -1762,7 +1767,9 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
                                bool has_iops_wr_max,
                                int64_t iops_wr_max,
                                bool has_iops_size,
-                               int64_t iops_size, Error **errp)
+                               int64_t iops_size,
+                               bool has_group,
+                               const char *group, Error **errp)
 {
     ThrottleConfig cfg;
     BlockDriverState *bs;
@@ -1814,9 +1821,11 @@  void qmp_block_set_io_throttle(const char *device, int64_t bps, int64_t bps_rd,
     aio_context_acquire(aio_context);
 
     if (!bs->io_limits_enabled && throttle_enabled(&cfg)) {
-        bdrv_io_limits_enable(bs);
+        bdrv_io_limits_enable(bs, has_group ? group : NULL);
     } else if (bs->io_limits_enabled && !throttle_enabled(&cfg)) {
         bdrv_io_limits_disable(bs);
+    } else if (bs->io_limits_enabled && throttle_enabled(&cfg)) {
+        bdrv_io_limits_update_group(bs, has_group ? group : NULL);
     }
 
     if (bs->io_limits_enabled) {
@@ -2739,6 +2748,10 @@  QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_NUMBER,
             .help = "when limiting by iops max size of an I/O in bytes",
         },{
+            .name = "throttling.group",
+            .type = QEMU_OPT_STRING,
+            .help = "name of the block throttling group",
+        },{
             .name = "copy-on-read",
             .type = QEMU_OPT_BOOL,
             .help = "copy read data from backing file into image file",
diff --git a/hmp.c b/hmp.c
index 63d7686..f2cd294 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1167,7 +1167,9 @@  void hmp_block_set_io_throttle(Monitor *mon, const QDict *qdict)
                               false,
                               0,
                               false, /* No default I/O size */
-                              0, &err);
+                              0,
+                              false,
+                              NULL, &err);
     hmp_handle_error(mon, &err);
 }
 
diff --git a/include/block/block.h b/include/block/block.h
index 3318f0d..454d86a 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -191,8 +191,9 @@  void bdrv_stats_print(Monitor *mon, const QObject *data);
 void bdrv_info_stats(Monitor *mon, QObject **ret_data);
 
 /* disk I/O throttling */
-void bdrv_io_limits_enable(BlockDriverState *bs);
+void bdrv_io_limits_enable(BlockDriverState *bs, const char *group);
 void bdrv_io_limits_disable(BlockDriverState *bs);
+void bdrv_io_limits_update_group(BlockDriverState *bs, const char *group);
 
 void bdrv_init(void);
 void bdrv_init_with_whitelist(void);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 4b199f1..a4e73f6 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -354,12 +354,13 @@  struct BlockDriverState {
     /* number of in-flight serialising requests */
     unsigned int serialising_in_flight;
 
-    /* I/O throttling */
-    ThrottleState throttle_state;
-    ThrottleTimers throttle_timers; 
-    CoQueue      throttled_reqs[2];
+    /* I/O throttling - following elements protected by ThrottleGroup lock */
+    ThrottleState *throttle_state;
     bool         io_limits_enabled;
+    CoQueue      throttled_reqs[2];
     QLIST_ENTRY(BlockDriverState) round_robin;
+    /* timers have their own locking */
+    ThrottleTimers throttle_timers;
 
     /* I/O stats (display with "info blockstats"). */
     BlockAcctStats stats;
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 8f7089e..e6c5264 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -895,6 +895,9 @@ 
 #
 # @iops_size: #optional an I/O size in bytes (Since 1.7)
 #
+#
+# @group: #optional throttle group name (Since 2.2)
+#
 # Returns: Nothing on success
 #          If @device is not a valid block device, DeviceNotFound
 #
@@ -906,7 +909,7 @@ 
             '*bps_max': 'int', '*bps_rd_max': 'int',
             '*bps_wr_max': 'int', '*iops_max': 'int',
             '*iops_rd_max': 'int', '*iops_wr_max': 'int',
-            '*iops_size': 'int' } }
+            '*iops_size': 'int', '*group': 'str' } }
 
 ##
 # @block-stream:
diff --git a/qemu-options.hx b/qemu-options.hx
index 365b56c..269ca6f 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -437,6 +437,7 @@  DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
     "       [[,iops_max=im]|[[,iops_rd_max=irm][,iops_wr_max=iwm]]]\n"
     "       [[,iops_size=is]]\n"
+    "       [[,group=g]]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
 STEXI
 @item -drive @var{option}[,@var{option}[,@var{option}[,...]]]
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 1abd619..1f61359 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1662,7 +1662,7 @@  EQMP
 
     {
         .name       = "block_set_io_throttle",
-        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?",
+        .args_type  = "device:B,bps:l,bps_rd:l,bps_wr:l,iops:l,iops_rd:l,iops_wr:l,bps_max:l?,bps_rd_max:l?,bps_wr_max:l?,iops_max:l?,iops_rd_max:l?,iops_wr_max:l?,iops_size:l?,group:s?",
         .mhandler.cmd_new = qmp_marshal_input_block_set_io_throttle,
     },
 
@@ -1688,6 +1688,7 @@  Arguments:
 - "iops_rd_max":  read I/O operations max (json-int)
 - "iops_wr_max":  write I/O operations max (json-int)
 - "iops_size":  I/O size in bytes when limiting (json-int)
+- "group": throttle group name (json-string)
 
 Example: