From patchwork Wed Oct 31 15:30:49 2012 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paolo Bonzini X-Patchwork-Id: 195918 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 0E92E2C00AC for ; Thu, 1 Nov 2012 03:11:54 +1100 (EST) Received: from localhost ([::1]:34466 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1TTaIe-0001pW-Fs for incoming@patchwork.ozlabs.org; Wed, 31 Oct 2012 11:33:48 -0400 Received: from eggs.gnu.org ([208.118.235.92]:46866) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1TTaI7-00018W-Ha for qemu-devel@nongnu.org; Wed, 31 Oct 2012 11:33:25 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1TTaI5-0005lL-ES for qemu-devel@nongnu.org; Wed, 31 Oct 2012 11:33:15 -0400 Received: from mail-pb0-f45.google.com ([209.85.160.45]:41123) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1TTaI5-0004ui-6Z for qemu-devel@nongnu.org; Wed, 31 Oct 2012 11:33:13 -0400 Received: by mail-pb0-f45.google.com with SMTP id rp2so1021969pbb.4 for ; Wed, 31 Oct 2012 08:33:12 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=sender:from:to:cc:subject:date:message-id:x-mailer:in-reply-to :references; bh=VR2griCirhRClZ5iq3F9VkSPE2RiMQTPD0A6ex8H19c=; b=gWAM4u7R2zhtzK+w5OPlJfvQyrjV9m8JXkf1+ElYQAk8kLCDK/KeYBqlRkaBwjLEH9 M9kXrTL3UnJYSLTvYUQsoQO0p0C26WtU45mfkbJ7/uLToVPnVldah3D/bDN+PJVZ5XWH Ml3s8PJuCR5idNQPbv5Bf4WrpvgkSo6cQ1JYhSa9kkF6wLX+3g5maC79mnudBCl1QImP gG3/R7Jk6AE9k62TkgrEyhCB5AF24ibZnm67UeRSlwGMgDWnzuPK3FY9uxjWKkYVHzmO EHnrgM3S72MWTCZ5Rszx5lfYdr3GtP2YThd0QfZzh/3W7jaD6uVF1sjE7Kn5ekTI9F5M OcTA== Received: by 10.68.200.227 with SMTP id jv3mr113273378pbc.162.1351697592808; Wed, 31 Oct 2012 08:33:12 -0700 (PDT) Received: from yakj.usersys.redhat.com (93-34-169-1.ip50.fastwebnet.it. [93.34.169.1]) by mx.google.com with ESMTPS id sz6sm2445230pbc.52.2012.10.31.08.33.09 (version=TLSv1/SSLv3 cipher=OTHER); Wed, 31 Oct 2012 08:33:11 -0700 (PDT) From: Paolo Bonzini To: qemu-devel@nongnu.org Date: Wed, 31 Oct 2012 16:30:49 +0100 Message-Id: <1351697456-16107-33-git-send-email-pbonzini@redhat.com> X-Mailer: git-send-email 1.7.12.1 In-Reply-To: <1351697456-16107-1-git-send-email-pbonzini@redhat.com> References: <1351697456-16107-1-git-send-email-pbonzini@redhat.com> X-detected-operating-system: by eggs.gnu.org: Genre and OS details not recognized. X-Received-From: 209.85.160.45 Cc: aliguori@us.ibm.com, stefanha@redhat.com Subject: [Qemu-devel] [PATCH v2 32/39] aio: add generic thread-pool facility X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Add a generic thread-pool. The code is roughly based on posix-aio-compat.c, with some changes, especially the following: - use QemuSemaphore instead of QemuCond; - separate the state of the thread from the return code of the worker function. The return code is totally opaque for the thread pool; - do not busy wait when doing cancellation. A more generic threadpool (but still specific to I/O so that in the future it can use special scheduling classes or PI mutexes) can have many uses: it allows more flexibility in raw-posix.c and can more easily be extended to Win32, and it will also be used to do an msync of the persistent bitmap. Signed-off-by: Paolo Bonzini --- Difference from previous submission: add locking documentation, fix locking in event_notifier_ready. Makefile.objs | 2 +- thread-pool.c | 282 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ thread-pool.h | 34 +++++++ trace-events | 5 ++ 4 file modificati, 322 inserzioni(+). 1 rimozione(-) create mode 100644 thread-pool.c create mode 100644 thread-pool.h diff --git a/Makefile.objs b/Makefile.objs index a8ade04..f8ae031 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -43,7 +43,7 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o block-obj-y = iov.o cache-utils.o qemu-option.o module.o async.o block-obj-y += nbd.o block.o blockjob.o aes.o qemu-config.o -block-obj-y += qemu-progress.o qemu-sockets.o uri.o notify.o +block-obj-y += thread-pool.o qemu-progress.o qemu-sockets.o uri.o notify.o block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y) block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o diff --git a/thread-pool.c b/thread-pool.c new file mode 100644 index 0000000..80749b7 --- /dev/null +++ b/thread-pool.c @@ -0,0 +1,282 @@ +/* + * QEMU block layer thread pool + * + * Copyright IBM, Corp. 2008 + * Copyright Red Hat, Inc. 2012 + * + * Authors: + * Anthony Liguori + * Paolo Bonzini + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ +#include "qemu-common.h" +#include "qemu-queue.h" +#include "qemu-thread.h" +#include "osdep.h" +#include "qemu-coroutine.h" +#include "trace.h" +#include "block_int.h" +#include "event_notifier.h" +#include "thread-pool.h" + +static void do_spawn_thread(void); + +typedef struct ThreadPoolElement ThreadPoolElement; + +enum ThreadState { + THREAD_QUEUED, + THREAD_ACTIVE, + THREAD_DONE, + THREAD_CANCELED, +}; + +struct ThreadPoolElement { + BlockDriverAIOCB common; + ThreadPoolFunc *func; + void *arg; + enum ThreadState state; + int ret; + + /* Access to this list is protected by lock. */ + QTAILQ_ENTRY(ThreadPoolElement) reqs; + + /* Access to this list is protected by the global mutex. */ + QLIST_ENTRY(ThreadPoolElement) all; +}; + +static EventNotifier notifier; +static QemuMutex lock; +static QemuCond check_cancel; +static QemuSemaphore sem; +static int max_threads = 64; +static QEMUBH *new_thread_bh; + +/* The following variables are protected by the global mutex. */ +static QLIST_HEAD(, ThreadPoolElement) head; + +/* The following variables are protected by lock. */ +static QTAILQ_HEAD(, ThreadPoolElement) request_list; +static int cur_threads; +static int idle_threads; +static int new_threads; /* backlog of threads we need to create */ +static int pending_threads; /* threads created but not running yet */ +static int pending_cancellations; /* whether we need a cond_broadcast */ + +static void *worker_thread(void *unused) +{ + qemu_mutex_lock(&lock); + pending_threads--; + do_spawn_thread(); + + while (1) { + ThreadPoolElement *req; + int ret; + + do { + idle_threads++; + qemu_mutex_unlock(&lock); + ret = qemu_sem_timedwait(&sem, 10000); + qemu_mutex_lock(&lock); + idle_threads--; + } while (ret == -1 && !QTAILQ_EMPTY(&request_list)); + if (ret == -1) { + break; + } + + req = QTAILQ_FIRST(&request_list); + QTAILQ_REMOVE(&request_list, req, reqs); + req->state = THREAD_ACTIVE; + qemu_mutex_unlock(&lock); + + ret = req->func(req->arg); + + qemu_mutex_lock(&lock); + req->state = THREAD_DONE; + req->ret = ret; + if (pending_cancellations) { + qemu_cond_broadcast(&check_cancel); + } + + event_notifier_set(¬ifier); + } + + cur_threads--; + qemu_mutex_unlock(&lock); + return NULL; +} + +static void do_spawn_thread(void) +{ + QemuThread t; + + /* Runs with lock taken. */ + if (!new_threads) { + return; + } + + new_threads--; + pending_threads++; + + qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED); +} + +static void spawn_thread_bh_fn(void *opaque) +{ + qemu_mutex_lock(&lock); + do_spawn_thread(); + qemu_mutex_unlock(&lock); +} + +static void spawn_thread(void) +{ + cur_threads++; + new_threads++; + /* If there are threads being created, they will spawn new workers, so + * we don't spend time creating many threads in a loop holding a mutex or + * starving the current vcpu. + * + * If there are no idle threads, ask the main thread to create one, so we + * inherit the correct affinity instead of the vcpu affinity. + */ + if (!pending_threads) { + qemu_bh_schedule(new_thread_bh); + } +} + +static void event_notifier_ready(EventNotifier *notifier) +{ + ThreadPoolElement *elem, *next; + + event_notifier_test_and_clear(notifier); +restart: + QLIST_FOREACH_SAFE(elem, &head, all, next) { + if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { + continue; + } + if (elem->state == THREAD_DONE) { + trace_thread_pool_complete(elem, elem->common.opaque, elem->ret); + } + if (elem->state == THREAD_DONE && elem->common.cb) { + qemu_mutex_lock(&lock); + int ret = elem->ret; + qemu_mutex_unlock(&lock); + QLIST_REMOVE(elem, all); + elem->common.cb(elem->common.opaque, ret); + qemu_aio_release(elem); + goto restart; + } else { + /* remove the request */ + QLIST_REMOVE(elem, all); + qemu_aio_release(elem); + } + } +} + +static int thread_pool_active(EventNotifier *notifier) +{ + return !QLIST_EMPTY(&head); +} + +static void thread_pool_cancel(BlockDriverAIOCB *acb) +{ + ThreadPoolElement *elem = (ThreadPoolElement *)acb; + + trace_thread_pool_cancel(elem, elem->common.opaque); + + qemu_mutex_lock(&lock); + if (elem->state == THREAD_QUEUED && + /* No thread has yet started working on elem. we can try to "steal" + * the item from the worker if we can get a signal from the + * semaphore. Because this is non-blocking, we can do it with + * the lock taken and ensure that elem will remain THREAD_QUEUED. + */ + qemu_sem_timedwait(&sem, 0) == 0) { + QTAILQ_REMOVE(&request_list, elem, reqs); + elem->state = THREAD_CANCELED; + event_notifier_set(¬ifier); + } else { + pending_cancellations++; + while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { + qemu_cond_wait(&check_cancel, &lock); + } + pending_cancellations--; + } + qemu_mutex_unlock(&lock); +} + +static AIOPool thread_pool_cb_pool = { + .aiocb_size = sizeof(ThreadPoolElement), + .cancel = thread_pool_cancel, +}; + +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque) +{ + ThreadPoolElement *req; + + req = qemu_aio_get(&thread_pool_cb_pool, NULL, cb, opaque); + req->func = func; + req->arg = arg; + req->state = THREAD_QUEUED; + + QLIST_INSERT_HEAD(&head, req, all); + + trace_thread_pool_submit(req, arg); + + qemu_mutex_lock(&lock); + if (idle_threads == 0 && cur_threads < max_threads) { + spawn_thread(); + } + QTAILQ_INSERT_TAIL(&request_list, req, reqs); + qemu_mutex_unlock(&lock); + qemu_sem_post(&sem); + return &req->common; +} + +typedef struct ThreadPoolCo { + Coroutine *co; + int ret; +} ThreadPoolCo; + +static void thread_pool_co_cb(void *opaque, int ret) +{ + ThreadPoolCo *co = opaque; + + co->ret = ret; + qemu_coroutine_enter(co->co, NULL); +} + +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) +{ + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; + assert(qemu_in_coroutine()); + thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); + qemu_coroutine_yield(); + return tpc.ret; +} + +void thread_pool_submit(ThreadPoolFunc *func, void *arg) +{ + thread_pool_submit_aio(func, arg, NULL, NULL); +} + +static void thread_pool_init(void) +{ + QLIST_INIT(&head); + event_notifier_init(¬ifier, false); + qemu_mutex_init(&lock); + qemu_cond_init(&check_cancel); + qemu_sem_init(&sem, 0); + qemu_aio_set_event_notifier(¬ifier, event_notifier_ready, + thread_pool_active); + + QTAILQ_INIT(&request_list); + new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL); +} + +block_init(thread_pool_init) diff --git a/thread-pool.h b/thread-pool.h new file mode 100644 index 0000000..378a4ac --- /dev/null +++ b/thread-pool.h @@ -0,0 +1,34 @@ +/* + * QEMU block layer thread pool + * + * Copyright IBM, Corp. 2008 + * Copyright Red Hat, Inc. 2012 + * + * Authors: + * Anthony Liguori + * Paolo Bonzini + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ + +#ifndef QEMU_THREAD_POOL_H +#define QEMU_THREAD_POOL_H 1 + +#include "qemu-common.h" +#include "qemu-queue.h" +#include "qemu-thread.h" +#include "qemu-coroutine.h" +#include "block_int.h" + +typedef int ThreadPoolFunc(void *opaque); + +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque); +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); +void thread_pool_submit(ThreadPoolFunc *func, void *arg); + +#endif diff --git a/trace-events b/trace-events index e2d4580..58c18eb 100644 --- a/trace-events +++ b/trace-events @@ -90,6 +90,11 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d" virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu" virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu" +# thread-pool.c +thread_pool_submit(void *req, void *opaque) "req %p opaque %p" +thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d" +thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" + # posix-aio-compat.c paio_submit(void *acb, void *opaque, int64_t sector_num, int nb_sectors, int type) "acb %p opaque %p sector_num %"PRId64" nb_sectors %d type %d" paio_complete(void *acb, void *opaque, int ret) "acb %p opaque %p ret %d"