From patchwork Wed Jun 16 11:56:56 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Gautham R Shenoy X-Patchwork-Id: 55871 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 9234CB7D16 for ; Wed, 16 Jun 2010 22:03:22 +1000 (EST) Received: from localhost ([127.0.0.1]:56818 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1OOrKx-0007QB-DY for incoming@patchwork.ozlabs.org; Wed, 16 Jun 2010 08:03:19 -0400 Received: from [140.186.70.92] (port=42373 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1OOrEs-0003EL-Mf for qemu-devel@nongnu.org; Wed, 16 Jun 2010 07:57:04 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.69) (envelope-from ) id 1OOrEq-0007Zt-W0 for qemu-devel@nongnu.org; Wed, 16 Jun 2010 07:57:02 -0400 Received: from e23smtp07.au.ibm.com ([202.81.31.140]:47435) by eggs.gnu.org with esmtp (Exim 4.69) (envelope-from ) id 1OOrEp-0007ZZ-UM for qemu-devel@nongnu.org; Wed, 16 Jun 2010 07:57:00 -0400 Received: from d23relay05.au.ibm.com (d23relay05.au.ibm.com [202.81.31.247]) by e23smtp07.au.ibm.com (8.14.4/8.13.1) with ESMTP id o5GBv2Nd024926 for ; Wed, 16 Jun 2010 21:57:02 +1000 Received: from d23av02.au.ibm.com (d23av02.au.ibm.com [9.190.235.138]) by d23relay05.au.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id o5GBuwHG1200370 for ; Wed, 16 Jun 2010 21:56:58 +1000 Received: from d23av02.au.ibm.com (loopback [127.0.0.1]) by d23av02.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id o5GBuvNl024958 for ; Wed, 16 Jun 2010 21:56:58 +1000 Received: from sofia.in.ibm.com (sofia.in.ibm.com [9.124.35.76]) by d23av02.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id o5GBuuiY024942; Wed, 16 Jun 2010 21:56:57 +1000 Received: from localhost.localdomain (localhost [IPv6:::1]) by sofia.in.ibm.com (Postfix) with ESMTP id AAA04E4AF8; Wed, 16 Jun 2010 17:26:56 +0530 (IST) To: Qemu-development List From: Gautham R Shenoy Date: Wed, 16 Jun 2010 17:26:56 +0530 Message-ID: <20100616115656.10988.96529.stgit@localhost.localdomain> In-Reply-To: <20100616115404.10988.62371.stgit@localhost.localdomain> References: <20100616115404.10988.62371.stgit@localhost.localdomain> User-Agent: StGit/0.15-51-gc750 MIME-Version: 1.0 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6, seldom 2.4 (older, 4) Cc: Anthony Liguori , Paolo Bonzini , "Aneesh Kumar K.V" , Corentin Chary , Avi Kivity Subject: [Qemu-devel] [PATCH V4 2/3] qemu: Generic task offloading framework: threadlets X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org From: Aneesh Kumar K.V This patch creates a generic asynchronous-task-offloading infrastructure named threadlets. The core idea has been borrowed from the threading framework that is being used by paio. The reason for creating this generic infrastructure is so that other subsystems, such as virtio-9p could make use of it for offloading tasks that could block. The patch creates a global queue on-to which subsystems can queue their tasks to be executed asynchronously. The patch also provides API's that allow a subsystem to create a private queue. API's that allow a subsystem to wait till all the earlier queued tasks have been executed, is also provided. [ego@in.ibm.com: Facelift of the code, cancel_threadlet, flush_threadlet_queue and other minor helpers.] Signed-off-by: Aneesh Kumar K.V Signed-off-by: Gautham R Shenoy --- Makefile.objs | 3 + async-work.c | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ async-work.h | 69 +++++++++++++++++++++ 3 files changed, 257 insertions(+), 1 deletions(-) create mode 100644 async-work.c create mode 100644 async-work.h diff --git a/Makefile.objs b/Makefile.objs index 1a942e5..019646f 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o +block-obj-y += qemu-thread.o +block-obj-y += async-work.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o @@ -109,7 +111,6 @@ common-obj-y += iov.o common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o common-obj-$(CONFIG_COCOA) += cocoa.o -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o common-obj-y += notify.o event_notifier.o common-obj-y += qemu-timer.o diff --git a/async-work.c b/async-work.c new file mode 100644 index 0000000..50e39ce --- /dev/null +++ b/async-work.c @@ -0,0 +1,186 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * Generalization based on posix-aio emulation code. + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#include +#include +#include +#include +#include +#include "async-work.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 64 +ThreadletQueue globalqueue; +static int globalqueue_init; + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + while (1) { + ThreadletWork *work; + int ret = 0; + qemu_mutex_lock(&(queue->lock)); + + while (QTAILQ_EMPTY(&(queue->request_list)) && + (ret != ETIMEDOUT)) { + ret = qemu_cond_timedwait(&(queue->cond), + &(queue->lock), 10*100000); + } + + if (QTAILQ_EMPTY(&(queue->request_list))) + goto check_exit; + + work = QTAILQ_FIRST(&(queue->request_list)); + QTAILQ_REMOVE(&(queue->request_list), work, node); + queue->idle_threads--; + qemu_mutex_unlock(&(queue->lock)); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&(queue->lock)); + queue->idle_threads++; + +check_exit: + if (queue->exit || ((queue->idle_threads > 0) && + (queue->cur_threads > queue->min_threads))) { + /* We exit the queue or we retain minimum number of threads */ + break; + } + qemu_mutex_unlock(&(queue->lock)); + } + + queue->idle_threads--; + queue->cur_threads--; + if (queue->exit) { + qemu_mutex_unlock(&(queue->lock)); + qemu_barrier_wait(&queue->barr); + } else + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * threadlet_submit: Submit a new task to be executed asynchronously. + * @queue: Queue to which the new task needs to be submitted. + * @work: Contains information about the task that needs to be submitted. + */ +void threadlet_submit(ThreadletQueue *queue, ThreadletWork *work) +{ + qemu_mutex_lock(&(queue->lock)); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node); + qemu_mutex_unlock(&(queue->lock)); + qemu_cond_signal(&(queue->cond)); +} + +/** + * threadlet_submit_common: Submit to the global queue a new task to be + * executed asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void threadlet_submit_common(ThreadletWork *work) +{ + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + threadlet_submit(&globalqueue, work); +} + +/** + * flush_threadlet_queue: Wait till completion of all the submitted tasks + * @queue: Queue containing the tasks we're waiting on. + */ +void flush_threadlet_queue(ThreadletQueue *queue) +{ + qemu_mutex_lock(&queue->lock); + queue->exit = 1; + + qemu_barrier_init(&queue->barr, queue->cur_threads + 1); + qemu_mutex_unlock(&queue->lock); + + qemu_barrier_wait(&queue->barr); +} + +/** + * flush_common_threadlet_queue: Wait till completion of all the + * submitted tasks + * @queue: Queue containing the tasks we're waiting on. + */ +void flush_common_threadlet_queue(void) +{ + flush_threadlet_queue(&globalqueue); +} + +/** + * cancel_threadlet: Cancel a queued task. + * @queue: The queue containing the task to be cancelled. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work) +{ + ThreadletWork *ret_work; + int found = 0; + + qemu_mutex_lock(&(queue->lock)); + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&(queue->request_list), ret_work, node); + found = 1; + break; + } + } + qemu_mutex_unlock(&(queue->lock)); + + if (found) { + return 0; + } + + return 1; +} + +/** + * cancel_threadlet_common: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int cancel_threadlet_common(ThreadletWork *work) +{ + return cancel_threadlet(&globalqueue, work); +} diff --git a/async-work.h b/async-work.h new file mode 100644 index 0000000..36d19fa --- /dev/null +++ b/async-work.h @@ -0,0 +1,69 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * Generalization based on posix-aio emulation code. + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + QemuBarrier barr; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + int exit; + QTAILQ_HEAD(, threadlet_work) request_list; + QTAILQ_HEAD(, threadlet_work) threadlet_work_pool; +} ThreadletQueue; + +typedef struct threadlet_work +{ + QTAILQ_ENTRY(threadlet_work) node; + void (*func)(struct threadlet_work *work); +} ThreadletWork; + +static inline void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->exit = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&(queue->request_list)); + QTAILQ_INIT(&(queue->threadlet_work_pool)); + qemu_mutex_init(&(queue->lock)); + qemu_cond_init(&(queue->cond)); +} + +extern void threadlet_submit(ThreadletQueue *queue, + ThreadletWork *work); + +extern void threadlet_submit_common(ThreadletWork *work); + +extern int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work); +extern int cancel_threadlet_common(ThreadletWork *work); + + +extern void flush_threadlet_queue(ThreadletQueue *queue); +extern void flush_common_threadlet_queue(void); +#endif