diff mbox

[RFC,11/22] Thread pool synchronization for replay

Message ID 007f01cf951f$32d6b390$98841ab0$@Dovgaluk@ispras.ru
State New
Headers show

Commit Message

Pavel Dovgalyuk July 1, 2014, 11:25 a.m. UTC
Making thread pool deterministic. All thread work is done in synchronous mode
by executing the worker and callback functions when "checkpoint" in code is
passed or instruction is executed. These events are written to the log
attached to this instruction or "checkpoint".

Signed-off-by: Pavel Dovgalyuk <pavel.dovgaluk@gmail.com>
---
diff mbox

Patch

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 32afcdd..ef77934 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -25,17 +25,75 @@ 
 #include "block/block_int.h"
 
 typedef int ThreadPoolFunc(void *opaque);
+typedef struct ThreadPoolElement ThreadPoolElement;
+
+enum ThreadState {
+    THREAD_QUEUED,
+    THREAD_ACTIVE,
+    THREAD_DONE,
+    THREAD_CANCELED,
+};
+
+struct ThreadPool {
+    EventNotifier notifier;
+    AioContext *ctx;
+    QemuMutex lock;
+    QemuCond check_cancel;
+    QemuCond worker_stopped;
+    QemuSemaphore sem;
+    int max_threads;
+    QEMUBH *new_thread_bh;
+
+    /* The following variables are only accessed from one AioContext. */
+    QLIST_HEAD(, ThreadPoolElement) head;
+
+    /* The following variables are protected by lock.  */
+    QTAILQ_HEAD(, ThreadPoolElement) request_list;
+
+    /* The following tasks are executed by replay */
+    QTAILQ_HEAD(, ThreadPoolElement) replay_list;
+    int cur_threads;
+    int idle_threads;
+    int new_threads;     /* backlog of threads we need to create */
+    int pending_threads; /* threads created but not running yet */
+    int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
+};
 
 typedef struct ThreadPool ThreadPool;
 
+struct ThreadPoolElement {
+    BlockDriverAIOCB common;
+    ThreadPool *pool;
+    ThreadPoolFunc *func;
+    void *arg;
+
+    /* Moving state out of THREAD_QUEUED is protected by lock.  After
+     * that, only the worker thread can write to it.  Reads and writes
+     * of state and ret are ordered with memory barriers.
+     */
+    enum ThreadState state;
+    int ret;
+
+    /* Access to this list is protected by lock.  */
+    QTAILQ_ENTRY(ThreadPoolElement) reqs;
+
+    /* Access to this list is protected by the global mutex.  */
+    QLIST_ENTRY(ThreadPoolElement) all;
+};
+
+
+
 ThreadPool *thread_pool_new(struct AioContext *ctx);
 void thread_pool_free(ThreadPool *pool);
 
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
-        BlockDriverCompletionFunc *cb, void *opaque);
+        BlockDriverCompletionFunc *cb, void *opaque,
+        bool replay, uint64_t replay_step);
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_work(struct ThreadPool *pool, struct ThreadPoolElement *req);
 
 #endif

diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index aa156bc..43f64ce 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -55,7 +55,7 @@  static void test_submit_aio(void)
 {
     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
     data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
-                                        done_cb, &data);
+                                        done_cb, &data, false, 0);
 
     /* The callbacks are not called until after the first wait.  */
     active = 1;
@@ -119,7 +119,7 @@  static void test_submit_many(void)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
+        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i], false, 0);
     }
 
     active = 100;
@@ -148,7 +148,7 @@  static void test_cancel(void)
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
         data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
