From patchwork Mon Nov 8 14:33:22 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Arun Bharadwaj X-Patchwork-Id: 70422 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id B9C98B7108 for ; Tue, 9 Nov 2010 01:34:47 +1100 (EST) Received: from localhost ([127.0.0.1]:37712 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PFSnc-0001zx-P8 for incoming@patchwork.ozlabs.org; Mon, 08 Nov 2010 09:34:20 -0500 Received: from [140.186.70.92] (port=56058 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PFSn0-0001zY-Eu for qemu-devel@nongnu.org; Mon, 08 Nov 2010 09:33:44 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PFSmu-0000wH-RJ for qemu-devel@nongnu.org; Mon, 08 Nov 2010 09:33:42 -0500 Received: from e23smtp08.au.ibm.com ([202.81.31.141]:46563) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PFSmt-0000vp-Pp for qemu-devel@nongnu.org; Mon, 08 Nov 2010 09:33:36 -0500 Received: from d23relay04.au.ibm.com (d23relay04.au.ibm.com [202.81.31.246]) by e23smtp08.au.ibm.com (8.14.4/8.13.1) with ESMTP id oA8EXWUA015689 for ; Tue, 9 Nov 2010 01:33:32 +1100 Received: from d23av04.au.ibm.com (d23av04.au.ibm.com [9.190.235.139]) by d23relay04.au.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id oA8EXWWJ2302028 for ; Tue, 9 Nov 2010 01:33:32 +1100 Received: from d23av04.au.ibm.com (loopback [127.0.0.1]) by d23av04.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id oA8EXWvv024598 for ; Tue, 9 Nov 2010 01:33:32 +1100 Received: from linux.vnet.ibm.com ([9.77.201.195]) by d23av04.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id oA8EXOTk024541 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES128-SHA bits=128 verify=NO) for ; Tue, 9 Nov 2010 01:33:29 +1100 Date: Mon, 8 Nov 2010 20:03:22 +0530 From: Arun R Bharadwaj To: qemu-devel@nongnu.org Message-ID: <20101108143322.GA10435@linux.vnet.ibm.com> References: <20101108104542.6769.22583.stgit@localhost6.localdomain6> MIME-Version: 1.0 Content-Disposition: inline In-Reply-To: <20101108104542.6769.22583.stgit@localhost6.localdomain6> User-Agent: Mutt/1.5.20 (2009-06-14) X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6, seldom 2.4 (older, 4) Subject: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list Reply-To: arun@linux.vnet.ibm.com List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org * Arun R Bharadwaj [2010-11-08 16:16:50]: Make paio subsystem use threadlets infrastructure From: Aneesh Kumar K.V This patch creates a generic asynchronous-task-offloading infrastructure named threadlets. The patch creates a global queue on-to which subsystems can queue their tasks to be executed asynchronously. The patch also provides API's that allow a subsystem to create a private queue with an associated pool of threads. The patch has been tested with fstress. Signed-off-by: Aneesh Kumar K.V Signed-off-by: Arun R Bharadwaj Signed-off-by: Gautham R Shenoy Signed-off-by: Sripathi Kodi --- Makefile.objs | 2 posix-aio-compat.c | 340 ++++++++++++++++++++++++++++++++-------------------- 2 files changed, 208 insertions(+), 134 deletions(-) diff --git a/Makefile.objs b/Makefile.objs index cd5a24b..3b7ec27 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o +block-obj-$(CONFIG_POSIX) += qemu-thread.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o @@ -124,7 +125,6 @@ endif common-obj-y += $(addprefix ui/, $(ui-obj-y)) common-obj-y += iov.o acl.o -common-obj-$(CONFIG_THREAD) += qemu-thread.o common-obj-y += notify.o event_notifier.o common-obj-y += qemu-timer.o diff --git a/posix-aio-compat.c b/posix-aio-compat.c index 7b862b5..00b2a4e 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -29,7 +29,32 @@ #include "block_int.h" #include "block/raw-posix-aio.h" +#include "qemu-thread.h" +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 + +QemuMutex aiocb_mutex; + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +static ThreadletQueue globalqueue; +static int globalqueue_init; struct qemu_paiocb { BlockDriverAIOCB common; @@ -44,13 +69,13 @@ struct qemu_paiocb { int ev_signo; off_t aio_offset; - QTAILQ_ENTRY(qemu_paiocb) node; int aio_type; ssize_t ret; int active; struct qemu_paiocb *next; int async_context_id; + ThreadletWork work; }; typedef struct PosixAioState { @@ -58,64 +83,169 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static pthread_t thread_id; -static pthread_attr_t attr; -static int max_threads = 64; -static int cur_threads = 0; -static int idle_threads = 0; -static QTAILQ_HEAD(, qemu_paiocb) request_list; + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } -#ifdef CONFIG_PREADV -static int preadv_present = 1; -#else -static int preadv_present = 0; -#endif + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); -static void die2(int err, const char *what) -{ - fprintf(stderr, "%s failed: %s\n", what, strerror(err)); - abort(); + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; } -static void die(const char *what) +static void spawn_threadlet(ThreadletQueue *queue) { - die2(errno, what); + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); } -static void mutex_lock(pthread_mutex_t *mutex) +/** + * submit_work_to_queue: Submit a new task to a private queue to be + * executed asynchronously. + * @queue: Per-subsystem private queue to which the new task needs + * to be submitted. + * @work: Contains information about the task that needs to be submitted. + */ +static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) { - int ret = pthread_mutex_lock(mutex); - if (ret) die2(ret, "pthread_mutex_lock"); + qemu_mutex_lock(&queue->lock); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } else { + qemu_cond_signal(&queue->cond); + } + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); + qemu_mutex_unlock(&queue->lock); } -static void mutex_unlock(pthread_mutex_t *mutex) +static void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads); + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +static void submit_work(ThreadletWork *work) { - int ret = pthread_mutex_unlock(mutex); - if (ret) die2(ret, "pthread_mutex_unlock"); + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + submit_work_to_queue(&globalqueue, work); } -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, - struct timespec *ts) +/** + * dequeue_work_on_queue: Cancel a task queued on a Queue. + * @queue: The queue containing the task to be cancelled. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) { - int ret = pthread_cond_timedwait(cond, mutex, ts); - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait"); + ThreadletWork *ret_work; + int ret = 1; + + qemu_mutex_lock(&queue->lock); + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&queue->request_list, ret_work, node); + ret = 0; + break; + } + } + qemu_mutex_unlock(&queue->lock); + return ret; } -static void cond_signal(pthread_cond_t *cond) +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +static int dequeue_work(ThreadletWork *work) +{ + return dequeue_work_on_queue(&globalqueue, work); +} + +/** + * threadlet_queue_init: Initialize a threadlet queue. + * @queue: The threadlet queue to be initialized. + * @max_threads: Maximum number of threads processing the queue. + * @min_threads: Minimum number of threads processing the queue. + */ +static void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&queue->request_list); + qemu_mutex_init(&queue->lock); + qemu_cond_init(&queue->cond); +} + +#ifdef CONFIG_PREADV +static int preadv_present = 1; +#else +static int preadv_present = 0; +#endif + +static void die2(int err, const char *what) { - int ret = pthread_cond_signal(cond); - if (ret) die2(ret, "pthread_cond_signal"); + fprintf(stderr, "%s failed: %s\n", what, strerror(err)); + abort(); } -static void thread_create(pthread_t *thread, pthread_attr_t *attr, - void *(*start_routine)(void*), void *arg) +static void die(const char *what) { - int ret = pthread_create(thread, attr, start_routine, arg); - if (ret) die2(ret, "pthread_create"); + die2(errno, what); } static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) @@ -301,106 +431,59 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *aio_thread(void *unused) +static void aio_thread(ThreadletWork *work) { pid_t pid; + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work); + ssize_t ret = 0; pid = getpid(); + aiocb->active = 1; - while (1) { - struct qemu_paiocb *aiocb; - ssize_t ret = 0; - qemu_timeval tv; - struct timespec ts; - - qemu_gettimeofday(&tv); - ts.tv_sec = tv.tv_sec + 10; - ts.tv_nsec = 0; - - mutex_lock(&lock); - - while (QTAILQ_EMPTY(&request_list) && - !(ret == ETIMEDOUT)) { - ret = cond_timedwait(&cond, &lock, &ts); - } - - if (QTAILQ_EMPTY(&request_list)) - break; - - aiocb = QTAILQ_FIRST(&request_list); - QTAILQ_REMOVE(&request_list, aiocb, node); - aiocb->active = 1; - idle_threads--; - mutex_unlock(&lock); - - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { - case QEMU_AIO_READ: - case QEMU_AIO_WRITE: - ret = handle_aiocb_rw(aiocb); - break; - case QEMU_AIO_FLUSH: - ret = handle_aiocb_flush(aiocb); - break; - case QEMU_AIO_IOCTL: - ret = handle_aiocb_ioctl(aiocb); - break; - default: - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); - ret = -EINVAL; - break; - } - - mutex_lock(&lock); - aiocb->ret = ret; - idle_threads++; - mutex_unlock(&lock); - - if (kill(pid, aiocb->ev_signo)) die("kill failed"); + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { + case QEMU_AIO_READ: + case QEMU_AIO_WRITE: + ret = handle_aiocb_rw(aiocb); + break; + case QEMU_AIO_FLUSH: + ret = handle_aiocb_flush(aiocb); + break; + case QEMU_AIO_IOCTL: + ret = handle_aiocb_ioctl(aiocb); + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; + break; } - idle_threads--; - cur_threads--; - mutex_unlock(&lock); - - return NULL; -} - -static void spawn_thread(void) -{ - sigset_t set, oldset; - - cur_threads++; - idle_threads++; - - /* block all signals */ - if (sigfillset(&set)) die("sigfillset"); - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); + qemu_mutex_lock(&aiocb_mutex); + aiocb->ret = ret; + qemu_mutex_unlock(&aiocb_mutex); - thread_create(&thread_id, &attr, aio_thread, NULL); - - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); + if (kill(pid, aiocb->ev_signo)) { + die("kill failed"); + } } static void qemu_paio_submit(struct qemu_paiocb *aiocb) { + qemu_mutex_lock(&aiocb_mutex); aiocb->ret = -EINPROGRESS; + qemu_mutex_unlock(&aiocb_mutex); aiocb->active = 0; - mutex_lock(&lock); - if (idle_threads == 0 && cur_threads < max_threads) - spawn_thread(); - QTAILQ_INSERT_TAIL(&request_list, aiocb, node); - mutex_unlock(&lock); - cond_signal(&cond); + + aiocb->work.func = aio_thread; + submit_work(&aiocb->work); } static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) { ssize_t ret; - mutex_lock(&lock); + qemu_mutex_lock(&aiocb_mutex); ret = aiocb->ret; - mutex_unlock(&lock); - + qemu_mutex_unlock(&aiocb_mutex); return ret; } @@ -536,20 +619,20 @@ static void paio_cancel(BlockDriverAIOCB *blockacb) struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; int active = 0; - mutex_lock(&lock); if (!acb->active) { - QTAILQ_REMOVE(&request_list, acb, node); - acb->ret = -ECANCELED; + if (!dequeue_work(&acb->work)) { + acb->ret = -ECANCELED; + } else { + active = 1; + } } else if (acb->ret == -EINPROGRESS) { active = 1; } - mutex_unlock(&lock); if (active) { /* fail safe: if the aio could not be canceled, we wait for it */ - while (qemu_paio_error(acb) == EINPROGRESS) - ; + active = qemu_paio_error(acb); } paio_remove(acb); @@ -618,11 +701,12 @@ int paio_init(void) struct sigaction act; PosixAioState *s; int fds[2]; - int ret; if (posix_aio_state) return 0; + qemu_mutex_init(&aiocb_mutex); + s = qemu_malloc(sizeof(PosixAioState)); sigfillset(&act.sa_mask); @@ -645,16 +729,6 @@ int paio_init(void) qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, posix_aio_process_queue, s); - ret = pthread_attr_init(&attr); - if (ret) - die2(ret, "pthread_attr_init"); - - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (ret) - die2(ret, "pthread_attr_setdetachstate"); - - QTAILQ_INIT(&request_list); - posix_aio_state = s; return 0; }