[5/7] coroutine: rewrite pool to avoid mutex

Message ID 1417183941-26329-6-git-send-email-pbonzini@redhat.com
State New

Commit Message

Paolo Bonzini Nov. 28, 2014, 2:12 p.m. UTC
This patch removes the mutex by using fancy lock-free manipulation of
the pool.  Lock-free stacks and queues are not hard, but they can suffer
from the ABA problem so they are better avoided unless you have some
deferred reclamation scheme like RCU.  Otherwise you have to stick
with adding to a list, and emptying it completely.  This is what this
patch does, by coupling a lock-free global list of available coroutines
with per-thread lists that are actually used on coroutine creation.
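
As an illustration of the "add to a list, empty it completely" pattern
described above, here is a minimal sketch in C11 atomics.  It is not the
code from this patch: the Node type and the list_push/list_take_all/
list_length names are hypothetical, and QEMU uses its own atomic helpers
and QSLIST macros rather than <stdatomic.h>.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical node type; stands in for a pooled object. */
typedef struct Node {
    struct Node *next;
    int id;
} Node;

/* Shared list head.  Only push and take-all ever touch it. */
static _Atomic(Node *) shared_head;

/* Push one node with a compare-and-swap loop.  The old head is never
 * dereferenced, only stored into n->next, so a stale value is harmless:
 * the CAS simply fails and we retry with the refreshed head. */
static void list_push(Node *n)
{
    assert(n != NULL);
    Node *old = atomic_load_explicit(&shared_head, memory_order_relaxed);
    do {
        n->next = old;
    } while (!atomic_compare_exchange_weak_explicit(&shared_head, &old, n,
                                                    memory_order_release,
                                                    memory_order_relaxed));
}

/* Detach the whole list with a single exchange.  There is no comparison
 * at all, hence no ABA window; the caller owns the result exclusively. */
static Node *list_take_all(void)
{
    return atomic_exchange_explicit(&shared_head, (Node *)NULL,
                                    memory_order_acquire);
}

/* Count the nodes of a privately owned list. */
static size_t list_length(const Node *n)
{
    size_t len = 0;
    for (; n != NULL; n = n->next) {
        len++;
    }
    return len;
}
```

A single-element pop would have to read head->next before its CAS, and
that is where ABA bites; restricting the shared list to these two
operations sidesteps the problem without deferred reclamation.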

Whenever the destruction pool is big enough, the next thread that runs
out of coroutines will steal the whole destruction pool.  This is positive
in two ways:

1) the allocation does not have to do any atomic operation in the fast
path, it's entirely using thread-local storage.  Once every POOL_BATCH_SIZE
allocations it will do a single atomic_xchg.  Release does an atomic_cmpxchg
loop, that hopefully doesn't cause any starvation, and an atomic_inc.

2) in theory this should be completely adaptive.  The number of coroutines
around should be a little more than POOL_BATCH_SIZE * number of allocating
threads; so this also removes qemu_coroutine_adjust_pool_size.  (The previous
pool size was POOL_BATCH_SIZE * number of block backends, so it was a bit
more generous.  But if you actually have many high-iodepth disks, it's better
to put them in different iothreads, which will also use separate thread
pools and aio file descriptors).
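
The two points above can be sketched as a stand-alone toy pool in C11.
This is only an approximation under stated assumptions, not the patch
itself: Obj, obj_get, obj_put, POOL_BATCH and fresh_allocations are
hypothetical names, POOL_BATCH is shrunk to 4 to keep the example small,
plain malloc/free stand in for coroutine creation and deletion, and
malloc failure is not handled.

```c
#include <stdatomic.h>
#include <stdlib.h>

#define POOL_BATCH 4u   /* small stand-in for POOL_BATCH_SIZE */

typedef struct Obj {
    struct Obj *next;
} Obj;

static _Atomic(Obj *) release_list;     /* shared; push and take-all only */
static atomic_uint release_count;       /* heuristic size, may be skewed */
static _Thread_local Obj *alloc_list;   /* private; no atomics needed */
static atomic_uint fresh_allocations;   /* demo counter: pool misses */

/* Allocation: the fast path touches only thread-local storage. */
static Obj *obj_get(void)
{
    Obj *o = alloc_list;

    if (o == NULL &&
        atomic_load_explicit(&release_count, memory_order_relaxed) > POOL_BATCH) {
        /* Slow path: steal the whole release list with one exchange.
         * The counter reset is not exact with respect to concurrent
         * releases; it is only a heuristic. */
        atomic_store_explicit(&release_count, 0, memory_order_relaxed);
        alloc_list = atomic_exchange_explicit(&release_list, (Obj *)NULL,
                                              memory_order_acquire);
        o = alloc_list;
    }
    if (o != NULL) {
        alloc_list = o->next;
        return o;
    }
    atomic_fetch_add_explicit(&fresh_allocations, 1, memory_order_relaxed);
    return malloc(sizeof(*o));
}

/* Release: a compare-and-swap push plus an atomic increment, or a real
 * free once the shared list looks full. */
static void obj_put(Obj *o)
{
    if (atomic_load_explicit(&release_count, memory_order_relaxed)
        < POOL_BATCH * 2) {
        Obj *old = atomic_load_explicit(&release_list, memory_order_relaxed);
        do {
            o->next = old;
        } while (!atomic_compare_exchange_weak_explicit(&release_list, &old, o,
                                                        memory_order_release,
                                                        memory_order_relaxed));
        atomic_fetch_add_explicit(&release_count, 1, memory_order_relaxed);
        return;
    }
    free(o);
}
```

In this sketch a thread that releases six objects and then allocates
again gets the most recently released object back without calling
malloc, because six exceeds the batch threshold of four and triggers the
steal.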

This speeds up perf/cost (in tests/test-coroutine) by a factor of ~1.33.

I still believe we will end with some kind of coroutine bypass scheme
(even coroutines _do_ allocate an AIOCB, so calling bdrv_aio_readv
directly can help), but hey it cannot hurt to optimize hot code.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 qemu-coroutine.c | 93 +++++++++++++++++++++++++-------------------------------
 1 file changed, 42 insertions(+), 51 deletions(-)

Comments

Kevin Wolf Nov. 28, 2014, 4:40 p.m. UTC | #1
Am 28.11.2014 um 15:12 hat Paolo Bonzini geschrieben:
> I still believe we will end with some kind of coroutine bypass scheme
> (even coroutines _do_ allocate an AIOCB, so calling bdrv_aio_readv
> directly can help), but hey it cannot hurt to optimize hot code.

Not sure if speculations about the future belong into commit messages,
but while it may turn out that a bypass is required in the end (I hope
it doesn't), the part about AIOCBs is wrong if you really consistently
use coroutines all the way down from the device to the block driver.

I think Peter picked up all of my patches to actually handle requests
this way (i.e. virtio-blk already creates the coroutine).

Kevin

Paolo Bonzini Nov. 28, 2014, 5:30 p.m. UTC | #2
On 28/11/2014 17:40, Kevin Wolf wrote:
>> > I still believe we will end with some kind of coroutine bypass scheme
>> > (even coroutines _do_ allocate an AIOCB, so calling bdrv_aio_readv
>> > directly can help), but hey it cannot hurt to optimize hot code.
>
> Not sure if speculations about the future belong into commit messages,
> but while it may turn out that a bypass is required in the end (I hope
> it doesn't), the part about AIOCBs is wrong if you really consistently
> use coroutines all the way down from the device to the block driver.

This is much harder from virtio-scsi than from virtio-blk, though.

Paolo

Paolo Bonzini Nov. 28, 2014, 5:31 p.m. UTC | #3
On 28/11/2014 17:40, Kevin Wolf wrote:
>> > I still believe we will end with some kind of coroutine bypass scheme
>> > (even coroutines _do_ allocate an AIOCB, so calling bdrv_aio_readv
>> > directly can help), but hey it cannot hurt to optimize hot code.
>
> Not sure if speculations about the future belong into commit messages,
> but while it may turn out that a bypass is required in the end (I hope
> it doesn't), the part about AIOCBs is wrong if you really consistently
> use coroutines all the way down from the device to the block driver.

This is much harder for virtio-scsi than for virtio-blk, though.

Paolo

Kevin Wolf Nov. 28, 2014, 6:34 p.m. UTC | #4
Am 28.11.2014 um 18:31 hat Paolo Bonzini geschrieben:
> 
> 
> On 28/11/2014 17:40, Kevin Wolf wrote:
> >> > I still believe we will end with some kind of coroutine bypass scheme
> >> > (even coroutines _do_ allocate an AIOCB, so calling bdrv_aio_readv
> >> > directly can help), but hey it cannot hurt to optimize hot code.
> >
> > Not sure if speculations about the future belong into commit messages,
> > but while it may turn out that a bypass is required in the end (I hope
> > it doesn't), the part about AIOCBs is wrong if you really consistently
> > use coroutines all the way down from the device to the block driver.
> 
> This is much harder for virtio-scsi than for virtio-blk, though.

Why is that? At least replacing the bdrv_aio_*() call by
coroutine_create/coroutine_enter/bdrv_co_*() is a mechanical change that
shouldn't be any harder for virtio-scsi. Whether we can optimise even
more by integrating the device more with coroutines might be a different
problem, but at this point you've already got rid of AIOCBs.

Kevin

Paolo Bonzini Nov. 28, 2014, 7:57 p.m. UTC | #5
On 28/11/2014 19:34, Kevin Wolf wrote:
>>> > > Not sure if speculations about the future belong into commit messages,
>>> > > but while it may turn out that a bypass is required in the end (I hope
>>> > > it doesn't), the part about AIOCBs is wrong if you really consistently
>>> > > use coroutines all the way down from the device to the block driver.
>> > 
>> > This is much harder for virtio-scsi than for virtio-blk, though.
> Why is that? At least replacing the bdrv_aio_*() call by
> coroutine_create/coroutine_enter/bdrv_co_*() is a mechanical change that
> shouldn't be any harder for virtio-scsi. Whether we can optimise even
> more by integrating the device more with coroutines might be a different
> problem, but at this point you've already got rid of AIOCBs.

Because I/O is done by the generic SCSI code, so you'd have to modify
that and the DMA helpers.  And the generic SCSI code is itself written
asynchronously in order to support HBAs that use a bounce buffer.

Paolo

Patch

diff --git a/qemu-coroutine.c b/qemu-coroutine.c
index bd574aa..aee1017 100644
--- a/qemu-coroutine.c
+++ b/qemu-coroutine.c
@@ -15,31 +15,57 @@ 
 #include "trace.h"
 #include "qemu-common.h"
 #include "qemu/thread.h"
+#include "qemu/atomic.h"
 #include "block/coroutine.h"
 #include "block/coroutine_int.h"
 
 enum {
-    POOL_DEFAULT_SIZE = 64,
+    POOL_BATCH_SIZE = 64,
 };
 
 /** Free list to speed up creation */
-static QemuMutex pool_lock;
-static QSLIST_HEAD(, Coroutine) pool = QSLIST_HEAD_INITIALIZER(pool);
-static unsigned int pool_size;
-static unsigned int pool_max_size = POOL_DEFAULT_SIZE;
+static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
+static unsigned int release_pool_size;
+static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool);
+static __thread Notifier coroutine_pool_cleanup_notifier;
+
+static void coroutine_pool_cleanup(Notifier *n, void *value)
+{
+    Coroutine *co;
+    Coroutine *tmp;
+
+    QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) {
+        QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
+        qemu_coroutine_delete(co);
+    }
+}
 
 Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
 {
     Coroutine *co = NULL;
 
     if (CONFIG_COROUTINE_POOL) {
-        qemu_mutex_lock(&pool_lock);
-        co = QSLIST_FIRST(&pool);
+        co = QSLIST_FIRST(&alloc_pool);
+        if (!co) {
+            if (release_pool_size > POOL_BATCH_SIZE) {
+                /* Slow path; a good place to register the destructor, too.  */
+                if (!coroutine_pool_cleanup_notifier.notify) {
+                    coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
+                    qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
+                }
+
+                /* This is not exact; there could be a little skew between
+                 * release_pool_size and the actual size of release_pool.  But
+                 * it is just a heuristic, it does not need to be perfect.
+                 */
+                release_pool_size = 0;
+                QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
+                co = QSLIST_FIRST(&alloc_pool);
+            }
+        }
         if (co) {
-            QSLIST_REMOVE_HEAD(&pool, pool_next);
-            pool_size--;
+            QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
         }
-        qemu_mutex_unlock(&pool_lock);
     }
 
     if (!co) {
@@ -53,39 +80,19 @@  Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
 
 static void coroutine_delete(Coroutine *co)
 {
+    co->caller = NULL;
+
     if (CONFIG_COROUTINE_POOL) {
-        qemu_mutex_lock(&pool_lock);
-        if (pool_size < pool_max_size) {
-            QSLIST_INSERT_HEAD(&pool, co, pool_next);
-            co->caller = NULL;
-            pool_size++;
-            qemu_mutex_unlock(&pool_lock);
+        if (release_pool_size < POOL_BATCH_SIZE * 2) {
+            QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
+            atomic_inc(&release_pool_size);
             return;
         }
-        qemu_mutex_unlock(&pool_lock);
     }
 
     qemu_coroutine_delete(co);
 }
 
-static void __attribute__((constructor)) coroutine_pool_init(void)
-{
-    qemu_mutex_init(&pool_lock);
-}
-
-static void __attribute__((destructor)) coroutine_pool_cleanup(void)
-{
-    Coroutine *co;
-    Coroutine *tmp;
-
-    QSLIST_FOREACH_SAFE(co, &pool, pool_next, tmp) {
-        QSLIST_REMOVE_HEAD(&pool, pool_next);
-        qemu_coroutine_delete(co);
-    }
-
-    qemu_mutex_destroy(&pool_lock);
-}
-
 static void coroutine_swap(Coroutine *from, Coroutine *to)
 {
     CoroutineAction ret;
@@ -140,20 +147,4 @@  void coroutine_fn qemu_coroutine_yield(void)
 
 void qemu_coroutine_adjust_pool_size(int n)
 {
-    qemu_mutex_lock(&pool_lock);
-
-    pool_max_size += n;
-
-    /* Callers should never take away more than they added */
-    assert(pool_max_size >= POOL_DEFAULT_SIZE);
-
-    /* Trim oversized pool down to new max */
-    while (pool_size > pool_max_size) {
-        Coroutine *co = QSLIST_FIRST(&pool);
-        QSLIST_REMOVE_HEAD(&pool, pool_next);
-        pool_size--;
-        qemu_coroutine_delete(co);
-    }
-
-    qemu_mutex_unlock(&pool_lock);
 }