-                                               done_cb, &data[i]);
+                                               done_cb, &data[i], false, 0);
     }
 
     /* Starting the threads may be left to a bottom half.  Let it
diff --git a/thread-pool.c b/thread-pool.c
index dfb699d..f21252a
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,60 +24,36 @@ 
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "replay/replay.h"
 
 static void do_spawn_thread(ThreadPool *pool);
 
-typedef struct ThreadPoolElement ThreadPoolElement;
-
-enum ThreadState {
-    THREAD_QUEUED,
-    THREAD_ACTIVE,
-    THREAD_DONE,
-    THREAD_CANCELED,
-};
-
-struct ThreadPoolElement {
-    BlockDriverAIOCB common;
-    ThreadPool *pool;
-    ThreadPoolFunc *func;
-    void *arg;
-
-    /* Moving state out of THREAD_QUEUED is protected by lock.  After
-     * that, only the worker thread can write to it.  Reads and writes
-     * of state and ret are ordered with memory barriers.
-     */
-    enum ThreadState state;
+void thread_pool_work(ThreadPool *pool, ThreadPoolElement *req)
+{
     int ret;
+    if (replay_mode == REPLAY_NONE
+        || replay_get_play_submode() == REPLAY_PLAY_CHANGED) {
+        qemu_mutex_unlock(&pool->lock);
+    }
 
-    /* Access to this list is protected by lock.  */
-    QTAILQ_ENTRY(ThreadPoolElement) reqs;
+    ret = req->func(req->arg);
 
-    /* Access to this list is protected by the global mutex.  */
-    QLIST_ENTRY(ThreadPoolElement) all;
-};
+    req->ret = ret;
+    /* Write ret before state.  */
+    smp_wmb();
+    req->state = THREAD_DONE;
 
-struct ThreadPool {
-    EventNotifier notifier;
-    AioContext *ctx;
-    QemuMutex lock;
-    QemuCond check_cancel;
-    QemuCond worker_stopped;
-    QemuSemaphore sem;
-    int max_threads;
-    QEMUBH *new_thread_bh;
-
-    /* The following variables are only accessed from one AioContext. */
-    QLIST_HEAD(, ThreadPoolElement) head;
-
-    /* The following variables are protected by lock.  */
-    QTAILQ_HEAD(, ThreadPoolElement) request_list;
-    int cur_threads;
-    int idle_threads;
-    int new_threads;     /* backlog of threads we need to create */
-    int pending_threads; /* threads created but not running yet */
-    int pending_cancellations; /* whether we need a cond_broadcast */
-    bool stopping;
-};
+    if (replay_mode == REPLAY_NONE
+        || replay_get_play_submode() == REPLAY_PLAY_CHANGED) {
+        qemu_mutex_lock(&pool->lock);
+    }
+    // ??
+    if (pool->pending_cancellations) {
+        qemu_cond_broadcast(&pool->check_cancel);
+    }
+
+    event_notifier_set(&pool->notifier);
+}
 
 static void *worker_thread(void *opaque)
 {
@@ -105,21 +81,13 @@  static void *worker_thread(void *opaque)
         req = QTAILQ_FIRST(&pool->request_list);
         QTAILQ_REMOVE(&pool->request_list, req, reqs);
         req->state = THREAD_ACTIVE;
-        qemu_mutex_unlock(&pool->lock);
-
-        ret = req->func(req->arg);
-
-        req->ret = ret;
-        /* Write ret before state.  */
-        smp_wmb();
-        req->state = THREAD_DONE;
-
-        qemu_mutex_lock(&pool->lock);
-        if (pool->pending_cancellations) {
-            qemu_cond_broadcast(&pool->check_cancel);
+        
+        // add event pool func
+        if (replay_mode != REPLAY_NONE && req->common.replay) {
+            replay_add_thread_event(pool, req, req->common.replay_step);
+        } else {
+            thread_pool_work(pool, req);
         }
-
-        event_notifier_set(&pool->notifier);
     }
 
     pool->cur_threads--;
@@ -234,7 +202,7 @@  static const AIOCBInfo thread_pool_aiocb_info = {
 
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
-        BlockDriverCompletionFunc *cb, void *opaque)
+        BlockDriverCompletionFunc *cb, void *opaque, bool replay, uint64_t replay_step)
 {
     ThreadPoolElement *req;
 
@@ -243,6 +211,8 @@  BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     req->arg = arg;
     req->state = THREAD_QUEUED;
     req->pool = pool;
+    req->common.replay = replay;
+    req->common.replay_step = replay_step;
 
     QLIST_INSERT_HEAD(&pool->head, req, all);
 
@@ -253,8 +223,8 @@  BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         spawn_thread(pool);
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
-    qemu_mutex_unlock(&pool->lock);
     qemu_sem_post(&pool->sem);
+    qemu_mutex_unlock(&pool->lock);
     return &req->common;
 }
 
@@ -276,14 +246,14 @@  int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc, false, 0);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
+    thread_pool_submit_aio(pool, func, arg, NULL, NULL, false, 0);
 }
 
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
@@ -304,6 +274,7 @@  static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+    QTAILQ_INIT(&pool->replay_list);
 
     aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready);
 }