From patchwork Thu Mar 7 12:41:46 2013
From: Stefan Hajnoczi <stefanha@redhat.com>
To: qemu-devel@nongnu.org
Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi <stefanha@redhat.com>
Subject: [Qemu-devel] [PATCH v2 3/7] threadpool: add thread_pool_new() and thread_pool_free()
Date: Thu, 7 Mar 2013 13:41:46 +0100
Message-Id: <1362660110-26970-4-git-send-email-stefanha@redhat.com>
In-Reply-To: <1362660110-26970-1-git-send-email-stefanha@redhat.com>
References: <1362660110-26970-1-git-send-email-stefanha@redhat.com>
X-Patchwork-Id: 225839

ThreadPool is tied to an AioContext through its event notifier, which
determines the AioContext in which the work item's callback function
will be invoked.  In order to support multiple AioContexts we need to
support multiple ThreadPool instances.

This patch adds the new/free functions.  The free function deserves
special attention because it quiesces remaining worker threads.  This
requires a new condition variable and a "stopping" flag to let workers
know they should terminate once idle.  We never needed to do this
before since the global threadpool was not explicitly destroyed until
process termination.  (A standalone sketch of this shutdown idiom
follows the patch.)

Also stash the AioContext pointer in ThreadPool so that we can call
aio_set_event_notifier() in thread_pool_free().  We didn't need to hold
onto AioContext previously since there was no free function.
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/block/thread-pool.h |  5 +++++
 thread-pool.c               | 52 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e..e1453c6 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,6 +26,11 @@
 
 typedef int ThreadPoolFunc(void *opaque);
 
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque);
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
diff --git a/thread-pool.c b/thread-pool.c
index a0aecd0..d1e4570 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,8 +24,6 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-typedef struct ThreadPool ThreadPool;
-
 static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
@@ -59,8 +57,10 @@ struct ThreadPoolElement {
 
 struct ThreadPool {
     EventNotifier notifier;
+    AioContext *ctx;
     QemuMutex lock;
     QemuCond check_cancel;
+    QemuCond worker_stopped;
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
@@ -75,6 +75,7 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
 };
 
 /* Currently there is only one thread pool instance. */
@@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (1) {
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
@@ -99,7 +100,7 @@ static void *worker_thread(void *opaque)
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
         } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1) {
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
@@ -124,6 +125,7 @@ static void *worker_thread(void *opaque)
     }
 
     pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
@@ -298,8 +300,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     memset(pool, 0, sizeof(*pool));
     event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
@@ -311,6 +315,46 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
                            thread_pool_active);
 }
 
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}
+
 static void thread_pool_init(void)
 {
     thread_pool_init_one(&global_pool, NULL);
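
As noted in the commit message, the shutdown idiom in thread_pool_free()
can be shown in isolation: set a "stopping" flag under the lock, post the
semaphore once per remaining worker, and wait on a condition variable
until cur_threads reaches zero.  The sketch below is a minimal standalone
version of that idiom, using plain pthreads and POSIX semaphores in place
of QEMU's qemu_mutex/qemu_cond/qemu_sem wrappers; the names MiniPool,
worker and mini_pool_free are illustrative, not QEMU API.

/* Standalone sketch (not QEMU code) of the stopping-flag plus
 * condition-variable shutdown idiom from thread_pool_free().
 * Build with: gcc -pthread sketch.c
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t worker_stopped;  /* signalled by each exiting worker */
    sem_t sem;                      /* wakes idle workers */
    int cur_threads;
    bool stopping;
} MiniPool;

static void *worker(void *opaque)
{
    MiniPool *p = opaque;

    for (;;) {
        sem_wait(&p->sem);          /* sleep until work or shutdown */
        pthread_mutex_lock(&p->lock);
        if (p->stopping) {
            break;                  /* leave the loop holding the lock */
        }
        pthread_mutex_unlock(&p->lock);
        /* ... a real pool would dequeue and run a request here ... */
    }

    p->cur_threads--;
    pthread_cond_signal(&p->worker_stopped);  /* wake mini_pool_free() */
    pthread_mutex_unlock(&p->lock);
    return NULL;
}

static void mini_pool_free(MiniPool *p)
{
    pthread_mutex_lock(&p->lock);
    p->stopping = true;
    while (p->cur_threads > 0) {
        sem_post(&p->sem);          /* wake one idle worker */
        pthread_cond_wait(&p->worker_stopped, &p->lock);
    }
    pthread_mutex_unlock(&p->lock);

    pthread_cond_destroy(&p->worker_stopped);
    pthread_mutex_destroy(&p->lock);
    sem_destroy(&p->sem);
}

int main(void)
{
    MiniPool p = { .lock = PTHREAD_MUTEX_INITIALIZER };
    pthread_t tid[4];

    pthread_cond_init(&p.worker_stopped, NULL);
    sem_init(&p.sem, 0, 0);

    pthread_mutex_lock(&p.lock);
    for (int i = 0; i < 4; i++) {
        pthread_create(&tid[i], NULL, worker, &p);
        p.cur_threads++;
    }
    pthread_mutex_unlock(&p.lock);

    mini_pool_free(&p);             /* quiesces all four workers */
    for (int i = 0; i < 4; i++) {
        pthread_join(tid[i], NULL);
    }
    printf("all workers stopped\n");
    return 0;
}

Like thread_pool_free(), mini_pool_free() re-checks cur_threads under the
lock after every wakeup, so spurious condition-variable wakeups and
several workers exiting between waits are both handled correctly.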