diff mbox

[04/15] rcu: add call_rcu

Message ID 1421938053-10318-5-git-send-email-pbonzini@redhat.com
State New
Headers show

Commit Message

Paolo Bonzini Jan. 22, 2015, 2:47 p.m. UTC
Asynchronous callbacks provided by call_rcu are particularly important
for QEMU, because the BQL makes it hard to use synchronize_rcu.

In addition, the current RCU implementation is not particularly friendly
to multiple concurrent synchronize_rcu callers, making call_rcu even
more important.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 docs/rcu.txt       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
 include/qemu/rcu.h |  22 ++++++++++
 util/rcu.c         | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 247 insertions(+), 4 deletions(-)

Comments

Fam Zheng Jan. 26, 2015, 6:04 a.m. UTC | #1
On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Asynchronous callbacks provided by call_rcu are particularly important
> for QEMU, because the BQL makes it hard to use synchronize_rcu.
> 
> In addition, the current RCU implementation is not particularly friendly
> to multiple concurrent synchronize_rcu callers, making call_rcu even
> more important.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  docs/rcu.txt       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
>  include/qemu/rcu.h |  22 ++++++++++
>  util/rcu.c         | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 247 insertions(+), 4 deletions(-)
> 
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> index 9938ad3..61752b9 100644
> --- a/docs/rcu.txt
> +++ b/docs/rcu.txt
> @@ -82,7 +82,50 @@ The core RCU API is small:
>          Note that it would be valid for another update to come while
>          synchronize_rcu is running.  Because of this, it is better that
>          the updater releases any locks it may hold before calling
> -        synchronize_rcu.
> +        synchronize_rcu.  If this is not possible (for example, because
> +        the updater is protected by the BQL), you can use call_rcu.
> +
> +     void call_rcu1(struct rcu_head * head,
> +                    void (*func)(struct rcu_head *head));
> +
> +        This function invokes func(head) after all pre-existing RCU
> +        read-side critical sections on all threads have completed.  This
> +        marks the end of the removal phase, with func taking care
> +        asynchronously of the reclamation phase.
> +
> +        The foo struct needs to have an rcu_head structure added,
> +        perhaps as follows:
> +
> +            struct foo {
> +                struct rcu_head rcu;
> +                int a;
> +                char b;
> +                long c;
> +            };
> +
> +        so that the reclaimer function can fetch the struct foo address
> +        and free it:
> +
> +            call_rcu1(&foo.rcu, foo_reclaim);
> +
> +            void foo_reclaim(struct rcu_head *rp)
> +            {
> +                struct foo *fp = container_of(rp, struct foo, rcu);
> +                g_free(fp);
> +            }
> +
> +        For the common case where the rcu_head member is the first of the
> +        struct, you can use the following macro.
> +
> +     void call_rcu(T *p,
> +                   void (*func)(T *p),
> +                   field-name);
> +
> +        call_rcu1 is typically used through this macro, in the common case
> +        where the "struct rcu_head" is the first field in the struct.  In
> +        the above case, one could have written simply:
> +
> +            call_rcu(foo_reclaim, g_free, rcu);
>  
>       typeof(*p) atomic_rcu_read(p);
>  
> @@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
>  - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
>    rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
>  
> +- call_rcu is a macro that has an extra argument (the name of the first
> +  field in the struct, which must be a struct rcu_head), and expects the
> +  type of the callback's argument to be the type of the first argument.
> +  call_rcu1 is the same as Linux's call_rcu.
> +
>  
>  RCU PATTERNS
>  ============
> @@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate locking):
>      synchronize_rcu();
>      free(old);
>  
> -Note that the same idiom would be possible with reader/writer
> +If the processing cannot be done purely within the critical section, it
> +is possible to combine this idiom with a "real" reference count:
> +
> +    rcu_read_lock();
> +    p = atomic_rcu_read(&foo);
> +    foo_ref(p);
> +    rcu_read_unlock();
> +    /* do something with p. */
> +    foo_unref(p);
> +
> +The write side can be like this:
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    atomic_rcu_set(&foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    foo_unref(old);
> +
> +or with call_rcu:
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    atomic_rcu_set(&foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    call_rcu(foo_unref, old, rcu);
> +
> +In both cases, the write side only performs removal.  Reclamation
> +happens when the last reference to a "foo" object is dropped.
> +Using synchronize_rcu() is undesirably expensive, because the
> +last reference may be dropped on the read side.  Hence you can
> +use call_rcu() instead:
> +
> +     foo_unref(struct foo *p) {
> +        if (atomic_fetch_dec(&p->refcount) == 1) {
> +            call_rcu(foo_destroy, p, rcu);
> +        }
> +    }
> +
> +
> +Note that the same idioms would be possible with reader/writer
>  locks:
>  
>      read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> @@ -216,13 +304,27 @@ locks:
>                                      write_mutex_unlock(&foo_rwlock);
>                                      free(p);
>  
> +    ------------------------------------------------------------------
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        old = foo;
> +    foo_ref(p);                     foo = new;
> +    read_unlock(&foo_rwlock);       foo_unref(old);
> +    /* do something with p. */      write_mutex_unlock(&foo_rwlock);
> +    read_lock(&foo_rwlock);
> +    foo_unref(p);
> +    read_unlock(&foo_rwlock);
> +
> +foo_unref could use a mechanism such as bottom halves to move deallocation
> +out of the write-side critical section.
> +
>  
>  RCU resizable arrays
>  --------------------
>  
>  Resizable arrays can be used with RCU.  The expensive RCU synchronization
> -only needs to take place when the array is resized.  The two items to
> -take care of are:
> +(or call_rcu) only needs to take place when the array is resized.
> +The two items to take care of are:
>  
>  - ensuring that the old version of the array is available between removal
>    and reclamation;
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> index da043f2..068a279 100644
> --- a/include/qemu/rcu.h
> +++ b/include/qemu/rcu.h
> @@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
>  extern void rcu_register_thread(void);
>  extern void rcu_unregister_thread(void);
>  
> +struct rcu_head;
> +typedef void RCUCBFunc(struct rcu_head *head);
> +
> +struct rcu_head {
> +    struct rcu_head *next;
> +    RCUCBFunc *func;
> +};
> +
> +extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
> +
> +/* The operands of the minus operator must have the same type,
> + * which must be the one that we specify in the cast.
> + */
> +#define call_rcu(head, func, field)                                      \
> +    call_rcu1(({                                                         \
> +         char __attribute__((unused))                                    \
> +            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
> +            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
> +         &(head)->field;                                                 \
> +      }),                                                                \
> +      (RCUCBFunc *)(func))
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/util/rcu.c b/util/rcu.c
> index 1f737d5..c9c3e6e 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -26,6 +26,7 @@
>   * IBM's contributions to this file may be relicensed under LGPLv2 or later.
>   */
>  
> +#include "qemu-common.h"
>  #include <stdio.h>
>  #include <assert.h>
>  #include <stdlib.h>
> @@ -33,6 +34,7 @@
>  #include <errno.h>
>  #include "qemu/rcu.h"
>  #include "qemu/atomic.h"
> +#include "qemu/thread.h"
>  
>  /*
>   * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
> @@ -149,6 +151,116 @@ void synchronize_rcu(void)
>      qemu_mutex_unlock(&rcu_gp_lock);
>  }
>  
> +
> +#define RCU_CALL_MIN_SIZE        30
> +
> +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
> + * from liburcu.  Note that head is only used by the consumer.
> + */
> +static struct rcu_head dummy;
> +static struct rcu_head *head = &dummy, **tail = &dummy.next;
> +static int rcu_call_count;
> +static QemuEvent rcu_call_ready_event;
> +
> +static void enqueue(struct rcu_head *node)
> +{
> +    struct rcu_head **old_tail;
> +
> +    node->next = NULL;
> +    old_tail = atomic_xchg(&tail, &node->next);
> +    atomic_mb_set(old_tail, node);
> +}
> +
> +static struct rcu_head *try_dequeue(void)
> +{
> +    struct rcu_head *node, *next;
> +
> +retry:
> +    /* Test for an empty list, which we do not expect.  Note that for
> +     * the consumer head and tail are always consistent.  The head
> +     * is consistent because only the consumer reads/writes it.
> +     * The tail, because it is the first step in the enqueuing.
> +     * It is only the next pointers that might be inconsistent.
> +     */
> +    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
> +        abort();
> +    }
> +
> +    /* If the head node has NULL in its next pointer, the value is
> +     * wrong and we need to wait until its enqueuer finishes the update.
> +     */
> +    node = head;
> +    next = atomic_mb_read(&head->next);
> +    if (!next) {
> +        return NULL;
> +    }
> +
> +    /* Since we are the sole consumer, and we excluded the empty case
> +     * above, the queue will always have at least two nodes: the
> +     * dummy node, and the one being removed.  So we do not need to update
> +     * the tail pointer.
> +     */
> +    head = next;
> +
> +    /* If we dequeued the dummy node, add it back at the end and retry.  */
> +    if (node == &dummy) {
> +        enqueue(node);
> +        goto retry;
> +    }
> +
> +    return node;
> +}
> +
> +static void *call_rcu_thread(void *opaque)
> +{
> +    struct rcu_head *node;
> +
> +    for (;;) {
> +        int tries = 0;
> +        int n = atomic_read(&rcu_call_count);
> +
> +        /* Heuristically wait for a decent number of callbacks to pile up.
> +         * Fetch rcu_call_count now, we only must process elements that were
> +         * added before synchronize_rcu() starts.
> +         */
> +        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
> +            g_usleep(100000);
> +            qemu_event_reset(&rcu_call_ready_event);
> +            n = atomic_read(&rcu_call_count);
> +            if (n < RCU_CALL_MIN_SIZE) {
> +                qemu_event_wait(&rcu_call_ready_event);
> +                n = atomic_read(&rcu_call_count);
> +            }
> +        }
> +
> +        atomic_sub(&rcu_call_count, n);
> +        synchronize_rcu();
> +        while (n > 0) {
> +            node = try_dequeue();
> +            while (!node) {
> +                qemu_event_reset(&rcu_call_ready_event);
> +                node = try_dequeue();
> +                if (!node) {
> +                    qemu_event_wait(&rcu_call_ready_event);
> +                    node = try_dequeue();
> +                }
> +            }
> +
> +            n--;
> +            node->func(node);
> +        }
> +    }
> +    abort();
> +}
> +
> +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> +{
> +    node->func = func;
> +    enqueue(node);
> +    atomic_inc(&rcu_call_count);
> +    qemu_event_set(&rcu_call_ready_event);
> +}
> +
>  void rcu_register_thread(void)
>  {
>      assert(rcu_reader.ctr == 0);
> @@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
>  
>  static void __attribute__((__constructor__)) rcu_init(void)
>  {
> +    QemuThread thread;
> +
>      qemu_mutex_init(&rcu_gp_lock);
>      qemu_event_init(&rcu_gp_event, true);
> +
> +    qemu_event_init(&rcu_call_ready_event, false);
> +    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
> +                       NULL, QEMU_THREAD_DETACHED);
> +
>      rcu_register_thread();
>  }
> -- 
> 1.8.3.1
> 
> 
> 

