Patchwork [RFC,08/13] add call_rcu support

login
register
mail settings
Submitter Paolo Bonzini
Date Aug. 15, 2011, 9:08 p.m.
Message ID <1313442520-12062-9-git-send-email-pbonzini@redhat.com>
Download mbox | patch
Permalink /patch/110101/
State New
Headers show

Comments

Paolo Bonzini - Aug. 15, 2011, 9:08 p.m.
Unlike the RCU code, this is not (yet) meant to be equivalent to liburcu,
because we want to run call_rcu callbacks in the main thread and under
the global lock.

The data structure is indeed based on those found in liburcu (this makes
me feel safer), but more heavily commented and adapted to replace futexes
with QemuEvents.

This has a dependency on the iothread, because it relies on the fact that
qemu_bh_schedule works outside the global lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs |    2 +-
 rcu-call.c    |  189 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 rcu.h         |    8 +++
 3 files changed, 198 insertions(+), 1 deletions(-)
 create mode 100644 rcu-call.c

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 902a083..ea5a6eb 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -77,7 +77,7 @@  common-obj-y += $(net-obj-y)
 common-obj-y += $(qobject-obj-y)
 common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
 common-obj-y += readline.o console.o cursor.o qemu-error.o
-common-obj-y += $(oslib-obj-y)
+common-obj-y += $(oslib-obj-y) rcu-call.o
 common-obj-$(CONFIG_WIN32) += os-win32.o
 common-obj-$(CONFIG_POSIX) += os-posix.o
 
