Patchwork [19/25] aio: add generic thread-pool facility

login
register
mail settings
Submitter Paolo Bonzini
Date Oct. 31, 2012, 9:41 a.m.
Message ID <5090F25C.1090203@redhat.com>
Download mbox | patch
Permalink /patch/195842/
State New
Headers show

Comments

Paolo Bonzini - Oct. 31, 2012, 9:41 a.m.
Il 30/10/2012 20:13, Stefan Hajnoczi ha scritto:
> On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
>> +static void event_notifier_ready(EventNotifier *notifier)
>> +{
>> +    ThreadPoolElement *elem, *next;
>> +
>> +    event_notifier_test_and_clear(notifier);
>> +restart:
>> +    QLIST_FOREACH_SAFE(elem, &head, all, next) {
>> +        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>> +            continue;
>> +        }
>> +        if (elem->state == THREAD_DONE) {
>> +            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
>> +        }
>> +        if (elem->state == THREAD_DONE && elem->common.cb) {
>> +            QLIST_REMOVE(elem, all);
>> +            elem->common.cb(elem->common.opaque, elem->ret);
> 
> This function didn't take the lock.  First it accessed elem->state and
> how it reads elem->ret.  We need to take the lock to ensure both
> elem->state and elem->ret have been set - otherwise we could read
> elem->ret before the return value was stored.

Right.  posix-aio-compat didn't need this because it only had ret.  
Just as important: the locking policy was not documented at all.

I'm applying some changes.  Logically (and for ease of review) they are 
four patches on top of this one, but they'll be squashed in the next 
submission.  (Hmm, the fourth should be separate).

>> +typedef struct ThreadPoolCo {
>> +    Coroutine *co;
>> +    int ret;
>> +} ThreadPoolCo;
>> +
>> +static void thread_pool_co_cb(void *opaque, int ret)
>> +{
>> +    ThreadPoolCo *co = opaque;
>> +
>> +    co->ret = ret;
>> +    qemu_coroutine_enter(co->co, NULL);
>> +}
>> +
>> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>> +{
>> +    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
>> +    assert(qemu_in_coroutine());
>> +    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
>> +    qemu_coroutine_yield();
>> +    return tpc.ret;
> 
> It's important to understand that the submit_aio, yield, return ret
> pattern works because we assume this function was called as part of the
> main loop.
> 
> If thread_pool_submit_co() was called outside the event loop and global
> mutex, then there is a race between the submit_aio and yield steps where
> thread_pool_co_cb() is called before this coroutine yields!

Even before that, thread_pool_submit_aio would race on the 
non-thread-safe qemu_aio_get.  Also, head is protected by the BQL.

event_notifier_ready needs to run under the BQL too, because it 
accesses head and also calls qemu_aio_release.

qemu_aio_get and qemu_aio_release should be moved to AioContext, so 
that they can use the (upcoming) AioContext lock instead of the BQL.  
The thread pool needs to be per-AioContext instead of using globals, 
too.  However, this can be done later.

Paolo

----------------------- >8 ---------------------------
From acf39d76ddf4109fbdbc897afe0d9a23ba8ffba1 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:07:04 +0100
Subject: [PATCH 1/4] fix locking in thread-pool

event_notifier_ready accesses elem->state and then elem->ret.  We need
to take the lock to ensure both elem->state and elem->ret have been set -
otherwise we could read elem->ret before the return value was stored.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 thread-pool.c | 5 ++++-
 1 file modificato, 4 inserzioni(+). 1 rimozione(-)
Stefan Hajnoczi - Oct. 31, 2012, 1:44 p.m.
On Wed, Oct 31, 2012 at 10:41 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 30/10/2012 20:13, Stefan Hajnoczi ha scritto:
>> On Fri, Oct 26, 2012 at 04:05:49PM +0200, Paolo Bonzini wrote:
>>> +static void event_notifier_ready(EventNotifier *notifier)
>>> +{
>>> +    ThreadPoolElement *elem, *next;
>>> +
>>> +    event_notifier_test_and_clear(notifier);
>>> +restart:
>>> +    QLIST_FOREACH_SAFE(elem, &head, all, next) {
>>> +        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>>> +            continue;
>>> +        }
>>> +        if (elem->state == THREAD_DONE) {
>>> +            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
>>> +        }
>>> +        if (elem->state == THREAD_DONE && elem->common.cb) {
>>> +            QLIST_REMOVE(elem, all);
>>> +            elem->common.cb(elem->common.opaque, elem->ret);
>>
>> This function didn't take the lock.  First it accessed elem->state and
>> how it reads elem->ret.  We need to take the lock to ensure both
>> elem->state and elem->ret have been set - otherwise we could read
>> elem->ret before the return value was stored.
>
> Right.  posix-aio-compat didn't need this because it only had ret.
> Just as important: the locking policy was not documented at all.
>
> I'm applying some changes.  Logically (and for ease of review) they are
> four patches on top of this one, but they'll be squashed in the next
> submission.  (Hmm, the fourth should be separate).
>
>>> +typedef struct ThreadPoolCo {
>>> +    Coroutine *co;
>>> +    int ret;
>>> +} ThreadPoolCo;
>>> +
>>> +static void thread_pool_co_cb(void *opaque, int ret)
>>> +{
>>> +    ThreadPoolCo *co = opaque;
>>> +
>>> +    co->ret = ret;
>>> +    qemu_coroutine_enter(co->co, NULL);
>>> +}
>>> +
>>> +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>> +{
>>> +    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
>>> +    assert(qemu_in_coroutine());
>>> +    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
>>> +    qemu_coroutine_yield();
>>> +    return tpc.ret;
>>
>> It's important to understand that the submit_aio, yield, return ret
>> pattern works because we assume this function was called as part of the
>> main loop.
>>
>> If thread_pool_submit_co() was called outside the event loop and global
>> mutex, then there is a race between the submit_aio and yield steps where
>> thread_pool_co_cb() is called before this coroutine yields!
>
> Even before that, thread_pool_submit_aio would race on the
> non-thread-safe qemu_aio_get.  Also, head is protected by the BQL.
>
> event_notifier_ready needs to run under the BQL too, because it
> accesses head and also calls qemu_aio_release.
>
> qemu_aio_get and qemu_aio_release should be moved to AioContext, so
> that they can use the (upcoming) AioContext lock instead of the BQL.
> The thread pool needs to be per-AioContext instead of using globals,
> too.  However, this can be done later.

I just sent a patch to move from a manual freelist to g_slice_alloc().
 This does away with manual pooling and is thread-safe.  It's possible
that a lock + freelist is more efficient, but I think g_slice_alloc()
is a good start.

Your patches look good, looking forward to the next spin.

Stefan

Patch

diff --git a/thread-pool.c b/thread-pool.c
index 266f12f..e4bd4f3 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -162,8 +162,11 @@  restart:
             trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
         }
         if (elem->state == THREAD_DONE && elem->common.cb) {
+            qemu_mutex_lock(&lock);
+            int ret = elem->ret;
+            qemu_mutex_unlock(&lock);
             QLIST_REMOVE(elem, all);
-            elem->common.cb(elem->common.opaque, elem->ret);
+            elem->common.cb(elem->common.opaque, ret);
             qemu_aio_release(elem);
             goto restart;
         } else {
-- 
1.7.12.1


From 8671e581bb65be4d3cd82a9b99fe46735c6ea76b Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:26 +0100
Subject: [PATCH 2/4] document lock policy

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 thread-pool.c | 13 ++++++++++---
 1 file modificato, 10 inserzioni(+), 3 rimozioni(-)

diff --git a/thread-pool.c b/thread-pool.c
index e4bd4f3..10bab70 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -42,7 +42,10 @@  struct ThreadPoolElement {
     enum ThreadState state;
     int ret;
 
+    /* Access to this list is protected by lock.  */
     QTAILQ_ENTRY(ThreadPoolElement) reqs;
+
+    /* Access to this list is protected by the global mutex.  */
     QLIST_ENTRY(ThreadPoolElement) all;
 };
 
@@ -51,14 +54,18 @@  static QemuMutex lock;
 static QemuCond check_cancel;
 static QemuSemaphore sem;
 static int max_threads = 64;
+static QEMUBH *new_thread_bh;
+
+/* The following variables are protected by the global mutex.  */
+static QLIST_HEAD(, ThreadPoolElement) head;
+
+/* The following variables are protected by lock.  */
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
 static int cur_threads;
 static int idle_threads;
 static int new_threads;     /* backlog of threads we need to create */
 static int pending_threads; /* threads created but not running yet */
 static int pending_cancellations; /* whether we need a cond_broadcast */
-static QEMUBH *new_thread_bh;
-static QLIST_HEAD(, ThreadPoolElement) head;
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
 
 static void *worker_thread(void *unused)
 {
-- 
1.7.12.1


From 1b611e625a1a16c1d1b110e410f082b64c7ba332 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:08:39 +0100
Subject: [PATCH 3/4] simplify locking

Avoid repeated lock/unlock, take lock around the while loop rather
than inside.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 thread-pool.c | 11 +++--------
 1 file modificato, 3 inserzioni(+), 8 rimozioni(-)

diff --git a/thread-pool.c b/thread-pool.c
index 10bab70..38ac5b4 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -71,25 +71,21 @@  static void *worker_thread(void *unused)
 {
     qemu_mutex_lock(&lock);
     pending_threads--;
-    qemu_mutex_unlock(&lock);
     do_spawn_thread();
 
     while (1) {
         ThreadPoolElement *req;
         int ret;
 
-        qemu_mutex_lock(&lock);
-        idle_threads++;
-        qemu_mutex_unlock(&lock);
-        ret = qemu_sem_timedwait(&sem, 10000);
-        qemu_mutex_lock(&lock);
-        idle_threads--;
+        do {
+            idle_threads++;
+            qemu_mutex_unlock(&lock);
+            ret = qemu_sem_timedwait(&sem, 10000);
+            qemu_mutex_lock(&lock);
+            idle_threads--;
+        } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
         if (ret == -1) {
-            if (QTAILQ_EMPTY(&request_list)) {
-                break;
-            }
-            qemu_mutex_unlock(&lock);
-            continue;
+            break;
         }
 
         req = QTAILQ_FIRST(&request_list);
@@ -105,14 +103,12 @@  static void *worker_thread(void *unused)
         if (pending_cancellations) {
             qemu_cond_broadcast(&check_cancel);
         }
-        qemu_mutex_unlock(&lock);
 
         event_notifier_set(&notifier);
     }
 
     cur_threads--;
     qemu_mutex_unlock(&lock);
-
     return NULL;
 }
 
@@ -120,23 +116,22 @@  static void do_spawn_thread(void)
 {
     QemuThread t;
 
-    qemu_mutex_lock(&lock);
+    /* Runs with lock taken.  */
     if (!new_threads) {
-        qemu_mutex_unlock(&lock);
         return;
     }
 
     new_threads--;
     pending_threads++;
 
-    qemu_mutex_unlock(&lock);
-
     qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
 }
 
 static void spawn_thread_bh_fn(void *opaque)
 {
+    qemu_mutex_lock(&lock);
     do_spawn_thread();
+    qemu_mutex_unlock(&lock);
 }
 
 static void spawn_thread(void)
-- 
1.7.12.1


From 3478c7db38368804db46924fc22d90cda77f6a48 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:11 +0100
Subject: [PATCH 4/4] threadpool: do not take lock in event_notifier_ready

The ordering is:

    worker thread                         consumer thread
    -------------------------------------------------------------------
    write ret                             event_notifier_test_and_clear
    wmb()                                 read state
    write state                           rmb()
    event_notifier_set                    read ret

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 thread-pool.c | 19 +++++++++++++------
 1 file modificato, 13 inserzioni(+), 6 rimozioni(-)

diff --git a/thread-pool.c b/thread-pool.c
index 38ac5b4..883948a 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -39,6 +39,11 @@  struct ThreadPoolElement {
     BlockDriverAIOCB common;
     ThreadPoolFunc *func;
     void *arg;
+
+    /* Moving state out of THREAD_QUEUED is protected by lock.  After
+     * that, only the worker thread can write to it.  Reads and writes
+     * of state and ret are ordered with memory barriers.
+     */
     enum ThreadState state;
     int ret;
 
@@ -97,9 +102,12 @@  static void *worker_thread(void *unused)
 
         ret = req->func(req->arg);
 
-        qemu_mutex_lock(&lock);
-        req->state = THREAD_DONE;
         req->ret = ret;
+        /* Write ret before state.  */
+        smp_wmb();
+        req->state = THREAD_DONE;
+
+        qemu_mutex_lock(&lock);
         if (pending_cancellations) {
             qemu_cond_broadcast(&check_cancel);
         }
@@ -164,11 +172,10 @@  restart:
             trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
         }
         if (elem->state == THREAD_DONE && elem->common.cb) {
-            qemu_mutex_lock(&lock);
-            int ret = elem->ret;
-            qemu_mutex_unlock(&lock);
             QLIST_REMOVE(elem, all);
-            elem->common.cb(elem->common.opaque, ret);
+            /* Read state before ret.  */
+            smp_rmb();
+            elem->common.cb(elem->common.opaque, elem->ret);
             qemu_aio_release(elem);
             goto restart;
         } else {