Reviewed-by: Fam Zheng <famz@redhat.com>
diff mbox

Patch

diff --git a/docs/rcu.txt b/docs/rcu.txt
index 9938ad3..61752b9 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -82,7 +82,50 @@  The core RCU API is small:
         Note that it would be valid for another update to come while
         synchronize_rcu is running.  Because of this, it is better that
         the updater releases any locks it may hold before calling
-        synchronize_rcu.
+        synchronize_rcu.  If this is not possible (for example, because
+        the updater is protected by the BQL), you can use call_rcu.
+
+     void call_rcu1(struct rcu_head * head,
+                    void (*func)(struct rcu_head *head));
+
+        This function invokes func(head) after all pre-existing RCU
+        read-side critical sections on all threads have completed.  This
+        marks the end of the removal phase, with func taking care
+        asynchronously of the reclamation phase.
+
+        The foo struct needs to have an rcu_head structure added,
+        perhaps as follows:
+
+            struct foo {
+                struct rcu_head rcu;
+                int a;
+                char b;
+                long c;
+            };
+
+        so that the reclaimer function can fetch the struct foo address
+        and free it:
+
+            call_rcu1(&foo.rcu, foo_reclaim);
+
+            void foo_reclaim(struct rcu_head *rp)
+            {
+                struct foo *fp = container_of(rp, struct foo, rcu);
+                g_free(fp);
+            }
+
+        For the common case where the rcu_head member is the first of the
+        struct, you can use the following macro.
+
+     void call_rcu(T *p,
+                   void (*func)(T *p),
+                   field-name);
+
+        call_rcu1 is typically used through this macro, in the common case
+        where the "struct rcu_head" is the first field in the struct.  In
+        the above case, one could have written simply:
+
+            call_rcu(foo_reclaim, g_free, rcu);
 
      typeof(*p) atomic_rcu_read(p);
 
