From patchwork Fri Aug 2 15:53:01 2013
X-Patchwork-Submitter: Benoît Canet
X-Patchwork-Id: 264307
From: Benoît Canet
To: qemu-devel@nongnu.org
Date: Fri, 2 Aug 2013 17:53:01 +0200
Message-Id: <1375458782-14888-2-git-send-email-benoit@irqsave.net>
In-Reply-To: <1375458782-14888-1-git-send-email-benoit@irqsave.net>
References: <1375458782-14888-1-git-send-email-benoit@irqsave.net>
Cc: kwolf@redhat.com, pbonzini@redhat.com, Benoît Canet, stefanha@redhat.com
Subject: [Qemu-devel] [RFC V3 1/2] throttle: Add a new throttling API implementing continuous leaky bucket.

Implement the continuous leaky bucket algorithm devised on IRC as a separate
module.

Signed-off-by: Benoit Canet
---
 include/qemu/throttle.h |  111 ++++++++++++
 util/Makefile.objs      |    1 +
 util/throttle.c         |  436 +++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 548 insertions(+)
 create mode 100644 include/qemu/throttle.h
 create mode 100644 util/throttle.c

diff --git a/include/qemu/throttle.h b/include/qemu/throttle.h
new file mode 100644
index 0000000..328c782
--- /dev/null
+++ b/include/qemu/throttle.h
@@ -0,0 +1,111 @@
+/*
+ * QEMU throttling infrastructure
+ *
+ * Copyright (C) Nodalink, SARL. 2013
+ *
+ * Author:
+ *   Benoît Canet
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 or
+ * (at your option) version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see .
+ */
+
+#ifndef THROTTLING_H
+#define THROTTLING_H
+
+#include
+#include "qemu-common.h"
+#include "qemu/timer.h"
+
+#define NANOSECONDS_PER_SECOND  1000000000.0
+
+#define BUCKETS_COUNT 6
+
+typedef enum {
+    THROTTLE_BPS_TOTAL = 0,
+    THROTTLE_BPS_READ  = 1,
+    THROTTLE_BPS_WRITE = 2,
+    THROTTLE_OPS_TOTAL = 3,
+    THROTTLE_OPS_READ  = 4,
+    THROTTLE_OPS_WRITE = 5,
+} BucketType;
+
+typedef struct LeakyBucket {
+    int64_t ups;           /* units per second */
+    int64_t max;           /* leaky bucket max in units */
+    double  bucket;        /* bucket in units */
+    int64_t previous_leak; /* timestamp of the last leak done */
+} LeakyBucket;
+
+/* The following structure is used to configure a ThrottleState.
+ * It contains a bit of state: the bucket field of the LeakyBucket structure.
+ * However, this keeps the code clean, and the bucket field is reset to
+ * zero at the right time.
+ */
+typedef struct ThrottleConfig {
+    LeakyBucket buckets[BUCKETS_COUNT]; /* leaky buckets */
+    int64_t unit_size;     /* size of a unit in bytes */
+    int64_t op_size;       /* size of an operation in units */
+} ThrottleConfig;
+
+typedef struct ThrottleState {
+    ThrottleConfig cfg;
+    bool timer_is_throttling_write; /* is the timer throttling a write */
+    QEMUTimer *timer;      /* timer used to do the throttling */
+    QEMUClock *clock;      /* the clock used */
+} ThrottleState;
+
+/* the following 3 functions are exposed for tests */
+bool throttle_do_start(ThrottleState *ts,
+                       bool is_write,
+                       int64_t size,
+                       int64_t now,
+                       int64_t *next_timer);
+
+bool throttle_do_end(ThrottleState *ts,
+                     bool is_write,
+                     int64_t now,
+                     int64_t *next_timer);
+
+bool throttle_do_timer(ThrottleState *ts,
+                       bool is_write,
+                       int64_t now,
+                       int64_t *next_timer);
+
+/* user API functions */
+void throttle_init(ThrottleState *ts,
+                   QEMUClock *clock,
+                   void (timer)(void *),
+                   void *timer_opaque);
+
+void throttle_destroy(ThrottleState *ts);
+
+bool throttle_start(ThrottleState *ts, bool is_write, int64_t size);
+
+void throttle_end(ThrottleState *ts, bool is_write);
+
+void throttle_timer(ThrottleState *ts, int64_t now, bool *must_wait);
+
+void throttle_config(ThrottleState *ts, ThrottleConfig *cfg);
+
+void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg);
+
+bool throttle_enabled(ThrottleConfig *cfg);
+
+bool throttle_conflicting(ThrottleConfig *cfg);
+
+bool throttle_is_valid(ThrottleConfig *cfg);
+
+bool throttle_have_timer(ThrottleState *ts);
+
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index dc72ab0..2bb13a2 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o uri.o notify.o
 util-obj-y += qemu-option.o qemu-progress.o
 util-obj-y += hexdump.o
 util-obj-y += crc32c.o
+util-obj-y += throttle.o
diff --git a/util/throttle.c b/util/throttle.c
new file mode 100644
index 0000000..4afc407
--- /dev/null
+++ b/util/throttle.c
@@ -0,0 +1,436 @@
+/*
+ * QEMU throttling infrastructure
+ *
+ * Copyright (C) Nodalink, SARL. 2013
+ *
+ * Author:
+ *   Benoît Canet
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2 or
+ * (at your option) version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see .
+ */
+
+#include "qemu/throttle.h"
+#include "qemu/timer.h"
+
+/* This function makes a bucket leak
+ *
+ * @bkt: the bucket to leak
+ * @now: the current timestamp in ns
+ */
+static void throttle_leak_bucket(LeakyBucket *bkt, int64_t now)
+{
+    /* compute the time elapsed since the last leak */
+    int64_t delta = now - bkt->previous_leak;
+    double leak;
+
+    bkt->previous_leak = now;
+
+    if (delta <= 0) {
+        return;
+    }
+
+    /* compute how much to leak, in double to avoid int64_t overflow */
+    leak = (double) bkt->ups * delta / NANOSECONDS_PER_SECOND;
+
+    /* make the bucket leak */
+    bkt->bucket = MAX(bkt->bucket - leak, 0);
+}
+
+/* Calculate the time delta since the last leak and leak proportionally
+ *
+ * @now: the current timestamp in ns
+ */
+static void throttle_do_leak(ThrottleState *ts, int64_t now)
+{
+    int i;
+
+    /* make each bucket leak */
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        throttle_leak_bucket(&ts->cfg.buckets[i], now);
+    }
+}
+
+/* do the real job of computing the time to wait */
+static int64_t throttle_do_compute_wait(int64_t limit, double extra)
+{
+    int64_t wait = extra * NANOSECONDS_PER_SECOND;
+    wait /= limit;
+    return wait;
+}
+
+/* Compute the wait time in ns that a leaky bucket should trigger
+ *
+ * @bkt: the leaky bucket we operate on
+ * @ret: the resulting wait time in ns or 0 if the operation can go through
+ */
+static int64_t throttle_compute_wait(LeakyBucket *bkt)
+{
+    double extra; /* the number of extra units blocking the io */
+
+    if (!bkt->ups) {
+        return 0;
+    }
+
+    extra = bkt->bucket - bkt->max;
+
+    return throttle_do_compute_wait(bkt->ups, extra);
+}
+
+/* Compute the time that this IO must wait
+ *
+ * @is_write: true if the current IO is a write, false if it's a read
+ * @ret:      time to wait
+ */
+static int64_t throttle_compute_max_wait(ThrottleState *ts,
+                                         bool is_write)
+{
+    BucketType to_check[2][4] = { {THROTTLE_BPS_TOTAL,
+                                   THROTTLE_OPS_TOTAL,
+                                   THROTTLE_BPS_READ,
+                                   THROTTLE_OPS_READ},
+                                  {THROTTLE_BPS_TOTAL,
+                                   THROTTLE_OPS_TOTAL,
+                                   THROTTLE_BPS_WRITE,
+                                   THROTTLE_OPS_WRITE}, };
+    int64_t wait, max_wait = 0;
+    int i;
+
+    for (i = 0; i < 4; i++) {
+        BucketType index = to_check[is_write][i];
+        wait = throttle_compute_wait(&ts->cfg.buckets[index]);
+        if (wait > max_wait) {
+            max_wait = wait;
+        }
+    }
+
+    return max_wait;
+}
+
+static bool throttle_leak_and_compute_timer(ThrottleState *ts,
+                                            bool is_write,
+                                            int64_t now,
+                                            int64_t *next_timer)
+{
+    int64_t wait;
+
+    /* leak proportionally to the time elapsed */
+    throttle_do_leak(ts, now);
+
+    /* compute the wait time if any */
+    wait = throttle_compute_max_wait(ts, is_write);
+
+    /* if the code must wait compute when the next timer should fire */
+    if (wait) {
+        *next_timer = now + wait;
+        return true;
+    }
+
+    /* else no need to wait at all */
+    *next_timer = now;
+    return false;
+}
+
+bool throttle_do_start(ThrottleState *ts,
+                       bool is_write,
+                       int64_t size,
+                       int64_t now,
+                       int64_t *next_timer)
+{
+    double bytes_size;
+    double units = 1.0;
+    bool must_wait = throttle_leak_and_compute_timer(ts,
+                                                     is_write,
+                                                     now,
+                                                     next_timer);
+
+    if (must_wait) {
+        return true;
+    }
+
+    if (ts->cfg.op_size) {
+        units = (double) size / ts->cfg.op_size;
+    }
+
+    /* NOTE: the counter can go above the max when authorizing an IO.
+     *       At the next call the code will punish the guest by blocking the
+     *       next IO until the counter has been decremented below the max.
+     *       This way, if a guest issues a jumbo IO bigger than the max, it
+     *       will have a chance to be authorized and will not result in a
+     *       guest IO deadlock.
+     */
+
+    /* the IO is authorized so do the accounting and return false */
+    bytes_size = size * ts->cfg.unit_size;
+    ts->cfg.buckets[THROTTLE_BPS_TOTAL].bucket += bytes_size;
+    ts->cfg.buckets[THROTTLE_OPS_TOTAL].bucket += units;
+
+    if (is_write) {
+        ts->cfg.buckets[THROTTLE_BPS_WRITE].bucket += bytes_size;
+        ts->cfg.buckets[THROTTLE_OPS_WRITE].bucket += units;
+    } else {
+        ts->cfg.buckets[THROTTLE_BPS_READ].bucket += bytes_size;
+        ts->cfg.buckets[THROTTLE_OPS_READ].bucket += units;
+    }
+
+    /* no wait */
+    return false;
+}
+
+bool throttle_do_end(ThrottleState *ts,
+                     bool is_write,
+                     int64_t now,
+                     int64_t *next_timer)
+{
+    return throttle_leak_and_compute_timer(ts, is_write, now, next_timer);
+}
+
+bool throttle_do_timer(ThrottleState *ts,
+                       bool is_write,
+                       int64_t now,
+                       int64_t *next_timer)
+{
+    return throttle_leak_and_compute_timer(ts, is_write, now, next_timer);
+}
+
+/* To be called first on the ThrottleState */
+void throttle_init(ThrottleState *ts,
+                   QEMUClock *clock,
+                   void (timer)(void *),
+                   void *timer_opaque)
+{
+    memset(ts, 0, sizeof(ThrottleState));
+
+    ts->clock = clock;
+    ts->timer = qemu_new_timer_ns(clock, timer, timer_opaque);
+}
+
+/* To be called last on the ThrottleState */
+void throttle_destroy(ThrottleState *ts)
+{
+    assert(ts->timer != NULL);
+
+    qemu_del_timer(ts->timer);
+    qemu_free_timer(ts->timer);
+    ts->timer = NULL;
+}
+
+/* Used to configure the throttle
+ *
+ * @ts: the throttle state we are working on
+ * @cfg: the config to set
+ */
+void throttle_config(ThrottleState *ts, ThrottleConfig *cfg)
+{
+    int i;
+
+    ts->cfg = *cfg;
+
+    /* zero the buckets */
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        ts->cfg.buckets[i].bucket = 0;
+    }
+
+    /* init the previous_leak fields */
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        ts->cfg.buckets[i].previous_leak = qemu_get_clock_ns(ts->clock);
+    }
+
+    assert(ts->timer != NULL);
+    if (!qemu_timer_pending(ts->timer)) {
+        return;
+    }
+
+    /* cancel current running timer */
+    qemu_del_timer(ts->timer);
+}
+
+/* used to get config
+ *
+ * @ts: the throttle state we are working on
+ * @cfg: where to write the config
+ */
+void throttle_get_config(ThrottleState *ts, ThrottleConfig *cfg)
+{
+    *cfg = ts->cfg;
+}
+
+/* avoid any stutter due to reads and writes throttling interleaving */
+static int64_t throttle_avoid_stutter(ThrottleState *ts,
+                                      bool is_write,
+                                      int64_t next_timer)
+{
+    int64_t current_timer;
+
+    /* last throttled operation was of the same type -> do nothing */
+    if (ts->timer_is_throttling_write == is_write) {
+        return next_timer;
+    }
+
+    /* no timer is pending -> do nothing */
+    if (!qemu_timer_pending(ts->timer)) {
+        return next_timer;
+    }
+
+    /* get back the current running timer expiration time */
+    current_timer = qemu_timer_expire_time_ns(ts->timer);
+
+    /* if the timer in place expires sooner, keep it */
+    if (current_timer < next_timer) {
+        return current_timer;
+    }
+
+    /* remember the type of operation the timer is throttling */
+    ts->timer_is_throttling_write = is_write;
+
+    return next_timer;
+}
+
+bool throttle_start(ThrottleState *ts, bool is_write, int64_t size)
+{
+    int64_t now = qemu_get_clock_ns(ts->clock);
+    int64_t next_timer;
+    bool must_wait;
+
+    must_wait = throttle_do_start(ts, is_write, size, now, &next_timer);
+
+    if (!must_wait) {
+        return false;
+    }
+
+    next_timer = throttle_avoid_stutter(ts, is_write, next_timer);
+    qemu_mod_timer(ts->timer, next_timer);
+    return true;
+}
+
+void throttle_end(ThrottleState *ts, bool is_write)
+{
+    int64_t now = qemu_get_clock_ns(ts->clock);
+    int64_t next_timer;
+    bool must_wait;
+
+    must_wait = throttle_do_end(ts, is_write, now, &next_timer);
+
+    if (!must_wait) {
+        return;
+    }
+
+    next_timer = throttle_avoid_stutter(ts, is_write, next_timer);
+    qemu_mod_timer(ts->timer, next_timer);
+}
+
+static void throttle_swap_timers(int64_t *next_timers)
+{
+    int64_t tmp = next_timers[0];
+    next_timers[0] = next_timers[1];
+    next_timers[1] = tmp;
+}
+
+static void throttle_sort_timers(int64_t *next_timers)
+{
+    if (next_timers[0] < next_timers[1]) {
+        return;
+    }
+
+    throttle_swap_timers(next_timers);
+}
+
+void throttle_timer(ThrottleState *ts, int64_t now, bool *must_wait)
+{
+    int64_t next_timers[2];
+    int i;
+
+    /* for reads and writes: must the current IO wait, and for how long */
+    for (i = 0; i < 2; i++) {
+        must_wait[i] = throttle_do_timer(ts,
+                                         i,
+                                         now,
+                                         &next_timers[i]);
+    }
+
+    throttle_sort_timers(next_timers);
+
+    /* if both read and write IO are to throttle take the smallest timer */
+    if (must_wait[0] && must_wait[1]) {
+        qemu_mod_timer(ts->timer, next_timers[0]);
+    /* if only one type of IO is to throttle take the biggest timer */
+    } else if (must_wait[0] || must_wait[1]) {
+        qemu_mod_timer(ts->timer, next_timers[1]);
+    }
+}
+
+bool throttle_enabled(ThrottleConfig *cfg)
+{
+    int i;
+
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        if (cfg->buckets[i].ups) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+bool throttle_conflicting(ThrottleConfig *cfg)
+{
+    bool bps_flag, ops_flag;
+    bool bps_max_flag, ops_max_flag;
+
+    bps_flag = cfg->buckets[THROTTLE_BPS_TOTAL].ups &&
+               (cfg->buckets[THROTTLE_BPS_READ].ups ||
+                cfg->buckets[THROTTLE_BPS_WRITE].ups);
+
+    ops_flag = cfg->buckets[THROTTLE_OPS_TOTAL].ups &&
+               (cfg->buckets[THROTTLE_OPS_READ].ups ||
+                cfg->buckets[THROTTLE_OPS_WRITE].ups);
+
+    bps_max_flag = cfg->buckets[THROTTLE_BPS_TOTAL].max &&
+                   (cfg->buckets[THROTTLE_BPS_READ].max ||
+                    cfg->buckets[THROTTLE_BPS_WRITE].max);
+
+    ops_max_flag = cfg->buckets[THROTTLE_OPS_TOTAL].max &&
+                   (cfg->buckets[THROTTLE_OPS_READ].max ||
+                    cfg->buckets[THROTTLE_OPS_WRITE].max);
+
+    return bps_flag || ops_flag || bps_max_flag || ops_max_flag;
+}
+
+bool throttle_is_valid(ThrottleConfig *cfg)
+{
+    bool invalid = false;
+    int i;
+
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        if (cfg->buckets[i].ups < 0) {
+            invalid = true;
+        }
+    }
+
+    for (i = 0; i < BUCKETS_COUNT; i++) {
+        if (cfg->buckets[i].max < 0) {
+            invalid = true;
+        }
+    }
+
+    return !invalid;
+}
+
+bool throttle_have_timer(ThrottleState *ts)
+{
+    if (ts->timer) {
+        return true;
+    }
+
+    return false;
+}
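
For readers who want to try the accounting outside of QEMU, here is a minimal
standalone sketch of the leak / wait / account cycle for a single bucket. It is
not part of the patch: SketchBucket, sketch_leak, sketch_wait_ns and
sketch_request are hypothetical names chosen for illustration, and the sketch
assumes one bucket, no timer, and a caller-supplied timestamp in ns. It also
returns 0 explicitly when the bucket is at or below its max, which is an
assumption of the sketch rather than something the patch spells out.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_NS_PER_SECOND 1000000000.0

/* Hypothetical stand-in for the patch's LeakyBucket; not the QEMU type. */
typedef struct SketchBucket {
    int64_t ups;           /* units drained per second; 0 disables the limit */
    int64_t max;           /* level above which new requests must wait */
    double  level;         /* current fill level in units */
    int64_t previous_leak; /* timestamp of the last leak, in ns */
} SketchBucket;

/* Drain the bucket in proportion to the time elapsed since the last leak. */
static void sketch_leak(SketchBucket *bkt, int64_t now)
{
    int64_t delta = now - bkt->previous_leak;
    double leak;

    bkt->previous_leak = now;
    if (delta <= 0) {
        return;
    }

    /* multiply in double so that ups * delta cannot overflow int64_t */
    leak = (double)bkt->ups * (double)delta / SKETCH_NS_PER_SECOND;
    bkt->level = bkt->level > leak ? bkt->level - leak : 0.0;
}

/* Time in ns until the level has drained back to max, or 0 if no wait. */
static int64_t sketch_wait_ns(const SketchBucket *bkt)
{
    double extra;

    if (bkt->ups == 0) {
        return 0;
    }

    extra = bkt->level - (double)bkt->max;
    if (extra <= 0.0) {
        return 0;
    }

    return (int64_t)(extra * SKETCH_NS_PER_SECOND / (double)bkt->ups);
}

/* Leak first, then either report a wait or account for the request.
 * Returns the wait in ns; 0 means the request was admitted and accounted.
 */
int64_t sketch_request(SketchBucket *bkt, double units, int64_t now)
{
    int64_t wait;

    assert(bkt->ups >= 0 && bkt->max >= 0);

    sketch_leak(bkt, now);
    wait = sketch_wait_ns(bkt);
    if (wait == 0) {
        /* the level may exceed max here; the next request pays for it */
        bkt->level += units;
    }
    return wait;
}
```

With ups = 100 and max = 100, a first request of 150 units at t = 0 is admitted
even though it overshoots the max, which is the jumbo IO case described in the
NOTE in throttle_do_start. A second request at t = 0 is told to wait until the
50 extra units have drained, and is admitted once that time has elapsed.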