From patchwork Wed Feb 18 11:57:28 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Pavel Dovgalyuk X-Patchwork-Id: 440957 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [IPv6:2001:4830:134:3::11]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 92E02140079 for ; Wed, 18 Feb 2015 23:09:31 +1100 (AEDT) Received: from localhost ([::1]:50249 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1YO3RZ-0002vU-65 for incoming@patchwork.ozlabs.org; Wed, 18 Feb 2015 07:09:29 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:60072) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1YO3Fx-0008P0-VF for qemu-devel@nongnu.org; Wed, 18 Feb 2015 06:57:31 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1YO3Fv-0003KX-PG for qemu-devel@nongnu.org; Wed, 18 Feb 2015 06:57:29 -0500 Received: from mail.ispras.ru ([83.149.199.45]:38303) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1YO3Fv-0003KK-DZ for qemu-devel@nongnu.org; Wed, 18 Feb 2015 06:57:27 -0500 Received: from [10.10.150.136] (unknown [85.142.117.224]) by mail.ispras.ru (Postfix) with ESMTPSA id 2CF8D540151; Wed, 18 Feb 2015 14:57:26 +0300 (MSK) To: qemu-devel@nongnu.org From: Pavel Dovgalyuk Date: Wed, 18 Feb 2015 14:57:28 +0300 Message-ID: <20150218115728.4176.75061.stgit@PASHA-ISP> In-Reply-To: <20150218115534.4176.12578.stgit@PASHA-ISP> References: <20150218115534.4176.12578.stgit@PASHA-ISP> User-Agent: StGit/0.16 MIME-Version: 1.0 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 3.x X-Received-From: 83.149.199.45 Cc: peter.maydell@linaro.org, peter.crosthwaite@xilinx.com, alex.bennee@linaro.org, mark.burton@greensocs.com, real@ispras.ru, batuzovk@ispras.ru, maria.klimushenkova@ispras.ru, pavel.dovgaluk@ispras.ru, pbonzini@redhat.com, afaerber@suse.de, fred.konrad@greensocs.com Subject: [Qemu-devel] [RFC PATCH v9 18/23] replay: thread pool X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org This patch modifies thread pool to allow replaying asynchronous thread tasks synchronously in replay mode. Signed-off-by: Pavel Dovgalyuk --- block/raw-posix.c | 6 ++++- block/raw-win32.c | 4 +++- include/block/thread-pool.h | 4 +++- replay/replay-events.c | 11 ++++++++++ replay/replay-internal.h | 1 + replay/replay.h | 2 ++ stubs/replay.c | 4 ++++ tests/test-thread-pool.c | 7 ++++-- thread-pool.c | 49 ++++++++++++++++++++++++++++++------------- 9 files changed, 66 insertions(+), 22 deletions(-) diff --git a/block/raw-posix.c b/block/raw-posix.c index e474c17..4636b95 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -1136,7 +1136,9 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, int fd, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? qiov->replay_step : 0); } static BlockAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -2062,7 +2064,7 @@ static BlockAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, false, 0); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index 06243d7..7ac693b 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -158,7 +158,9 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? qiov->replay_step : 0); } int qemu_ftruncate64(int fd, int64_t length) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 42eb5e8..801ac00 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -29,9 +29,11 @@ void thread_pool_free(ThreadPool *pool); BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockCompletionFunc *cb, void *opaque); + BlockCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step); int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +void thread_pool_work(ThreadPool *pool, void *r); #endif diff --git a/replay/replay-events.c b/replay/replay-events.c index 2fb73be..bfc1a08 100755 --- a/replay/replay-events.c +++ b/replay/replay-events.c @@ -13,6 +13,7 @@ #include "qemu/error-report.h" #include "replay.h" #include "replay-internal.h" +#include "block/thread-pool.h" typedef struct Event { ReplayAsyncEventKind event_kind; @@ -38,6 +39,9 @@ static void replay_run_event(Event *event) case REPLAY_ASYNC_EVENT_BH: aio_bh_call(event->opaque); break; + case REPLAY_ASYNC_EVENT_THREAD: + thread_pool_work((ThreadPool *)event->opaque, event->opaque2); + break; default: error_report("Replay: invalid async event ID (%d) in the queue", event->event_kind); @@ -137,6 +141,7 @@ static void replay_save_event(Event *event, int checkpoint) /* save event-specific data */ switch (event->event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: replay_put_qword(event->id); break; } @@ -148,6 +153,11 @@ void replay_add_bh_event(void *bh, uint64_t id) replay_add_event_internal(REPLAY_ASYNC_EVENT_BH, bh, NULL, id); } +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ + replay_add_event_internal(REPLAY_ASYNC_EVENT_THREAD, opaque, opaque2, id); +} + /* Called with replay mutex locked */ void replay_save_events(int checkpoint) { @@ -180,6 +190,7 @@ static Event *replay_read_event(int checkpoint) /* Events that has not to be in the queue */ switch (read_event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: if (read_id == -1) { read_id = replay_get_qword(); } diff --git a/replay/replay-internal.h b/replay/replay-internal.h index 91c966c..a92a92c 100755 --- a/replay/replay-internal.h +++ b/replay/replay-internal.h @@ -40,6 +40,7 @@ enum ReplayEvents { enum ReplayAsyncEventKind { REPLAY_ASYNC_EVENT_BH, + REPLAY_ASYNC_EVENT_THREAD, REPLAY_ASYNC_COUNT }; diff --git a/replay/replay.h b/replay/replay.h index 0c41f4e..a93f065 100755 --- a/replay/replay.h +++ b/replay/replay.h @@ -97,5 +97,7 @@ void replay_disable_events(void); bool replay_events_enabled(void); /*! Adds BH event to the queue */ void replay_add_bh_event(void *bh, uint64_t id); +/*! Adds thread event to the queue */ +void replay_add_thread_event(void *pool, void *req, uint64_t id); #endif diff --git a/stubs/replay.c b/stubs/replay.c index 95b43f3..81eddae 100755 --- a/stubs/replay.c +++ b/stubs/replay.c @@ -30,3 +30,7 @@ uint64_t replay_get_current_step(void) { return 0; } + +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ +} diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index 6a0b981..f32594c 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -56,7 +56,7 @@ static void test_submit_aio(void) { WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, - done_cb, &data); + done_cb, &data, false, 0); /* The callbacks are not called until after the first wait. */ active = 1; @@ -120,7 +120,8 @@ static void test_submit_many(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i], + false, 0); } active = 100; @@ -149,7 +150,7 @@ static void do_test_cancel(bool sync) data[i].n = 0; data[i].ret = -EINPROGRESS; data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], - done_cb, &data[i]); + done_cb, &data[i], false, 0); } /* Starting the threads may be left to a bottom half. Let it diff --git a/thread-pool.c b/thread-pool.c index e2cac8e..f5a4dac 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -22,6 +22,7 @@ #include "trace.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" +#include "replay/replay.h" static void do_spawn_thread(ThreadPool *pool); @@ -74,6 +75,27 @@ struct ThreadPool { bool stopping; }; +void thread_pool_work(ThreadPool *pool, void *r) +{ + ThreadPoolElement *req = (ThreadPoolElement *)r; + int ret; + if (replay_mode == REPLAY_MODE_NONE) { + qemu_mutex_unlock(&pool->lock); + } + + ret = req->func(req->arg); + req->ret = ret; + /* Write ret before state. */ + smp_wmb(); + req->state = THREAD_DONE; + + if (replay_mode == REPLAY_MODE_NONE) { + qemu_mutex_lock(&pool->lock); + } + + qemu_bh_schedule(pool->completion_bh); +} + static void *worker_thread(void *opaque) { ThreadPool *pool = opaque; @@ -100,18 +122,12 @@ static void *worker_thread(void *opaque) req = QTAILQ_FIRST(&pool->request_list); QTAILQ_REMOVE(&pool->request_list, req, reqs); req->state = THREAD_ACTIVE; - qemu_mutex_unlock(&pool->lock); - - ret = req->func(req->arg); - - req->ret = ret; - /* Write ret before state. */ - smp_wmb(); - req->state = THREAD_DONE; - - qemu_mutex_lock(&pool->lock); - qemu_bh_schedule(pool->completion_bh); + if (replay_mode != REPLAY_MODE_NONE && req->common.replay) { + replay_add_thread_event(pool, req, req->common.replay_step); + } else { + thread_pool_work(pool, req); + } } pool->cur_threads--; @@ -235,7 +251,8 @@ static const AIOCBInfo thread_pool_aiocb_info = { BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockCompletionFunc *cb, void *opaque) + BlockCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step) { ThreadPoolElement *req; @@ -244,6 +261,8 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, req->arg = arg; req->state = THREAD_QUEUED; req->pool = pool; + req->common.replay = replay; + req->common.replay_step = replay_step; QLIST_INSERT_HEAD(&pool->head, req, all); @@ -254,8 +273,8 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, spawn_thread(pool); } QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); - qemu_mutex_unlock(&pool->lock); qemu_sem_post(&pool->sem); + qemu_mutex_unlock(&pool->lock); return &req->common; } @@ -277,14 +296,14 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, { ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; assert(qemu_in_coroutine()); - thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc, false, 0); qemu_coroutine_yield(); return tpc.ret; } void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) { - thread_pool_submit_aio(pool, func, arg, NULL, NULL); + thread_pool_submit_aio(pool, func, arg, NULL, NULL, false, 0); } static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)