@@ -153,6 +196,11 @@  DIFFERENCES WITH LINUX
 - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
   rcu_assign_pointer.  They take a _pointer_ to the variable being accessed.
 
+- call_rcu is a macro that has an extra argument (the name of the first
+  field in the struct, which must be a struct rcu_head), and expects the
+  type of the callback's argument to be the type of the first argument.
+  call_rcu1 is the same as Linux's call_rcu.
+
 
 RCU PATTERNS
 ============
@@ -206,7 +254,47 @@  The write side looks simply like this (with appropriate locking):
     synchronize_rcu();
     free(old);
 
-Note that the same idiom would be possible with reader/writer
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+    rcu_read_lock();
+    p = atomic_rcu_read(&foo);
+    foo_ref(p);
+    rcu_read_unlock();
+    /* do something with p. */
+    foo_unref(p);
+
+The write side can be like this:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    atomic_rcu_set(&foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    foo_unref(old);
+
+or with call_rcu:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    atomic_rcu_set(&foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    call_rcu(foo_unref, old, rcu);
+
+In both cases, the write side only performs removal.  Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() is undesirably expensive, because the
+last reference may be dropped on the read side.  Hence you can
+use call_rcu() instead:
+
+     foo_unref(struct foo *p) {
+        if (atomic_fetch_dec(&p->refcount) == 1) {
+            call_rcu(foo_destroy, p, rcu);
+        }
+    }
+
+
+Note that the same idioms would be possible with reader/writer
 locks:
 
     read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -216,13 +304,27 @@  locks:
                                     write_mutex_unlock(&foo_rwlock);
                                     free(p);
 
+    ------------------------------------------------------------------
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        old = foo;
+    foo_ref(p);                     foo = new;
+    read_unlock(&foo_rwlock);       foo_unref(old);
+    /* do something with p. */      write_mutex_unlock(&foo_rwlock);
+    read_lock(&foo_rwlock);
+    foo_unref(p);
+    read_unlock(&foo_rwlock);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of the write-side critical section.
+
 
 RCU resizable arrays
 --------------------
 
 Resizable arrays can be used with RCU.  The expensive RCU synchronization
-only needs to take place when the array is resized.  The two items to
-take care of are:
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
 
 - ensuring that the old version of the array is available between removal
   and reclamation;
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index da043f2..068a279 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -118,6 +118,28 @@  extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+    struct rcu_head *next;
+    RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field)                                      \
+    call_rcu1(({                                                         \
+         char __attribute__((unused))                                    \
+            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
+            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+         &(head)->field;                                                 \
+      }),                                                                \
+      (RCUCBFunc *)(func))
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/util/rcu.c b/util/rcu.c
index 1f737d5..c9c3e6e 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@ 
  * IBM's contributions to this file may be relicensed under LGPLv2 or later.
  */
 
+#include "qemu-common.h"
 #include <stdio.h>
 #include <assert.h>
 #include <stdlib.h>
@@ -33,6 +34,7 @@ 
 #include <errno.h>
 #include "qemu/rcu.h"
 #include "qemu/atomic.h"
+#include "qemu/thread.h"
 
 /*
  * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
@@ -149,6 +151,116 @@  void synchronize_rcu(void)
     qemu_mutex_unlock(&rcu_gp_lock);
 }
 
+
+#define RCU_CALL_MIN_SIZE        30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+    old_tail = atomic_xchg(&tail, &node->next);
+    atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+    struct rcu_head *node, *next;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.
+     */
+    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+        abort();
+    }
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    node = head;
+    next = atomic_mb_read(&head->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /* Since we are the sole consumer, and we excluded the empty case
+     * above, the queue will always have at least two nodes: the
+     * dummy node, and the one being removed.  So we do not need to update
+     * the tail pointer.
+     */
+    head = next;
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    struct rcu_head *node;
+
+    for (;;) {
+        int tries = 0;
+        int n = atomic_read(&rcu_call_count);
+
+        /* Heuristically wait for a decent number of callbacks to pile up.
+         * Fetch rcu_call_count now, we only must process elements that were
+         * added before synchronize_rcu() starts.
+         */
+        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+            g_usleep(100000);
+            qemu_event_reset(&rcu_call_ready_event);
+            n = atomic_read(&rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                qemu_event_wait(&rcu_call_ready_event);
+                n = atomic_read(&rcu_call_count);
+            }
+        }
+
+        atomic_sub(&rcu_call_count, n);
+        synchronize_rcu();
+        while (n > 0) {
+            node = try_dequeue();
+            while (!node) {
+                qemu_event_reset(&rcu_call_ready_event);
+                node = try_dequeue();
+                if (!node) {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    node = try_dequeue();
+                }
+            }
+
+            n--;
+            node->func(node);
+        }
+    }
+    abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    node->func = func;
+    enqueue(node);
+    atomic_inc(&rcu_call_count);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
 void rcu_register_thread(void)
 {
     assert(rcu_reader.ctr == 0);
@@ -166,7 +278,14 @@  void rcu_unregister_thread(void)
 
 static void __attribute__((__constructor__)) rcu_init(void)
 {
+    QemuThread thread;
+
     qemu_mutex_init(&rcu_gp_lock);
     qemu_event_init(&rcu_gp_event, true);
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
+                       NULL, QEMU_THREAD_DETACHED);
+
     rcu_register_thread();
 }