Message ID | 1376326396-7676-2-git-send-email-benoit@irqsave.net |
---|---|
State | New |
Headers | show |
On Mon, 08/12 18:53, Benoît Canet wrote: > Implement the continuous leaky bucket algorithm devised on IRC as a separate > module. > > Signed-off-by: Benoit Canet <benoit@irqsave.net> > --- > include/qemu/throttle.h | 105 +++++++++++++ > util/Makefile.objs | 1 + > util/throttle.c | 391 +++++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 497 insertions(+) > create mode 100644 include/qemu/throttle.h > create mode 100644 util/throttle.c > > diff --git a/include/qemu/throttle.h b/include/qemu/throttle.h > new file mode 100644 > index 0000000..e03bc3e > --- /dev/null > +++ b/include/qemu/throttle.h > @@ -0,0 +1,105 @@ > +/* > + * QEMU throttling infrastructure > + * > + * Copyright (C) Nodalink, SARL. 2013 > + * > + * Author: > + * Benoît Canet <benoit.canet@irqsave.net> > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License as > + * published by the Free Software Foundation; either version 2 or > + * (at your option) version 3 of the License. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program; if not, see <http://www.gnu.org/licenses/>. 
> + */ > + > +#ifndef THROTTLING_H > +#define THROTTLING_H > + > +#include <stdint.h> > +#include "qemu-common.h" > +#include "qemu/timer.h" > + > +#define NANOSECONDS_PER_SECOND 1000000000.0 > + > +#define BUCKETS_COUNT 6 > + > +typedef enum { > + THROTTLE_BPS_TOTAL = 0, > + THROTTLE_BPS_READ = 1, > + THROTTLE_BPS_WRITE = 2, > + THROTTLE_OPS_TOTAL = 3, > + THROTTLE_OPS_READ = 4, > + THROTTLE_OPS_WRITE = 5, > +} BucketType; > + > +typedef struct LeakyBucket { > + double ups; /* units per second */ > + double max; /* leaky bucket max in units */ > + double bucket; /* bucket in units */ > +} LeakyBucket; > + > +/* The following structure is used to configure a ThrottleState > + * It contains a bit of state: the bucket field of the LeakyBucket structure. > + * However it allows to keep the code clean and the bucket field is reset to > + * zero at the right time. > + */ > +typedef struct ThrottleConfig { > + LeakyBucket buckets[6]; /* leaky buckets */ > + uint64_t unit_size; /* size of an unit in bytes */ > + uint64_t op_size; /* size of an operation in units */ > +} ThrottleConfig; > + > +typedef struct ThrottleState { > + ThrottleConfig cfg; /* configuration */ > + int64_t previous_leak; /* timestamp of the last leak done */ > + QEMUTimer * timers[2]; /* timers used to do the throttling */ > + QEMUClock *clock; /* the clock used */ > +} ThrottleState; > + > +/* operations on single leaky buckets */ > +void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta); > + > +int64_t throttle_compute_wait(LeakyBucket *bkt); > + > +/* expose timer computation function for unit tests */ > +bool throttle_compute_timer(ThrottleState *ts, > + bool is_write, > + int64_t now, > + int64_t *next_timer); > + > +/* init/destroy cycle */ > +void throttle_init(ThrottleState *ts, > + QEMUClock *clock, > + void (read_timer)(void *), > + void (write_timer)(void *), > + void *timer_opaque); > + > +void throttle_destroy(ThrottleState *ts); > + > +bool throttle_have_timer(ThrottleState *ts); > 
+ > +/* configuration */ > +bool throttle_enabled(ThrottleConfig *cfg); > + > +bool throttle_conflicting(ThrottleConfig *cfg); > + > +bool throttle_is_valid(ThrottleConfig *cfg); > + > +void throttle_config(ThrottleState *ts, ThrottleConfig *cfg); > + > +void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg); > + > +/* usage */ > +bool throttle_allowed(ThrottleState *ts, bool is_write); > + > +void throttle_account(ThrottleState *ts, bool is_write, uint64_t size); > + > +#endif > diff --git a/util/Makefile.objs b/util/Makefile.objs > index dc72ab0..2bb13a2 100644 > --- a/util/Makefile.objs > +++ b/util/Makefile.objs > @@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o > util-obj-y += qemu-option.o qemu-progress.o > util-obj-y += hexdump.o > util-obj-y += crc32c.o > +util-obj-y += throttle.o > diff --git a/util/throttle.c b/util/throttle.c > new file mode 100644 > index 0000000..2f25d44 > --- /dev/null > +++ b/util/throttle.c > @@ -0,0 +1,391 @@ > +/* > + * QEMU throttling infrastructure > + * > + * Copyright (C) Nodalink, SARL. 2013 > + * > + * Author: > + * Benoît Canet <benoit.canet@irqsave.net> > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License as > + * published by the Free Software Foundation; either version 2 or > + * (at your option) version 3 of the License. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program; if not, see <http://www.gnu.org/licenses/>. 
> + */ > + > +#include "qemu/throttle.h" > +#include "qemu/timer.h" > + > +/* This function make a bucket leak > + * > + * @bkt: the bucket to make leak > + * @delta: the time delta > + */ > +void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta) > +{ > + double leak; > + > + /* compute how much to leak */ > + leak = (bkt->ups * (double) delta) / NANOSECONDS_PER_SECOND; > + > + /* make the bucket leak */ > + bkt->bucket = MAX(bkt->bucket - leak, 0); > +} > + > +/* Calculate the time delta since last leak and make proportionals leaks > + * > + * @now: the current timestamp in ns > + */ > +static void throttle_do_leak(ThrottleState *ts, int64_t now) > +{ > + /* compute the time elapsed since the last leak */ > + int64_t delta = now - ts->previous_leak; > + int i; > + > + ts->previous_leak = now; > + > + if (delta <= 0) { > + return; > + } > + > + /* make each bucket leak */ > + for (i = 0; i < BUCKETS_COUNT; i++) { > + throttle_leak_bucket(&ts->cfg.buckets[i], delta); > + } > +} > + > +/* do the real job of computing the time to wait > + * > + * @limit: the throttling limit > + * @extra: the number of operation to delay > + * @ret: the time to wait in ns > + */ > +static int64_t throttle_do_compute_wait(double limit, double extra) > +{ > + double wait = extra * NANOSECONDS_PER_SECOND; > + wait /= limit; > + return wait; > +} > + > +/* This function compute the wait time in ns that a leaky bucket should trigger > + * > + * @bkt: the leaky bucket we operate on > + * @ret: the resulting wait time in ns or 0 if the operation can go through > + */ > +int64_t throttle_compute_wait(LeakyBucket *bkt) > +{ > + double extra; /* the number of extra units blocking the io */ > + > + if (!bkt->ups) { > + return 0; > + } > + > + extra = bkt->bucket - bkt->max; > + > + if (extra <= 0) { > + return 0; > + } > + > + return throttle_do_compute_wait(bkt->ups, extra); > +} > + > +/* This function compute the time that must be waited while this IO > + * > + * @is_write: true if the 
current IO is a write, false if it's a read > + * @ret: time to wait > + */ > +static int64_t throttle_compute_wait_for(ThrottleState *ts, > + bool is_write, > + int64_t now) Parameter "now" not used. > +{ > + BucketType to_check[2][4] = { {THROTTLE_BPS_TOTAL, > + THROTTLE_OPS_TOTAL, > + THROTTLE_BPS_READ, > + THROTTLE_OPS_READ}, > + {THROTTLE_BPS_TOTAL, > + THROTTLE_OPS_TOTAL, > + THROTTLE_BPS_WRITE, > + THROTTLE_OPS_WRITE}, }; > + int64_t wait, max_wait = 0; > + int i; > + > + for (i = 0; i < 4; i++) { > + BucketType index = to_check[is_write][i]; > + wait = throttle_compute_wait(&ts->cfg.buckets[index]); > + if (wait > max_wait) { > + max_wait = wait; > + } > + } > + > + return max_wait; > +} > + > +/* compute the timer for this type of operation > + * > + * @is_write: the type of operation > + * @now: the current clock timerstamp s/timerstamp/timestamp/ > + * @next_timer: the resulting timer A bit confusing with the name and description. This is actually a computed timestamp for next triggering of the timer, right? 
> + * @ret: true if a timer must be set > + */ > +bool throttle_compute_timer(ThrottleState *ts, > + bool is_write, > + int64_t now, > + int64_t *next_timer) > +{ > + int64_t wait; > + > + /* leak proportionally to the time elapsed */ > + throttle_do_leak(ts, now); > + > + /* compute the wait time if any */ > + wait = throttle_compute_wait_for(ts, is_write, now); > + > + /* if the code must wait compute when the next timer should fire */ > + if (wait) { > + *next_timer = now + wait; > + return true; > + } > + > + /* else no need to wait at all */ > + *next_timer = now; > + return false; > +} > + > +/* To be called first on the ThrottleState */ > +void throttle_init(ThrottleState *ts, > + QEMUClock *clock, > + void (read_timer)(void *), There's a type name for this: QEMUTimerCB * read_timer_cb, > + void (write_timer)(void *), QEMUTimerCB * write_timer_cb, > + void *timer_opaque) > +{ > + memset(ts, 0, sizeof(ThrottleState)); > + > + ts->clock = clock; > + ts->timers[0] = qemu_new_timer_ns(ts->clock, read_timer, timer_opaque); > + ts->timers[1] = qemu_new_timer_ns(ts->clock, write_timer, timer_opaque); > +} > + > +/* destroy a timer */ > +static void throttle_timer_destroy(QEMUTimer **timer) > +{ > + assert(*timer != NULL); > + > + if (qemu_timer_pending(*timer)) { > + qemu_del_timer(*timer); > + } > + > + qemu_free_timer(*timer); > + *timer = NULL; > +} > + > +/* To be called last on the ThrottleState */ > +void throttle_destroy(ThrottleState *ts) > +{ > + int i; > + > + for (i = 0; i < 2; i++) { > + throttle_timer_destroy(&ts->timers[i]); > + } > +} > + > +/* is any throttling timer configured */ > +bool throttle_have_timer(ThrottleState *ts) > +{ > + if (ts->timers[0]) { > + return true; > + } > + > + return false; > +} > + > +/* Does any throttling must be done > + * > + * @cfg: the throttling configuration to inspect > + * @ret: true if throttling must be done else false > + */ > +bool throttle_enabled(ThrottleConfig *cfg) > +{ > + int i; > + > + for (i = 0; i < 
BUCKETS_COUNT; i++) { > + if (cfg->buckets[i].ups > 0) { > + return true; > + } > + } > + > + return false; > +} > + > +/* return true if any two throttling parameters conflicts > + * > + * @cfg: the throttling configuration to inspect > + * @ret: true if any conflict detected else false > + */ > +bool throttle_conflicting(ThrottleConfig *cfg) > +{ > + bool bps_flag, ops_flag; > + bool bps_max_flag, ops_max_flag; > + > + bps_flag = cfg->buckets[THROTTLE_BPS_TOTAL].ups && > + (cfg->buckets[THROTTLE_BPS_READ].ups || > + cfg->buckets[THROTTLE_BPS_WRITE].ups); > + > + ops_flag = cfg->buckets[THROTTLE_OPS_TOTAL].ups && > + (cfg->buckets[THROTTLE_OPS_READ].ups || > + cfg->buckets[THROTTLE_OPS_WRITE].ups); > + > + bps_max_flag = cfg->buckets[THROTTLE_BPS_TOTAL].max && > + (cfg->buckets[THROTTLE_BPS_READ].max || > + cfg->buckets[THROTTLE_BPS_WRITE].max); > + > + ops_max_flag = cfg->buckets[THROTTLE_OPS_TOTAL].max && > + (cfg->buckets[THROTTLE_OPS_READ].max || > + cfg->buckets[THROTTLE_OPS_WRITE].max); > + > + return bps_flag || ops_flag || bps_max_flag || ops_max_flag; > +} > + > +/* check if a throttling configuration is valid > + * @cfg: the throttling configuration to inspect > + * @ret: true if valid else false > + */ > +bool throttle_is_valid(ThrottleConfig *cfg) > +{ > + bool invalid = false; > + int i; > + > + for (i = 0; i < BUCKETS_COUNT; i++) { > + if (cfg->buckets[i].ups < 0) { > + invalid = true; > + } > + } > + > + for (i = 0; i < BUCKETS_COUNT; i++) { > + if (cfg->buckets[i].max < 0) { > + invalid = true; > + } > + } > + > + return !invalid; > +} > + > +/* fix bucket parameters */ > +static void throttle_fix_bucket(LeakyBucket *bkt) > +{ > + double min = bkt->ups / 10; > + /* zero bucket level */ > + bkt->bucket = 0; > + > + /* take care of not using cpu and also improve throttling precision */ > + if (bkt->ups && > + bkt->max < min) { > + bkt->max = min; > + } > +} > + > +/* take care of canceling a timer */ > +static void throttle_cancel_timer(QEMUTimer 
*timer) > +{ > + assert(timer != NULL); > + if (!qemu_timer_pending(timer)) { > + return; > + } > + > + qemu_del_timer(timer); > +} > + > +/* Used to configure the throttle > + * > + * @ts: the throttle state we are working on > + * @cfg: the config to set > + */ > +void throttle_config(ThrottleState *ts, ThrottleConfig *cfg) > +{ > + int i; > + > + ts->cfg = *cfg; > + > + for (i = 0; i < BUCKETS_COUNT; i++) { > + throttle_fix_bucket(&ts->cfg.buckets[i]); > + } > + > + ts->previous_leak = qemu_get_clock_ns(ts->clock); > + > + for (i = 0; i < 2; i++) { > + throttle_cancel_timer(ts->timers[i]); > + } > +} > + > +/* used to get config > + * > + * @ts: the throttle state we are working on > + * @cfg: where to write the config > + */ > +void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg) > +{ > + *cfg = ts->cfg; Any reason to copy the structure, instead of using the reference? > +} > + > + > +/* compute if an operation must be allowed and set a timer if not > + * > + * NOTE: this function is not unit tested due to it's usage of qemu_mod_timer > + * > + * @is_write: the type of operation (read/write) > + * @ret: true if the operation is allowed to flow else if must wait > + */ > +bool throttle_allowed(ThrottleState *ts, bool is_write) The function name sounds like a immutable check operation, but it actually reschedules timer. This is not a good style. 
> +{ > + int64_t now = qemu_get_clock_ns(ts->clock); > + int64_t next_timer; > + bool must_wait; > + > + must_wait = throttle_compute_timer(ts, > + is_write, > + now, > + &next_timer); > + > + /* if the request is throttled arm timer */ > + if (must_wait) { > + qemu_mod_timer(ts->timers[is_write], next_timer); > + } > + > + return !must_wait; > +} > + > +/* do the accounting for this operation > + * > + * @is_write: the type of operation (read/write) > + * size: the size of the operation > + */ > +void throttle_account(ThrottleState *ts, bool is_write, uint64_t size) > +{ > + double bytes_size; > + > + /* if cfg.op_size is not defined we will acccount exactly 1 operation */ > + double units = 1.0; > + if (ts->cfg.op_size) { > + units = (double) size / ts->cfg.op_size; > + } > + > + bytes_size = size * ts->cfg.unit_size; > + > + ts->cfg.buckets[THROTTLE_BPS_TOTAL].bucket += bytes_size; > + ts->cfg.buckets[THROTTLE_OPS_TOTAL].bucket += units; > + > + if (is_write) { > + ts->cfg.buckets[THROTTLE_BPS_WRITE].bucket += bytes_size; > + ts->cfg.buckets[THROTTLE_OPS_WRITE].bucket += units; > + } else { > + ts->cfg.buckets[THROTTLE_BPS_READ].bucket += bytes_size; > + ts->cfg.buckets[THROTTLE_OPS_READ].bucket += units; > + } > +} > + > -- > 1.7.10.4 > >
On Mon, Aug 12, 2013 at 06:53:12PM +0200, Benoît Canet wrote: > +#ifndef THROTTLING_H > +#define THROTTLING_H THROTTLE_H > + > +#include <stdint.h> > +#include "qemu-common.h" > +#include "qemu/timer.h" > + > +#define NANOSECONDS_PER_SECOND 1000000000.0 > + > +#define BUCKETS_COUNT 6 > + > +typedef enum { > + THROTTLE_BPS_TOTAL = 0, > + THROTTLE_BPS_READ = 1, > + THROTTLE_BPS_WRITE = 2, > + THROTTLE_OPS_TOTAL = 3, > + THROTTLE_OPS_READ = 4, > + THROTTLE_OPS_WRITE = 5, > +} BucketType; > + > +typedef struct LeakyBucket { > + double ups; /* units per second */ > + double max; /* leaky bucket max in units */ > + double bucket; /* bucket in units */ These comments aren't very clear to me :). So I guess bps or iops would be in ups. Max would be the total budget or maximum burst. Bucket might be the current level. > +} LeakyBucket; > + > +/* The following structure is used to configure a ThrottleState > + * It contains a bit of state: the bucket field of the LeakyBucket structure. > + * However it allows to keep the code clean and the bucket field is reset to > + * zero at the right time. > + */ > +typedef struct ThrottleConfig { > + LeakyBucket buckets[6]; /* leaky buckets */ s/6/THROTTLE_TYPE_MAX/ > + uint64_t unit_size; /* size of an unit in bytes */ > + uint64_t op_size; /* size of an operation in units */ It's not clear yet why we need both unit_size *and* op_size. I thought you would have a single granularity field for accounting big requests as multiple iops. > +/* This function make a bucket leak > + * > + * @bkt: the bucket to make leak > + * @delta: the time delta delta is in nanoseconds. Probably best to call it delta_ns. > +/* destroy a timer */ > +static void throttle_timer_destroy(QEMUTimer **timer) > +{ > + assert(*timer != NULL); > + > + if (qemu_timer_pending(*timer)) { > + qemu_del_timer(*timer); > + } You can always call qemu_del_timer(), the timer doesn't need to be pending. 
> +/* fix bucket parameters */ > +static void throttle_fix_bucket(LeakyBucket *bkt) > +{ > + double min = bkt->ups / 10; > + /* zero bucket level */ > + bkt->bucket = 0; > + > + /* take care of not using cpu and also improve throttling precision */ > + if (bkt->ups && > + bkt->max < min) { > + bkt->max = min; > + } > +} This function seems like magic. What is really going on here? Why divide by 10 and when does this case happen? > + > +/* take care of canceling a timer */ > +static void throttle_cancel_timer(QEMUTimer *timer) > +{ > + assert(timer != NULL); > + if (!qemu_timer_pending(timer)) { > + return; > + } No need to check pending first.
Il 16/08/2013 13:45, Stefan Hajnoczi ha scritto: >> > +#define BUCKETS_COUNT 6 >> > + >> > +typedef enum { >> > + THROTTLE_BPS_TOTAL = 0, >> > + THROTTLE_BPS_READ = 1, >> > + THROTTLE_BPS_WRITE = 2, >> > + THROTTLE_OPS_TOTAL = 3, >> > + THROTTLE_OPS_READ = 4, >> > + THROTTLE_OPS_WRITE = 5, >> > +} BucketType; Please remove the "= N" from the enums, and add BUCKETS_COUNT here. >> > +typedef struct LeakyBucket { >> > + double ups; /* units per second */ >> > + double max; /* leaky bucket max in units */ >> > + double bucket; /* bucket in units */ > These comments aren't very clear to me :). So I guess bps or iops would > be in ups. Max would be the total budget or maximum burst. Bucket > might be the current level. I also suggest replacing "ups" with "avg", since it's the average throughput that the leaky bucket allows after the initial burst has emptied the bucket. >> + uint64_t unit_size; /* size of an unit in bytes */ >> + uint64_t op_size; /* size of an operation in units */ > > It's not clear yet why we need both unit_size *and* op_size. I thought > you would have a single granularity field for accounting big requests as > multiple iops. IIUC the ops buckets account operations in op_size / unit_size units, while the bps buckets account operations in 1 / unit_size units, or something like that. But it needs clarification indeed. Paolo
diff --git a/include/qemu/throttle.h b/include/qemu/throttle.h new file mode 100644 index 0000000..e03bc3e --- /dev/null +++ b/include/qemu/throttle.h @@ -0,0 +1,105 @@ +/* + * QEMU throttling infrastructure + * + * Copyright (C) Nodalink, SARL. 2013 + * + * Author: + * Benoît Canet <benoit.canet@irqsave.net> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 or + * (at your option) version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef THROTTLING_H +#define THROTTLING_H + +#include <stdint.h> +#include "qemu-common.h" +#include "qemu/timer.h" + +#define NANOSECONDS_PER_SECOND 1000000000.0 + +#define BUCKETS_COUNT 6 + +typedef enum { + THROTTLE_BPS_TOTAL = 0, + THROTTLE_BPS_READ = 1, + THROTTLE_BPS_WRITE = 2, + THROTTLE_OPS_TOTAL = 3, + THROTTLE_OPS_READ = 4, + THROTTLE_OPS_WRITE = 5, +} BucketType; + +typedef struct LeakyBucket { + double ups; /* units per second */ + double max; /* leaky bucket max in units */ + double bucket; /* bucket in units */ +} LeakyBucket; + +/* The following structure is used to configure a ThrottleState + * It contains a bit of state: the bucket field of the LeakyBucket structure. + * However it allows to keep the code clean and the bucket field is reset to + * zero at the right time. 
+ */ +typedef struct ThrottleConfig { + LeakyBucket buckets[6]; /* leaky buckets */ + uint64_t unit_size; /* size of an unit in bytes */ + uint64_t op_size; /* size of an operation in units */ +} ThrottleConfig; + +typedef struct ThrottleState { + ThrottleConfig cfg; /* configuration */ + int64_t previous_leak; /* timestamp of the last leak done */ + QEMUTimer * timers[2]; /* timers used to do the throttling */ + QEMUClock *clock; /* the clock used */ +} ThrottleState; + +/* operations on single leaky buckets */ +void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta); + +int64_t throttle_compute_wait(LeakyBucket *bkt); + +/* expose timer computation function for unit tests */ +bool throttle_compute_timer(ThrottleState *ts, + bool is_write, + int64_t now, + int64_t *next_timer); + +/* init/destroy cycle */ +void throttle_init(ThrottleState *ts, + QEMUClock *clock, + void (read_timer)(void *), + void (write_timer)(void *), + void *timer_opaque); + +void throttle_destroy(ThrottleState *ts); + +bool throttle_have_timer(ThrottleState *ts); + +/* configuration */ +bool throttle_enabled(ThrottleConfig *cfg); + +bool throttle_conflicting(ThrottleConfig *cfg); + +bool throttle_is_valid(ThrottleConfig *cfg); + +void throttle_config(ThrottleState *ts, ThrottleConfig *cfg); + +void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg); + +/* usage */ +bool throttle_allowed(ThrottleState *ts, bool is_write); + +void throttle_account(ThrottleState *ts, bool is_write, uint64_t size); + +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index dc72ab0..2bb13a2 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o util-obj-y += qemu-option.o qemu-progress.o util-obj-y += hexdump.o util-obj-y += crc32c.o +util-obj-y += throttle.o diff --git a/util/throttle.c b/util/throttle.c new file mode 100644 index 0000000..2f25d44 --- /dev/null +++ b/util/throttle.c @@ -0,0 +1,391 @@ 
+/* + * QEMU throttling infrastructure + * + * Copyright (C) Nodalink, SARL. 2013 + * + * Author: + * Benoît Canet <benoit.canet@irqsave.net> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation; either version 2 or + * (at your option) version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, see <http://www.gnu.org/licenses/>. + */ + +#include "qemu/throttle.h" +#include "qemu/timer.h" + +/* This function make a bucket leak + * + * @bkt: the bucket to make leak + * @delta: the time delta + */ +void throttle_leak_bucket(LeakyBucket *bkt, int64_t delta) +{ + double leak; + + /* compute how much to leak */ + leak = (bkt->ups * (double) delta) / NANOSECONDS_PER_SECOND; + + /* make the bucket leak */ + bkt->bucket = MAX(bkt->bucket - leak, 0); +} + +/* Calculate the time delta since last leak and make proportionals leaks + * + * @now: the current timestamp in ns + */ +static void throttle_do_leak(ThrottleState *ts, int64_t now) +{ + /* compute the time elapsed since the last leak */ + int64_t delta = now - ts->previous_leak; + int i; + + ts->previous_leak = now; + + if (delta <= 0) { + return; + } + + /* make each bucket leak */ + for (i = 0; i < BUCKETS_COUNT; i++) { + throttle_leak_bucket(&ts->cfg.buckets[i], delta); + } +} + +/* do the real job of computing the time to wait + * + * @limit: the throttling limit + * @extra: the number of operation to delay + * @ret: the time to wait in ns + */ +static int64_t throttle_do_compute_wait(double limit, double extra) +{ + double wait = extra * NANOSECONDS_PER_SECOND; + 
wait /= limit; + return wait; +} + +/* This function compute the wait time in ns that a leaky bucket should trigger + * + * @bkt: the leaky bucket we operate on + * @ret: the resulting wait time in ns or 0 if the operation can go through + */ +int64_t throttle_compute_wait(LeakyBucket *bkt) +{ + double extra; /* the number of extra units blocking the io */ + + if (!bkt->ups) { + return 0; + } + + extra = bkt->bucket - bkt->max; + + if (extra <= 0) { + return 0; + } + + return throttle_do_compute_wait(bkt->ups, extra); +} + +/* This function compute the time that must be waited while this IO + * + * @is_write: true if the current IO is a write, false if it's a read + * @ret: time to wait + */ +static int64_t throttle_compute_wait_for(ThrottleState *ts, + bool is_write, + int64_t now) +{ + BucketType to_check[2][4] = { {THROTTLE_BPS_TOTAL, + THROTTLE_OPS_TOTAL, + THROTTLE_BPS_READ, + THROTTLE_OPS_READ}, + {THROTTLE_BPS_TOTAL, + THROTTLE_OPS_TOTAL, + THROTTLE_BPS_WRITE, + THROTTLE_OPS_WRITE}, }; + int64_t wait, max_wait = 0; + int i; + + for (i = 0; i < 4; i++) { + BucketType index = to_check[is_write][i]; + wait = throttle_compute_wait(&ts->cfg.buckets[index]); + if (wait > max_wait) { + max_wait = wait; + } + } + + return max_wait; +} + +/* compute the timer for this type of operation + * + * @is_write: the type of operation + * @now: the current clock timerstamp + * @next_timer: the resulting timer + * @ret: true if a timer must be set + */ +bool throttle_compute_timer(ThrottleState *ts, + bool is_write, + int64_t now, + int64_t *next_timer) +{ + int64_t wait; + + /* leak proportionally to the time elapsed */ + throttle_do_leak(ts, now); + + /* compute the wait time if any */ + wait = throttle_compute_wait_for(ts, is_write, now); + + /* if the code must wait compute when the next timer should fire */ + if (wait) { + *next_timer = now + wait; + return true; + } + + /* else no need to wait at all */ + *next_timer = now; + return false; +} + +/* To be called first on 
the ThrottleState */ +void throttle_init(ThrottleState *ts, + QEMUClock *clock, + void (read_timer)(void *), + void (write_timer)(void *), + void *timer_opaque) +{ + memset(ts, 0, sizeof(ThrottleState)); + + ts->clock = clock; + ts->timers[0] = qemu_new_timer_ns(ts->clock, read_timer, timer_opaque); + ts->timers[1] = qemu_new_timer_ns(ts->clock, write_timer, timer_opaque); +} + +/* destroy a timer */ +static void throttle_timer_destroy(QEMUTimer **timer) +{ + assert(*timer != NULL); + + if (qemu_timer_pending(*timer)) { + qemu_del_timer(*timer); + } + + qemu_free_timer(*timer); + *timer = NULL; +} + +/* To be called last on the ThrottleState */ +void throttle_destroy(ThrottleState *ts) +{ + int i; + + for (i = 0; i < 2; i++) { + throttle_timer_destroy(&ts->timers[i]); + } +} + +/* is any throttling timer configured */ +bool throttle_have_timer(ThrottleState *ts) +{ + if (ts->timers[0]) { + return true; + } + + return false; +} + +/* Does any throttling must be done + * + * @cfg: the throttling configuration to inspect + * @ret: true if throttling must be done else false + */ +bool throttle_enabled(ThrottleConfig *cfg) +{ + int i; + + for (i = 0; i < BUCKETS_COUNT; i++) { + if (cfg->buckets[i].ups > 0) { + return true; + } + } + + return false; +} + +/* return true if any two throttling parameters conflicts + * + * @cfg: the throttling configuration to inspect + * @ret: true if any conflict detected else false + */ +bool throttle_conflicting(ThrottleConfig *cfg) +{ + bool bps_flag, ops_flag; + bool bps_max_flag, ops_max_flag; + + bps_flag = cfg->buckets[THROTTLE_BPS_TOTAL].ups && + (cfg->buckets[THROTTLE_BPS_READ].ups || + cfg->buckets[THROTTLE_BPS_WRITE].ups); + + ops_flag = cfg->buckets[THROTTLE_OPS_TOTAL].ups && + (cfg->buckets[THROTTLE_OPS_READ].ups || + cfg->buckets[THROTTLE_OPS_WRITE].ups); + + bps_max_flag = cfg->buckets[THROTTLE_BPS_TOTAL].max && + (cfg->buckets[THROTTLE_BPS_READ].max || + cfg->buckets[THROTTLE_BPS_WRITE].max); + + ops_max_flag = 
cfg->buckets[THROTTLE_OPS_TOTAL].max && + (cfg->buckets[THROTTLE_OPS_READ].max || + cfg->buckets[THROTTLE_OPS_WRITE].max); + + return bps_flag || ops_flag || bps_max_flag || ops_max_flag; +} + +/* check if a throttling configuration is valid + * @cfg: the throttling configuration to inspect + * @ret: true if valid else false + */ +bool throttle_is_valid(ThrottleConfig *cfg) +{ + bool invalid = false; + int i; + + for (i = 0; i < BUCKETS_COUNT; i++) { + if (cfg->buckets[i].ups < 0) { + invalid = true; + } + } + + for (i = 0; i < BUCKETS_COUNT; i++) { + if (cfg->buckets[i].max < 0) { + invalid = true; + } + } + + return !invalid; +} + +/* fix bucket parameters */ +static void throttle_fix_bucket(LeakyBucket *bkt) +{ + double min = bkt->ups / 10; + /* zero bucket level */ + bkt->bucket = 0; + + /* take care of not using cpu and also improve throttling precision */ + if (bkt->ups && + bkt->max < min) { + bkt->max = min; + } +} + +/* take care of canceling a timer */ +static void throttle_cancel_timer(QEMUTimer *timer) +{ + assert(timer != NULL); + if (!qemu_timer_pending(timer)) { + return; + } + + qemu_del_timer(timer); +} + +/* Used to configure the throttle + * + * @ts: the throttle state we are working on + * @cfg: the config to set + */ +void throttle_config(ThrottleState *ts, ThrottleConfig *cfg) +{ + int i; + + ts->cfg = *cfg; + + for (i = 0; i < BUCKETS_COUNT; i++) { + throttle_fix_bucket(&ts->cfg.buckets[i]); + } + + ts->previous_leak = qemu_get_clock_ns(ts->clock); + + for (i = 0; i < 2; i++) { + throttle_cancel_timer(ts->timers[i]); + } +} + +/* used to get config + * + * @ts: the throttle state we are working on + * @cfg: where to write the config + */ +void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg) +{ + *cfg = ts->cfg; +} + + +/* compute if an operation must be allowed and set a timer if not + * + * NOTE: this function is not unit tested due to it's usage of qemu_mod_timer + * + * @is_write: the type of operation (read/write) + * @ret: 
true if the operation is allowed to flow else it must wait + */ +bool throttle_allowed(ThrottleState *ts, bool is_write) +{ + int64_t now = qemu_get_clock_ns(ts->clock); + int64_t next_timer; + bool must_wait; + + must_wait = throttle_compute_timer(ts, + is_write, + now, + &next_timer); + + /* if the request is throttled arm timer */ + if (must_wait) { + qemu_mod_timer(ts->timers[is_write], next_timer); + } + + return !must_wait; +} + +/* do the accounting for this operation + * + * @is_write: the type of operation (read/write) + * @size: the size of the operation + */ +void throttle_account(ThrottleState *ts, bool is_write, uint64_t size) +{ + double bytes_size; + + /* if cfg.op_size is not defined we will account exactly 1 operation */ + double units = 1.0; + if (ts->cfg.op_size) { + units = (double) size / ts->cfg.op_size; + } + + bytes_size = size * ts->cfg.unit_size; + + ts->cfg.buckets[THROTTLE_BPS_TOTAL].bucket += bytes_size; + ts->cfg.buckets[THROTTLE_OPS_TOTAL].bucket += units; + + if (is_write) { + ts->cfg.buckets[THROTTLE_BPS_WRITE].bucket += bytes_size; + ts->cfg.buckets[THROTTLE_OPS_WRITE].bucket += units; + } else { + ts->cfg.buckets[THROTTLE_BPS_READ].bucket += bytes_size; + ts->cfg.buckets[THROTTLE_OPS_READ].bucket += units; + } +} +
Implement the continuous leaky bucket algorithm devised on IRC as a separate module. Signed-off-by: Benoit Canet <benoit@irqsave.net> --- include/qemu/throttle.h | 105 +++++++++++++ util/Makefile.objs | 1 + util/throttle.c | 391 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 497 insertions(+) create mode 100644 include/qemu/throttle.h create mode 100644 util/throttle.c