From patchwork Thu Nov 18 18:07:15 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Arun Bharadwaj X-Patchwork-Id: 72131 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 8C66DB71A8 for ; Fri, 19 Nov 2010 05:23:26 +1100 (EST) Received: from localhost ([127.0.0.1]:41446 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PJ98S-0006qu-R5 for incoming@patchwork.ozlabs.org; Thu, 18 Nov 2010 13:23:04 -0500 Received: from [140.186.70.92] (port=37151 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PJ8tJ-00055v-5z for qemu-devel@nongnu.org; Thu, 18 Nov 2010 13:07:29 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PJ8tG-0003nz-Ia for qemu-devel@nongnu.org; Thu, 18 Nov 2010 13:07:24 -0500 Received: from e28smtp03.in.ibm.com ([122.248.162.3]:57916) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PJ8tF-0003nm-6K for qemu-devel@nongnu.org; Thu, 18 Nov 2010 13:07:22 -0500 Received: from d28relay01.in.ibm.com (d28relay01.in.ibm.com [9.184.220.58]) by e28smtp03.in.ibm.com (8.14.4/8.13.1) with ESMTP id oAII7IKG001505 for ; Thu, 18 Nov 2010 23:37:18 +0530 Received: from d28av02.in.ibm.com (d28av02.in.ibm.com [9.184.220.64]) by d28relay01.in.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id oAII7I5e3903674 for ; Thu, 18 Nov 2010 23:37:18 +0530 Received: from d28av02.in.ibm.com (loopback [127.0.0.1]) by d28av02.in.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id oAII7H3H009986 for ; Fri, 19 Nov 2010 05:07:17 +1100 Received: from localhost6.localdomain6 ([9.77.206.31]) by d28av02.in.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id oAII7Gli009966; Fri, 19 Nov 2010 05:07:17 +1100 To: qemu-devel@nongnu.org From: Arun R Bharadwaj Date: Thu, 18 Nov 2010 23:37:15 +0530 Message-ID: <20101118180715.4434.10790.stgit@localhost6.localdomain6> In-Reply-To: <20101118180547.4434.95904.stgit@localhost6.localdomain6> References: <20101118180547.4434.95904.stgit@localhost6.localdomain6> User-Agent: StGit/0.15 MIME-Version: 1.0 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6, seldom 2.4 (older, 4) Cc: kwolf@redhat.com Subject: [Qemu-devel] [PATCH 5/6] Move threadlets infrastructure to qemu-threadlets.c X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org The reason for creating this generic infrastructure is so that other subsystems, such as virtio-9p could make use of it for offloading tasks that could block. Signed-off-by: Arun R Bharadwaj Signed-off-by: Aneesh Kumar K.V Signed-off-by: Gautham R Shenoy Signed-off-by: Sripathi Kodi Acked-by: Stefan Hajnoczi --- Makefile.objs | 1 docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++ posix-aio-compat.c | 180 ------------------------------------------------ qemu-threadlets.c | 168 +++++++++++++++++++++++++++++++++++++++++++++ qemu-threadlets.h | 46 ++++++++++++ 5 files changed, 356 insertions(+), 180 deletions(-) create mode 100644 docs/async-support.txt create mode 100644 qemu-threadlets.c create mode 100644 qemu-threadlets.h diff --git a/Makefile.objs b/Makefile.objs index 3b7ec27..2cf8aba 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += qemu-thread.o +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o diff --git a/docs/async-support.txt b/docs/async-support.txt new file mode 100644 index 0000000..9f22b9a --- /dev/null +++ b/docs/async-support.txt @@ -0,0 +1,141 @@ +== How to use the threadlets infrastructure supported in Qemu == + +== Threadlets == + +Q.1: What are threadlets ? +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems + to offload possibly blocking work to a queue to be processed by a pool + of threads asynchronously. + +Q.2: When would one want to use threadlets ? +A.2: Threadlets are useful when there are operations that can be performed + outside the context of the VCPU/IO threads inorder to free these latter + to service any other guest requests. + +Q.3: I have some work that can be executed in an asynchronous context. How + should I go about it ? +A.3: One could follow the steps listed below: + + - Define a function which would do the asynchronous work. + static void my_threadlet_func(ThreadletWork *work) + { + } + + - Declare an object of type ThreadletWork; + ThreadletWork work; + + + - Assign a value to the "func" member of ThreadletWork object. + work.func = my_threadlet_func; + + - Submit the threadlet to the global queue. + submit_threadletwork(&work); + + - Continue servicing some other guest operations. + +Q.4: I want to my_threadlet_func to access some non-global data. How do I do + that ? +A.4: Suppose you want my_threadlet_func to access some non-global data-object + of type myPrivateData. In that case one could follow the following steps. + + - Define a member of the type ThreadletWork within myPrivateData. + typedef struct MyPrivateData { + ...; + ...; + ...; + ThreadletWork work; + } MyPrivateData; + + MyPrivateData my_data; + + - Initialize myData.work as described in A.3 + myData.work.func = my_threadlet_func; + submit_threadletwork(&myData.work); + + - Access the myData object inside my_threadlet_func() using container_of + primitive + static void my_threadlet_func(ThreadletWork *work) + { + myPrivateData *mydata_ptr; + mydata_ptr = container_of(work, myPrivateData, work); + + /* mydata_ptr now points to myData object */ + } + +Q.5: Are there any precautions one must take while sharing data with the + Asynchronous thread-pool ? +A.5: Yes, make sure that the helper function of the type my_threadlet_func() + does not access/modify data when it can be accessed or modified in the + context of VCPU thread or IO thread. This is because the asynchronous + threads in the pool can run in parallel with the VCPU/IOThreads as shown + in the figure. + + A typical workflow is as follows: + + VCPU/IOThread + | + | (1) + | + V + Offload work (2) + |-------> to threadlets -----------------------------> Helper thread + | | | + | | | + | | (3) | (4) + | | | + | Handle other Guest requests | + | | | + | | V + | | (3) Signal the I/O Thread + |(6) | | + | | / + | | / + | V / + | Do the post <---------------------------------/ + | processing (5) + | | + | | (6) + | V + |-Yes------ More async work? + | + | (7) + No + | + | + . + . + + Hence one needs to make sure that in the steps (3) and (4) which run in + parallel, any global data is accessed within only one context. + +Q.6: I have queued a threadlet which I want to cancel. How do I do that ? +A.6: Threadlets framework provides the API cancel_threadlet: + - int cancel_threadletwork(ThreadletWork *work) + + The API scans the ThreadletQueue to see if (work) is present. If it finds + work, it'll dequeue work and return 0. + + On the other hand, if it does not find the (work) in the ThreadletQueue, + then it'll return 1. This can imply two things. Either the work is being + processed by one of the helper threads or it has been processed. The + threadlet infrastructure currently _does_not_ distinguish between these + two and the onus is on the caller to do that. + +Q.7: Apart from the global pool of threads, can I have my own private Queue ? +A.7: Yes, the threadlets framework allows subsystems to create their own private + queues with associated pools of threads. + + - Define a PrivateQueue + ThreadletQueue myQueue; + + - Initialize it: + threadlet_queue_init(&myQueue, my_max_threads, my_min_threads); + where my_max_threads is the maximum number of threads that can be in the + thread pool and my_min_threads is the minimum number of active threads + that can be in the thread-pool. + + - Submit work: + submit_threadletwork_to_queue(&myQueue, &my_work); + + - Cancel work: + cancel_threadletwork_on_queue(&myQueue, &my_work); diff --git a/posix-aio-compat.c b/posix-aio-compat.c index eb82fa4..20f53c7 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -27,36 +27,9 @@ #include "qemu-common.h" #include "trace.h" #include "block_int.h" -#include "qemu-thread.h" #include "block/raw-posix-aio.h" -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 - -static QemuMutex aiocb_mutex; -static QemuCond aiocb_completion; - -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - -static ThreadletQueue globalqueue; -static int globalqueue_init; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -83,159 +56,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - -/** - * submit_work_to_queue: Submit a new task to a private queue to be - * executed asynchronously. - * @queue: Per-subsystem private queue to which the new task needs - * to be submitted. - * @work: Contains information about the task that needs to be submitted. - */ - -static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - qemu_mutex_lock(&queue->lock); - if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { - spawn_threadlet(queue); - } else { - qemu_cond_signal(&queue->cond); - } - QTAILQ_INSERT_TAIL(&queue->request_list, work, node); - qemu_mutex_unlock(&queue->lock); -} - -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads); - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ - -static void submit_work(ThreadletWork *work) -{ - if (!globalqueue_init) { - threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, - MIN_GLOBAL_THREADS); - globalqueue_init = 1; - } - - submit_work_to_queue(&globalqueue, work); -} - -/** - * dequeue_work_on_queue: Cancel a task queued on a Queue. - * @queue: The queue containing the task to be cancelled. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ - -static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - ThreadletWork *ret_work; - int ret = 1; - - qemu_mutex_lock(&queue->lock); - QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { - if (ret_work == work) { - QTAILQ_REMOVE(&queue->request_list, ret_work, node); - ret = 0; - break; - } - } - qemu_mutex_unlock(&queue->lock); - - return ret; -} - -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ - -static int dequeue_work(ThreadletWork *work) -{ - return dequeue_work_on_queue(&globalqueue, work); -} - -/** - * threadlet_queue_init: Initialize a threadlet queue. - * @queue: The threadlet queue to be initialized. - * @max_threads: Maximum number of threads processing the queue. - * @min_threads: Minimum number of threads processing the queue. - */ - -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads) -{ - queue->cur_threads = 0; - queue->idle_threads = 0; - queue->max_threads = max_threads; - queue->min_threads = min_threads; - QTAILQ_INIT(&queue->request_list); - qemu_mutex_init(&queue->lock); - qemu_cond_init(&queue->cond); -} - #ifdef CONFIG_PREADV static int preadv_present = 1; #else diff --git a/qemu-threadlets.c b/qemu-threadlets.c new file mode 100644 index 0000000..23b4ecf --- /dev/null +++ b/qemu-threadlets.c @@ -0,0 +1,168 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * Arun R Bharadwaj + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#include "qemu-threadlets.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 +static ThreadletQueue globalqueue; +static int globalqueue_init; + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work_to_queue: Submit a new task to a private queue to be + * executed asynchronously. + * @queue: Per-subsystem private queue to which the new task needs + * to be submitted. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + qemu_mutex_lock(&queue->lock); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } else { + qemu_cond_signal(&queue->cond); + } + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); + qemu_mutex_unlock(&queue->lock); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + submit_work_to_queue(&globalqueue, work); +} + +/** + * dequeue_work_on_queue: Cancel a task queued on a Queue. + * @queue: The queue containing the task to be cancelled. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + ThreadletWork *ret_work; + int ret = 1; + + qemu_mutex_lock(&queue->lock); + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&queue->request_list, ret_work, node); + ret = 0; + break; + } + } + qemu_mutex_unlock(&queue->lock); + + return ret; +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work(ThreadletWork *work) +{ + return dequeue_work_on_queue(&globalqueue, work); +} + +/** + * threadlet_queue_init: Initialize a threadlet queue. + * @queue: The threadlet queue to be initialized. + * @max_threads: Maximum number of threads processing the queue. + * @min_threads: Minimum number of threads processing the queue. + */ +void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&queue->request_list); + qemu_mutex_init(&queue->lock); + qemu_cond_init(&queue->cond); +} diff --git a/qemu-threadlets.h b/qemu-threadlets.h new file mode 100644 index 0000000..39461a5 --- /dev/null +++ b/qemu-threadlets.h @@ -0,0 +1,46 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work); +void submit_work(ThreadletWork *work); +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work); +int dequeue_work(ThreadletWork *work); +void threadlet_queue_init(ThreadletQueue *queue, int max_threads, + int min_threads); +#endif