From patchwork Thu Jul 17 11:05:15 2014
X-Patchwork-Submitter: Pavel Dovgalyuk
X-Patchwork-Id: 371158
To: qemu-devel@nongnu.org
From: Pavel Dovgalyuk
Date: Thu, 17 Jul 2014 15:05:15 +0400
Message-ID: <20140717110515.8352.83804.stgit@PASHA-ISP>
In-Reply-To: <20140717110153.8352.80175.stgit@PASHA-ISP>
References: <20140717110153.8352.80175.stgit@PASHA-ISP>
User-Agent: StGit/0.16
Cc: peter.maydell@linaro.org, peter.crosthwaite@xilinx.com, mark.burton@greensocs.com,
real@ispras.ru, batuzovk@ispras.ru, pavel.dovgaluk@ispras.ru, pbonzini@redhat.com, fred.konrad@greensocs.com
Subject: [Qemu-devel] [RFC PATCH v2 35/49] replay: thread pool

This patch modifies the thread pool to allow replaying asynchronous
thread tasks synchronously in replay mode.

Signed-off-by: Pavel Dovgalyuk
---
 block/raw-posix.c           |    6 +++--
 block/raw-win32.c           |    4 ++-
 include/block/thread-pool.h |    4 ++-
 replay/replay-events.c      |   11 +++++++++
 replay/replay-internal.h    |    3 ++
 replay/replay.h             |    2 ++
 stubs/replay.c              |    4 +++
 tests/test-thread-pool.c    |    7 +++---
 thread-pool.c               |   53 +++++++++++++++++++++++++++++--------------
 9 files changed, 69 insertions(+), 25 deletions(-)

diff --git a/block/raw-posix.c b/block/raw-posix.c
index a857def..f600736 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -1025,7 +1025,9 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
     pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque,
+                                  qiov ? qiov->replay : false,
+                                  qiov ? qiov->replay_step : 0);
 }
 
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -1853,7 +1855,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
     pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, false, 0);
 }
 
 #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 902eab6..212307c 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -158,7 +158,9 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
     pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque,
+                                  qiov ? qiov->replay : false,
+                                  qiov ? qiov->replay_step : 0);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 32afcdd..df74f8d 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -33,9 +33,11 @@ void thread_pool_free(ThreadPool *pool);
 
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
-        BlockDriverCompletionFunc *cb, void *opaque);
+        BlockDriverCompletionFunc *cb, void *opaque,
+        bool replay, uint64_t replay_step);
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_work(ThreadPool *pool, void *r);
 
 #endif
diff --git a/replay/replay-events.c b/replay/replay-events.c
index 81a5a6b..f39889d 100755
--- a/replay/replay-events.c
+++ b/replay/replay-events.c
@@ -11,6 +11,7 @@
 
 #include "replay.h"
 #include "replay-internal.h"
+#include "block/thread-pool.h"
 
 typedef struct Event {
     int event_kind;
@@ -38,6 +39,9 @@ static void replay_run_event(Event *event)
     case REPLAY_ASYNC_EVENT_BH:
         aio_bh_call(event->opaque);
         break;
+    case REPLAY_ASYNC_EVENT_THREAD:
+        thread_pool_work((ThreadPool *)event->opaque, event->opaque2);
+        break;
     default:
         fprintf(stderr, "Replay: invalid async event ID (%d) in the queue\n",
                 event->event_kind);
@@ -125,6 +129,11 @@ void replay_add_bh_event(void *bh, uint64_t id)
 {
     replay_add_event_internal(REPLAY_ASYNC_EVENT_BH, bh, NULL, id);
 }
 
+void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id)
+{
+    replay_add_event_internal(REPLAY_ASYNC_EVENT_THREAD, opaque, opaque2, id);
+}
+
 void replay_save_events(int opt)
 {
     qemu_mutex_lock(&lock);
@@ -143,6 +152,7 @@ void replay_save_events(int opt)
             /* save event-specific data */
             switch (event->event_kind) {
             case REPLAY_ASYNC_EVENT_BH:
+            case REPLAY_ASYNC_EVENT_THREAD:
                 replay_put_qword(event->id);
                 break;
             }
@@ -175,6 +185,7 @@ void replay_read_events(int opt)
         /* Execute some events without searching them in the queue */
         switch (read_event_kind) {
         case REPLAY_ASYNC_EVENT_BH:
+        case REPLAY_ASYNC_EVENT_THREAD:
             if (read_id == -1) {
                 read_id = replay_get_qword();
             }
diff --git a/replay/replay-internal.h b/replay/replay-internal.h
index bbb117e..3f97fd7 100755
--- a/replay/replay-internal.h
+++ b/replay/replay-internal.h
@@ -40,7 +40,8 @@
 
 /* Asynchronous events IDs */
 #define REPLAY_ASYNC_EVENT_BH     0
-#define REPLAY_ASYNC_COUNT        1
+#define REPLAY_ASYNC_EVENT_THREAD 1
+#define REPLAY_ASYNC_COUNT        2
 
 typedef struct ReplayState {
     /*! Cached clock values. */
diff --git a/replay/replay.h b/replay/replay.h
index 3040b6b..e96c74f 100755
--- a/replay/replay.h
+++ b/replay/replay.h
@@ -100,5 +100,7 @@ int replay_checkpoint(unsigned int checkpoint);
 void replay_disable_events(void);
 /*! Adds BH event to the queue */
 void replay_add_bh_event(void *bh, uint64_t id);
+/*! Adds thread event to the queue */
+void replay_add_thread_event(void *pool, void *req, uint64_t id);
 
 #endif
diff --git a/stubs/replay.c b/stubs/replay.c
index 1dea1f6..c269b59 100755
--- a/stubs/replay.c
+++ b/stubs/replay.c
@@ -36,3 +36,7 @@ uint64_t replay_get_current_step(void)
 {
     return 0;
 }
+
+void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id)
+{
+}
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index aa156bc..6dfbb55 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -55,7 +55,7 @@ static void test_submit_aio(void)
 {
     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
     data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
-                                        done_cb, &data);
+                                        done_cb, &data, false, 0);
 
     /* The callbacks are not called until after the first wait.  */
     active = 1;
@@ -119,7 +119,8 @@ static void test_submit_many(void)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
+        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i],
+                               false, 0);
     }
 
     active = 100;
@@ -148,7 +149,7 @@ static void test_cancel(void)
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
         data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
-                                               done_cb, &data[i]);
+                                               done_cb, &data[i], false, 0);
     }
 
     /* Starting the threads may be left to a bottom half.  Let it
diff --git a/thread-pool.c b/thread-pool.c
index dfb699d..25cff25 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,6 +24,7 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "replay/replay.h"
 
 static void do_spawn_thread(ThreadPool *pool);
 
@@ -79,6 +80,30 @@ struct ThreadPool {
     bool stopping;
 };
 
+void thread_pool_work(ThreadPool *pool, void *r)
+{
+    ThreadPoolElement *req = (ThreadPoolElement *)r;
+    int ret;
+    if (replay_mode == REPLAY_NONE) {
+        qemu_mutex_unlock(&pool->lock);
+    }
+
+    ret = req->func(req->arg);
+    req->ret = ret;
+    /* Write ret before state.  */
+    smp_wmb();
+    req->state = THREAD_DONE;
+
+    if (replay_mode == REPLAY_NONE) {
+        qemu_mutex_lock(&pool->lock);
+    }
+    if (pool->pending_cancellations) {
+        qemu_cond_broadcast(&pool->check_cancel);
+    }
+
+    event_notifier_set(&pool->notifier);
+}
+
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -105,21 +130,12 @@ static void *worker_thread(void *opaque)
         req = QTAILQ_FIRST(&pool->request_list);
         QTAILQ_REMOVE(&pool->request_list, req, reqs);
         req->state = THREAD_ACTIVE;
-        qemu_mutex_unlock(&pool->lock);
-
-        ret = req->func(req->arg);
-        req->ret = ret;
-        /* Write ret before state.  */
-        smp_wmb();
-        req->state = THREAD_DONE;
-
-        qemu_mutex_lock(&pool->lock);
-        if (pool->pending_cancellations) {
-            qemu_cond_broadcast(&pool->check_cancel);
+        if (replay_mode != REPLAY_NONE && req->common.replay) {
+            replay_add_thread_event(pool, req, req->common.replay_step);
+        } else {
+            thread_pool_work(pool, req);
         }
-
-        event_notifier_set(&pool->notifier);
     }
 
     pool->cur_threads--;
@@ -234,7 +250,8 @@ static const AIOCBInfo thread_pool_aiocb_info = {
 
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
-        BlockDriverCompletionFunc *cb, void *opaque)
+        BlockDriverCompletionFunc *cb, void *opaque,
+        bool replay, uint64_t replay_step)
 {
     ThreadPoolElement *req;
 
@@ -243,6 +260,8 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     req->arg = arg;
     req->state = THREAD_QUEUED;
     req->pool = pool;
+    req->common.replay = replay;
+    req->common.replay_step = replay_step;
 
     QLIST_INSERT_HEAD(&pool->head, req, all);
 
@@ -253,8 +272,8 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         spawn_thread(pool);
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
-    qemu_mutex_unlock(&pool->lock);
     qemu_sem_post(&pool->sem);
+    qemu_mutex_unlock(&pool->lock);
     return &req->common;
 }
 
@@ -276,14 +295,14 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc, false, 0);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
+    thread_pool_submit_aio(pool, func, arg, NULL, NULL, false, 0);
 }
 
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
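For reviewers, the control flow the patch introduces in worker_thread() can be sketched in isolation: in normal mode the worker executes the request immediately, while in replay mode a flagged request is deferred to the replay event queue and later executed synchronously by thread_pool_work(). The sketch below mirrors the names from the patch (thread_pool_work, replay_add_thread_event) but the types and helper bodies are illustrative stand-ins, not QEMU code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef int (*TaskFunc)(void *arg);

/* Stand-in for ThreadPoolElement: a request with its result and state. */
typedef struct Task {
    TaskFunc func;
    void *arg;
    int ret;
    bool done;
} Task;

enum { REPLAY_NONE, REPLAY_PLAY };
int replay_mode = REPLAY_NONE;

/* One-slot replay event queue; enough for this sketch. */
static Task *deferred;

/* Analogue of thread_pool_work(): actually run the request. */
void thread_pool_work_sketch(Task *req)
{
    req->ret = req->func(req->arg);
    req->done = true;
}

/* Analogue of the worker_thread() hunk: defer flagged requests in
 * replay mode, run everything else immediately. */
void worker_dispatch(Task *req, bool replay_flagged)
{
    if (replay_mode != REPLAY_NONE && replay_flagged) {
        deferred = req;              /* replay_add_thread_event() analogue */
    } else {
        thread_pool_work_sketch(req);
    }
}

/* Analogue of the replay event loop: run queued tasks synchronously,
 * at a deterministic point in the execution. */
void replay_run_events_sketch(void)
{
    if (deferred) {
        thread_pool_work_sketch(deferred);
        deferred = NULL;
    }
}

int double_it(void *arg)
{
    return *(int *)arg * 2;
}
```

In normal mode `worker_dispatch()` completes the task before returning; in replay mode the task stays pending until `replay_run_events_sketch()` runs it, which is what makes the completion order reproducible across record and replay runs.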