diff --git a/rcu-call.c b/rcu-call.c
new file mode 100644
index 0000000..cee4d8f
--- /dev/null
+++ b/rcu-call.c
@@ -0,0 +1,189 @@ 
+/*
+ * call_rcu implementation
+ *
+ * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2011 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu-common.h"
+#include "qemu-thread.h"
+#include "qemu-barrier.h"
+#include "rcu.h"
+
+/* Communication between the call_rcu thread and bottom half.  */
+
+/* rcu_call_count: callbacks enqueued by call_rcu() but not yet past a grace
+ * period (incremented in call_rcu, drained by call_rcu_thread).
+ * rcu_call_todo: callbacks that have completed a grace period and are
+ * waiting to be invoked by the bottom half.  */
+static int rcu_call_count, rcu_call_todo;
+/* Set by the bottom half when it has emptied rcu_call_todo; the call_rcu
+ * thread waits on it before batching the next round.  */
+static QemuEvent rcu_call_done_event;
+static QEMUBH *rcu_call_bh;
+
+/* Heuristic batch size: the thread tries to accumulate at least this many
+ * callbacks before paying for a synchronize_rcu().  */
+#define RCU_CALL_MIN_SIZE        30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  */
+
+/* The queue always contains at least the dummy node, so head/tail are
+ * never NULL; see dequeue() for how the dummy is recycled.  */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+
+/* Signaled by producers after linking a node; consumers wait on it when
+ * they observe a not-yet-published next pointer or an undersized batch.  */
+static QemuEvent rcu_call_ready_event;
+
+/* Add NODE to the tail of the queue.  Safe to call concurrently from any
+ * number of producers; the single consumer is dequeue().  */
+static void enqueue (struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+
+    /*
+     * We have to do:
+     *    old_tail = tail, tail = &node->next;
+     *    *old_tail = node;
+     *
+     * The first line is done in the CAS loop below.  Note that producers only
+     * read from TAIL, so they already see a consistent state after this line.
+     * The loop is equivalent to
+     *
+     *     old_tail = __sync_swap(&tail, &node->next);
+     *
+     * but only clang has __sync_swap, not GCC. :(
+     */
+#if defined __i386__ || defined __x86_64__
+    asm volatile ("xchg%z0 %0, %1"
+                  : "=r" (old_tail), "+m" (ACCESS_ONCE(tail))
+                  : "0" (&node->next)
+                  : "memory");
+#else
+    do {
+        old_tail = ACCESS_ONCE(tail);
+        /* The GCC builtin is __sync_bool_compare_and_swap; the previous
+         * spelling (__sync_bool_compare_swap) does not exist and broke
+         * every non-x86 build.  */
+    } while (!__sync_bool_compare_and_swap(&tail, old_tail, &node->next));
+#endif
+
+    /* At this point the consumer may still see a null head->next.  In this
+       case it will wait, so we have time to set the head->next and wake
+       them up.  */
+    ACCESS_ONCE(*old_tail) = node;
+}
+
+/* Remove and return the oldest real (non-dummy) node.  Single-consumer
+ * only: must be called from the bottom half alone.  Aborts if the queue
+ * is logically empty, which the callers' counting should make impossible.  */
+static struct rcu_head *dequeue(void)
+{
+    struct rcu_head *node;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.  */
+    if (head == &dummy && ACCESS_ONCE(tail) == &dummy.next) {
+        abort();
+    }
+
+    /* Since we are the sole consumer, and we excluded the empty case
+     * above, the queue will always have at least two nodes (the dummy
+     * node, and the one being removed).  This means two things.  First,
+     * we do not need to update the tail pointer; second, if the head node
+     * has NULL in its next pointer, the value is wrong and we need
+     * to wait until its enqueuer finishes the update.  */
+    node = head;
+    head = ACCESS_ONCE(node->next);
+    if (head == NULL) {
+        /* The producer has swapped the tail but not yet published
+         * node->next (see enqueue); spin on the ready event until it does.  */
+        do {
+            qemu_event_wait(&rcu_call_ready_event);
+            qemu_event_reset(&rcu_call_ready_event);
+            head = ACCESS_ONCE(node->next);
+        } while (head == NULL);
+        /* Re-set the event: the wakeup we consumed may also have been
+         * meant for the batching loop in call_rcu_thread.  */
+        qemu_event_set(&rcu_call_ready_event);
+    }
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+/* Bottom half: run, in the main thread and under the global lock, every
+ * callback that call_rcu_thread has moved into rcu_call_todo.  */
+static void call_rcu_bh(void *opaque)
+{
+    for (;;) {
+        int n;
+        /* Pair with the grace period in call_rcu_thread: make sure we read
+         * rcu_call_todo only after any preceding queue updates are visible.  */
+        smp_mb();
+        n = ACCESS_ONCE(rcu_call_todo);
+        if (n == 0) {
+            break;
+        }
+
+        /* We know that N callbacks have seen at least one grace
+         * period since they were added.  Process them.  */
+        __sync_fetch_and_add(&rcu_call_todo, -n);
+        while (n-- > 0) {
+            struct rcu_head *node = dequeue();
+            node->func(node);
+        }
+    }
+    /* Tell call_rcu_thread the backlog is drained so it may batch more.  */
+    qemu_event_set(&rcu_call_done_event);
+}
+
+/* Dedicated thread: batch pending callbacks, wait for a grace period on
+ * their behalf, then hand them to the bottom half for execution in the
+ * main thread.  Never returns.  */
+static void *call_rcu_thread(void *opaque)
+{
+    for (;;) {
+        int n;
+
+        /* Wait until the bottom half has drained the previous batch.  */
+        qemu_event_wait(&rcu_call_done_event);
+
+        /* Heuristically wait for a decent number of callbacks to pile up,
+         * but give up after a few tries so callbacks are not delayed
+         * indefinitely.  (The original code read and tested rcu_call_count
+         * twice back to back; the duplicate read was redundant.)  */
+        n = ACCESS_ONCE(rcu_call_count);
+        if (n < RCU_CALL_MIN_SIZE) {
+            int tries = 0;
+            do {
+                qemu_event_wait(&rcu_call_ready_event);
+                qemu_event_reset(&rcu_call_ready_event);
+                qemu_msleep(100);
+                n = ACCESS_ONCE(rcu_call_count);
+            } while (n < RCU_CALL_MIN_SIZE && ++tries <= 5);
+            /* Re-set the event in case the wakeup we consumed was meant
+             * for dequeue() as well.  */
+            qemu_event_set(&rcu_call_ready_event);
+        }
+
+        /* Claim these N callbacks before starting the grace period, so the
+         * bottom half only sees elements added before synchronize_rcu()
+         * starts.  */
+        __sync_fetch_and_add(&rcu_call_count, -n);
+        synchronize_rcu();
+        __sync_fetch_and_add(&rcu_call_todo, n);
+
+        qemu_event_reset(&rcu_call_done_event);
+        qemu_bh_schedule(rcu_call_bh);
+    }
+    abort();
+}
+
+/* One-time setup, run via qemu_once from the first call_rcu() call:
+ * create the events, the bottom half, and the grace-period thread.  */
+static void call_rcu_init(void)
+{
+    QemuThread thread;
+
+    /* NOTE(review): the second argument is presumably the event's initial
+     * state — done_event starts set so call_rcu_thread's first wait does
+     * not block; confirm against qemu_event_init's contract.  */
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_event_init(&rcu_call_done_event, true);
+    rcu_call_bh = qemu_bh_new(call_rcu_bh, NULL);
+
+    qemu_thread_create(&thread, call_rcu_thread, NULL);
+}
+
+/* Schedule FUNC(NODE) to run in the main thread, under the global lock,
+ * after at least one RCU grace period has elapsed.  NODE is caller-owned
+ * storage that must stay alive until the callback runs; FUNC typically
+ * frees the enclosing object (see rcu_free).  Safe to call from any
+ * thread.  The statement order below is load-bearing: func must be set
+ * before the node is published, and the count must be bumped before the
+ * wakeup so call_rcu_thread's batching heuristic sees it.  */
+void call_rcu(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    static QemuOnce init = QEMU_ONCE_INIT;
+    qemu_once(&init, call_rcu_init);
+    node->func = func;
+    enqueue(node);
+    __sync_fetch_and_add(&rcu_call_count, 1);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
+/* Convenience call_rcu callback for objects whose rcu_head is their first
+ * member: simply qemu_free() the storage.  */
+void rcu_free(struct rcu_head *head)
+{
+    qemu_free(head);
+}
diff --git a/rcu.h b/rcu.h
index 569f696..86a1fea 100644
--- a/rcu.h
+++ b/rcu.h
@@ -128,6 +128,14 @@  extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+/* Embed this in any structure that will be reclaimed via call_rcu().
+ * next links the node into the internal callback queue; func is the
+ * callback invoked after a grace period, receiving a pointer to this
+ * rcu_head (use container_of-style arithmetic to recover the object).  */
+struct rcu_head {
+	struct rcu_head *next;
+	void (*func)(struct rcu_head *head);
+};
+
+/* Run func(head) in the main thread after a grace period; rcu_free is a
+ * ready-made callback that just qemu_free()s head.  */
+extern void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head));
+extern void rcu_free(struct rcu_head *head);
+
+
 #ifdef __cplusplus 
 }
 #endif