@@ -162,8 +162,11 @@ restart:
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
+ qemu_mutex_lock(&lock);
+ int ret = elem->ret;
+ qemu_mutex_unlock(&lock);
QLIST_REMOVE(elem, all);
- elem->common.cb(elem->common.opaque, elem->ret);
+ elem->common.cb(elem->common.opaque, ret);
qemu_aio_release(elem);
goto restart;
} else {
--
1.7.12.1
From 8671e581bb65be4d3cd82a9b99fe46735c6ea76b Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:26 +0100
Subject: [PATCH 2/4] document lock policy
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 13 ++++++++++---
1 file modificato, 10 inserzioni(+), 3 rimozioni(-)
@@ -42,7 +42,10 @@ struct ThreadPoolElement {
enum ThreadState state;
int ret;
+ /* Access to this list is protected by lock. */
QTAILQ_ENTRY(ThreadPoolElement) reqs;
+
+ /* Access to this list is protected by the global mutex. */
QLIST_ENTRY(ThreadPoolElement) all;
};
@@ -51,14 +54,18 @@ static QemuMutex lock;
static QemuCond check_cancel;
static QemuSemaphore sem;
static int max_threads = 64;
+static QEMUBH *new_thread_bh;
+
+/* The following variables are protected by the global mutex. */
+static QLIST_HEAD(, ThreadPoolElement) head;
+
+/* The following variables are protected by lock. */
+static QTAILQ_HEAD(, ThreadPoolElement) request_list;
static int cur_threads;
static int idle_threads;
static int new_threads; /* backlog of threads we need to create */
static int pending_threads; /* threads created but not running yet */
static int pending_cancellations; /* whether we need a cond_broadcast */
-static QEMUBH *new_thread_bh;
-static QLIST_HEAD(, ThreadPoolElement) head;
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
static void *worker_thread(void *unused)
{
--
1.7.12.1
From 1b611e625a1a16c1d1b110e410f082b64c7ba332 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:08:39 +0100
Subject: [PATCH 3/4] simplify locking
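Avoid repeated lock/unlock pairs: take the lock once around the while
loop in worker_thread rather than inside it, and make do_spawn_thread
run with the lock already held by its callers.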
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 11 +++--------
1 file modificato, 3 inserzioni(+), 8 rimozioni(-)
@@ -71,25 +71,21 @@ static void *worker_thread(void *unused)
{
qemu_mutex_lock(&lock);
pending_threads--;
- qemu_mutex_unlock(&lock);
do_spawn_thread();
while (1) {
ThreadPoolElement *req;
int ret;
- qemu_mutex_lock(&lock);
- idle_threads++;
- qemu_mutex_unlock(&lock);
- ret = qemu_sem_timedwait(&sem, 10000);
- qemu_mutex_lock(&lock);
- idle_threads--;
+ do {
+ idle_threads++;
+ qemu_mutex_unlock(&lock);
+ ret = qemu_sem_timedwait(&sem, 10000);
+ qemu_mutex_lock(&lock);
+ idle_threads--;
+ } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
if (ret == -1) {
- if (QTAILQ_EMPTY(&request_list)) {
- break;
- }
- qemu_mutex_unlock(&lock);
- continue;
+ break;
}
req = QTAILQ_FIRST(&request_list);
@@ -105,14 +103,12 @@ static void *worker_thread(void *unused)
if (pending_cancellations) {
qemu_cond_broadcast(&check_cancel);
}
- qemu_mutex_unlock(&lock);
event_notifier_set(&notifier);
}
cur_threads--;
qemu_mutex_unlock(&lock);
-
return NULL;
}
@@ -120,23 +116,22 @@ static void do_spawn_thread(void)
{
QemuThread t;
- qemu_mutex_lock(&lock);
+ /* Runs with lock taken. */
if (!new_threads) {
- qemu_mutex_unlock(&lock);
return;
}
new_threads--;
pending_threads++;
- qemu_mutex_unlock(&lock);
-
qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
}
static void spawn_thread_bh_fn(void *opaque)
{
+ qemu_mutex_lock(&lock);
do_spawn_thread();
+ qemu_mutex_unlock(&lock);
}
static void spawn_thread(void)
--
1.7.12.1
From 3478c7db38368804db46924fc22d90cda77f6a48 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <pbonzini@redhat.com>
Date: Wed, 31 Oct 2012 10:09:11 +0100
Subject: [PATCH 4/4] threadpool: do not take lock in event_notifier_ready
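The ordering is:

    worker thread              consumer thread
    -------------------------------------------------------------------
    write ret                  event_notifier_test_and_clear
    wmb()                      read state
    write state                rmb()
    event_notifier_set         read ret

Because ret is written before state (wmb) and state is read before ret
(rmb), a consumer that observes THREAD_DONE also observes the final ret,
so the lock is not needed on this path.
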
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
thread-pool.c | 19 +++++++++++++------
1 file modificato, 13 inserzioni(+), 6 rimozioni(-)
@@ -39,6 +39,11 @@ struct ThreadPoolElement {
BlockDriverAIOCB common;
ThreadPoolFunc *func;
void *arg;
+
+ /* Moving state out of THREAD_QUEUED is protected by lock. After
+ * that, only the worker thread can write to it. Reads and writes
+ * of state and ret are ordered with memory barriers.
+ */
enum ThreadState state;
int ret;
@@ -97,9 +102,12 @@ static void *worker_thread(void *unused)
ret = req->func(req->arg);
- qemu_mutex_lock(&lock);
- req->state = THREAD_DONE;
req->ret = ret;
+ /* Write ret before state. */
+ smp_wmb();
+ req->state = THREAD_DONE;
+
+ qemu_mutex_lock(&lock);
if (pending_cancellations) {
qemu_cond_broadcast(&check_cancel);
}
@@ -164,11 +172,10 @@ restart:
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
- qemu_mutex_lock(&lock);
- int ret = elem->ret;
- qemu_mutex_unlock(&lock);
QLIST_REMOVE(elem, all);
- elem->common.cb(elem->common.opaque, ret);
+ /* Read state before ret. */
+ smp_rmb();
+ elem->common.cb(elem->common.opaque, elem->ret);
qemu_aio_release(elem);
goto restart;
} else {