From patchwork Fri Jun 28 18:26:29 2013
X-Patchwork-Submitter: Paolo Bonzini
X-Patchwork-Id: 255511
From: Paolo Bonzini
To: qemu-devel@nongnu.org
Date: Fri, 28 Jun 2013 20:26:29 +0200
Message-Id: <1372444009-11544-11-git-send-email-pbonzini@redhat.com>
In-Reply-To: <1372444009-11544-1-git-send-email-pbonzini@redhat.com>
References: <1372444009-11544-1-git-send-email-pbonzini@redhat.com>
Subject: [Qemu-devel] [PATCH 10/30] rcu: add call_rcu

Signed-off-by: Paolo Bonzini
---
 docs/rcu.txt       | 108 +++++++++++++++++++++++++++++++++++++++++++++--
 include/qemu/rcu.h |  22 ++++++++++
 util/rcu.c         | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 246 insertions(+), 4 deletions(-)
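As a quick illustration of the API this patch adds (a minimal sketch,
not code from the patch: "Config", config_free() and config_update()
are hypothetical names), a writer publishes a new version of a
structure and lets call_rcu() reclaim the old one asynchronously:

    struct Config {
        struct rcu_head rcu;    /* first field, as call_rcu() requires */
        int value;
    };

    static struct Config *config;

    static void config_free(struct Config *c)
    {
        g_free(c);
    }

    static void config_update(int value)
    {
        struct Config *new_c = g_new0(struct Config, 1);
        struct Config *old_c;

        new_c->value = value;
        old_c = config;
        rcu_assign_pointer(config, new_c);

        /* Removal is complete; config_free() runs only after all
         * pre-existing read-side critical sections have finished.
         */
        call_rcu(old_c, config_free, rcu);
    }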
diff --git a/docs/rcu.txt b/docs/rcu.txt
index 118a28a..ca1c099 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -82,7 +82,50 @@ The core RCU API is small:
         Note that it would be valid for another update to come while
         synchronize_rcu is running.  Because of this, it is better that
         the updater releases any locks it may hold before calling
-        synchronize_rcu.
+        synchronize_rcu.  If this is not possible (for example, because
+        the updater is protected by the BQL), you can use call_rcu.
+
+     void call_rcu1(struct rcu_head *head,
+                    void (*func)(struct rcu_head *head));
+
+        This function invokes func(head) after all pre-existing RCU
+        read-side critical sections on all threads have completed.  This
+        marks the end of the removal phase, with func taking care
+        asynchronously of the reclamation phase.
+
+        The foo struct needs to have an rcu_head structure added,
+        perhaps as follows:
+
+            struct foo {
+                struct rcu_head rcu;
+                int a;
+                char b;
+                long c;
+            };
+
+        so that the reclaimer function can fetch the struct foo address
+        and free it:
+
+            call_rcu1(&foo.rcu, foo_reclaim);
+
+            void foo_reclaim(struct rcu_head *rp)
+            {
+                struct foo *fp = container_of(rp, struct foo, rcu);
+                g_free(fp);
+            }
+
+        For the common case where the rcu_head member is the first of
+        the struct, you can use the following macro.
+
+     void call_rcu(T *p,
+                   void (*func)(T *p),
+                   field-name);
+
+        call_rcu1 is typically used through this macro, in the common
+        case where the "struct rcu_head" is the first field in the
+        struct.  The callback then takes a pointer to the struct itself,
+        so in the above case foo_reclaim could have dropped the
+        container_of and one could have written simply:
+
+            call_rcu(&foo, foo_reclaim, rcu);
 
     typeof(*p) rcu_dereference(p);
     typeof(p) rcu_assign_pointer(p, typeof(p) v);
@@ -176,6 +219,11 @@ DIFFERENCES WITH LINUX
 - rcu_dereference takes a _pointer_ to the variable being accessed.
   Wrong usage will be detected by the compiler.
 
+- call_rcu is a macro that has an extra argument (the name of the first
+  field in the struct, which must be a struct rcu_head), and expects the
+  type of the callback's argument to be the type of the first argument.
+  call_rcu1 is the same as Linux's call_rcu.
+
 - Quiescent points must be marked explicitly in the thread.
 
@@ -231,7 +279,47 @@ The write side looks simply like this (with appropriate locking):
         synchronize_rcu();
         free(old);
 
-Note that the same idiom would be possible with reader/writer
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+        rcu_read_lock();
+        p = rcu_dereference(&foo);
+        foo_ref(p);
+        rcu_read_unlock();
+        /* do something with p. */
+        foo_unref(p);
+
+The write side can be like this:
+
+        qemu_mutex_lock(&foo_mutex);
+        old = foo;
+        rcu_assign_pointer(foo, new);
+        qemu_mutex_unlock(&foo_mutex);
+        synchronize_rcu();
+        foo_unref(old);
+
+or with call_rcu:
+
+        qemu_mutex_lock(&foo_mutex);
+        old = foo;
+        rcu_assign_pointer(foo, new);
+        qemu_mutex_unlock(&foo_mutex);
+        call_rcu(old, foo_unref, rcu);
+
+In both cases, the write side only performs removal.  Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() for this is undesirably expensive, because
+the last reference may be dropped on the read side.  Hence, inside
+foo_unref, you can use call_rcu() as well:
+
+        void foo_unref(struct foo *p) {
+            if (atomic_dec(&p->refcount) == 0) {
+                call_rcu(p, foo_destroy, rcu);
+            }
+        }
+
+
+Note that the same idioms would be possible with reader/writer
 locks:
 
     read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -241,13 +329,25 @@ locks:
                                     write_mutex_unlock(&foo_rwlock);
                                     free(p);
 
+    ------------------------------------------------------------------
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        old = foo;
+    foo_ref(p);                     foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+    /* do something with p. */      foo_unref(old);
+    foo_unref(p);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of hot paths.
+
 RCU resizable arrays
 --------------------
 
 Resizable arrays can be used with RCU.  The expensive RCU synchronization
-only needs to take place when the array is resized.  The two items to
-take care of are:
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
 
 - ensuring that the old version of the array is available between removal
   and reclamation;
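The reference-counting idiom documented above, written out as a
slightly more complete sketch (foo_destroy, read_foo and the
g_atomic_int_* calls standing in for foo_ref/foo_unref are
illustrative, not part of the patch):

    struct foo {
        struct rcu_head rcu;            /* first field, for call_rcu() */
        gint refcount;
        int data;
    };

    static struct foo *foo;

    static void foo_destroy(struct foo *p)
    {
        g_free(p);
    }

    static void foo_unref(struct foo *p)
    {
        if (g_atomic_int_dec_and_test(&p->refcount)) {
            /* The last reference may be dropped by a reader, so defer
             * destruction instead of calling synchronize_rcu() here.
             */
            call_rcu(p, foo_destroy, rcu);
        }
    }

    static int read_foo(void)
    {
        struct foo *p;
        int val;

        rcu_read_lock();
        p = rcu_dereference(&foo);
        g_atomic_int_inc(&p->refcount);     /* foo_ref(p) */
        rcu_read_unlock();

        val = p->data;                      /* may sleep or take locks,
                                             * we hold a reference */
        foo_unref(p);
        return val;
    }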
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index 96841e3..1ed70bb 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -134,6 +134,28 @@ extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+    struct rcu_head *next;
+    RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field)                                      \
+    call_rcu1(({                                                         \
+         char __attribute__((unused))                                    \
+            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
+            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+         &(head)->field;                                                 \
+      }),                                                                \
+      (RCUCBFunc *)(func))
+
 #ifdef __cplusplus
 }
 #endif
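Two compile-time checks are folded into the call_rcu macro above: the
negative-size array rejects a struct whose rcu_head is not the first
field, and the function-pointer subtraction rejects a callback whose
argument type does not match the head pointer (both rely on GCC
statement expressions).  A sketch of what does and does not compile,
with hypothetical types:

    struct foo { struct rcu_head rcu; int a; };    /* rcu first */
    struct bar { int a; struct rcu_head rcu; };    /* rcu not first */

    void foo_cb(struct foo *p);
    void bar_cb(struct bar *p);

    struct foo *fp;
    struct bar *bp;

    call_rcu(fp, foo_cb, rcu);   /* OK */

    call_rcu(bp, bar_cb, rcu);   /* error: offset_must_be_zero[] would
                                    have negative size */

    call_rcu(fp, g_free, rcu);   /* error: g_free() takes a gpointer,
                                    not a struct foo *, so the pointer
                                    subtraction has incompatible
                                    operands */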
diff --git a/util/rcu.c b/util/rcu.c
index e1498db..f1c5736 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@
  * IBM's contributions to this file may be relicensed under LGPLv2 or later.
  */
 
+#include "qemu-common.h"
 #include
 #include
 #include
@@ -33,6 +34,7 @@
 #include
 #include "qemu/rcu.h"
 #include "qemu/atomic.h"
+#include "qemu/thread.h"
 
 /*
  * Global grace period counter.  Bit 0 is one if the thread is online.
@@ -166,6 +168,119 @@ void synchronize_rcu(void)
     }
 }
 
+
+#define RCU_CALL_MIN_SIZE 30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+    old_tail = atomic_xchg(&tail, &node->next);
+    atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+    struct rcu_head *node, *next;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.
+     */
+    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+        abort();
+    }
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    node = head;
+    next = atomic_mb_read(&head->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /* Since we are the sole consumer, and we excluded the empty case
+     * above, the queue will always have at least two nodes: the
+     * dummy node, and the one being removed.  So we do not need to update
+     * the tail pointer.
+     */
+    head = next;
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    struct rcu_head *node;
+
+    /* This thread is just a writer.  */
+    rcu_thread_offline();
+
+    for (;;) {
+        int tries = 0;
+        int n = atomic_read(&rcu_call_count);
+
+        /* Heuristically wait for a decent number of callbacks to pile up.
+         * Fetch rcu_call_count now; we must only process elements that
+         * were added before synchronize_rcu() starts.
+         */
+        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+            g_usleep(100000);
+            qemu_event_reset(&rcu_call_ready_event);
+            n = atomic_read(&rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                qemu_event_wait(&rcu_call_ready_event);
+                n = atomic_read(&rcu_call_count);
+            }
+        }
+
+        atomic_sub(&rcu_call_count, n);
+        synchronize_rcu();
+        while (n > 0) {
+            node = try_dequeue();
+            while (!node) {
+                qemu_event_reset(&rcu_call_ready_event);
+                node = try_dequeue();
+                if (!node) {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    node = try_dequeue();
+                }
+            }
+
+            n--;
+            node->func(node);
+        }
+    }
+    abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    node->func = func;
+    enqueue(node);
+    atomic_inc(&rcu_call_count);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
 void rcu_register_thread(void)
 {
     if (!tls_get_rcu_reader()) {
@@ -189,7 +304,12 @@ void rcu_unregister_thread(void)
 
 static void __attribute__((__constructor__)) rcu_init(void)
 {
+    QemuThread thread;
+
     qemu_mutex_init(&rcu_gp_lock);
     qemu_event_init(&rcu_gp_event, true);
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_thread_create(&thread, call_rcu_thread, NULL, QEMU_THREAD_DETACHED);
     rcu_register_thread();
 }
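For completeness, when the rcu_head cannot be the first field of the
struct, call_rcu() does not apply and the call_rcu1()/container_of()
pattern from docs/rcu.txt is the fallback.  A minimal sketch (struct
Device and its functions are hypothetical):

    struct Device {
        int id;
        struct rcu_head rcu;            /* not the first field */
    };

    static void device_reclaim(struct rcu_head *rp)
    {
        struct Device *dev = container_of(rp, struct Device, rcu);
        g_free(dev);
    }

    static void device_delete(struct Device *dev)
    {
        /* ... unlink dev from all RCU-visible data structures ... */
        call_rcu1(&dev->rcu, device_reclaim);
    }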