From patchwork Mon Nov 15 17:53:55 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Arun Bharadwaj X-Patchwork-Id: 71261 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id E976CB7123 for ; Tue, 16 Nov 2010 05:00:25 +1100 (EST) Received: from localhost ([127.0.0.1]:48157 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PI3Kj-0000jz-VW for incoming@patchwork.ozlabs.org; Mon, 15 Nov 2010 12:59:14 -0500 Received: from [140.186.70.92] (port=37935 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PI3Fo-0006iX-4a for qemu-devel@nongnu.org; Mon, 15 Nov 2010 12:54:10 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PI3Fl-0003LT-MQ for qemu-devel@nongnu.org; Mon, 15 Nov 2010 12:54:07 -0500 Received: from e23smtp08.au.ibm.com ([202.81.31.141]:53165) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PI3Fk-0003Km-EO for qemu-devel@nongnu.org; Mon, 15 Nov 2010 12:54:05 -0500 Received: from d23relay05.au.ibm.com (d23relay05.au.ibm.com [202.81.31.247]) by e23smtp08.au.ibm.com (8.14.4/8.13.1) with ESMTP id oAFHs277003420 for ; Tue, 16 Nov 2010 04:54:02 +1100 Received: from d23av03.au.ibm.com (d23av03.au.ibm.com [9.190.234.97]) by d23relay05.au.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id oAFHs1Ek2265326 for ; Tue, 16 Nov 2010 04:54:01 +1100 Received: from d23av03.au.ibm.com (loopback [127.0.0.1]) by d23av03.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id oAFHs120013118 for ; Tue, 16 Nov 2010 04:54:01 +1100 Received: from localhost6.localdomain6 ([9.77.125.173]) by d23av03.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id oAFHrv6E013071; Tue, 16 Nov 2010 04:53:59 +1100 To: qemu-devel@nongnu.org From: Arun R Bharadwaj Date: Mon, 15 Nov 2010 23:23:55 +0530 Message-ID: <20101115175355.12469.78095.stgit@localhost6.localdomain6> In-Reply-To: <20101115175229.12469.48656.stgit@localhost6.localdomain6> References: <20101115175229.12469.48656.stgit@localhost6.localdomain6> User-Agent: StGit/0.15 MIME-Version: 1.0 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6, seldom 2.4 (older, 4) Cc: kwolf@redhat.com Subject: [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org The reason for creating this generic infrastructure is so that other subsystems, such as virtio-9p could make use of it for offloading tasks that could block. Signed-off-by: Arun R Bharadwaj Signed-off-by: Aneesh Kumar K.V Signed-off-by: Gautham R Shenoy Signed-off-by: Sripathi Kodi Acked-by: Stefan Hajnoczi --- Makefile.objs | 1 docs/async-support.txt | 141 +++++++++++++++++++++++++++++++++++++++ posix-aio-compat.c | 173 ------------------------------------------------ qemu-threadlets.c | 168 +++++++++++++++++++++++++++++++++++++++++++++++ qemu-threadlets.h | 46 +++++++++++++ 5 files changed, 357 insertions(+), 172 deletions(-) create mode 100644 docs/async-support.txt create mode 100644 qemu-threadlets.c create mode 100644 qemu-threadlets.h diff --git a/Makefile.objs b/Makefile.objs index 3b7ec27..2cf8aba 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += qemu-thread.o +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o diff --git a/docs/async-support.txt b/docs/async-support.txt new file mode 100644 index 0000000..9f22b9a --- /dev/null +++ b/docs/async-support.txt @@ -0,0 +1,141 @@ +== How to use the threadlets infrastructure supported in Qemu == + +== Threadlets == + +Q.1: What are threadlets ? +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems + to offload possibly blocking work to a queue to be processed by a pool + of threads asynchronously. + +Q.2: When would one want to use threadlets ? +A.2: Threadlets are useful when there are operations that can be performed + outside the context of the VCPU/IO threads inorder to free these latter + to service any other guest requests. + +Q.3: I have some work that can be executed in an asynchronous context. How + should I go about it ? +A.3: One could follow the steps listed below: + + - Define a function which would do the asynchronous work. + static void my_threadlet_func(ThreadletWork *work) + { + } + + - Declare an object of type ThreadletWork; + ThreadletWork work; + + + - Assign a value to the "func" member of ThreadletWork object. + work.func = my_threadlet_func; + + - Submit the threadlet to the global queue. + submit_threadletwork(&work); + + - Continue servicing some other guest operations. + +Q.4: I want to my_threadlet_func to access some non-global data. How do I do + that ? +A.4: Suppose you want my_threadlet_func to access some non-global data-object + of type myPrivateData. In that case one could follow the following steps. + + - Define a member of the type ThreadletWork within myPrivateData. + typedef struct MyPrivateData { + ...; + ...; + ...; + ThreadletWork work; + } MyPrivateData; + + MyPrivateData my_data; + + - Initialize myData.work as described in A.3 + myData.work.func = my_threadlet_func; + submit_threadletwork(&myData.work); + + - Access the myData object inside my_threadlet_func() using container_of + primitive + static void my_threadlet_func(ThreadletWork *work) + { + myPrivateData *mydata_ptr; + mydata_ptr = container_of(work, myPrivateData, work); + + /* mydata_ptr now points to myData object */ + } + +Q.5: Are there any precautions one must take while sharing data with the + Asynchronous thread-pool ? +A.5: Yes, make sure that the helper function of the type my_threadlet_func() + does not access/modify data when it can be accessed or modified in the + context of VCPU thread or IO thread. This is because the asynchronous + threads in the pool can run in parallel with the VCPU/IOThreads as shown + in the figure. + + A typical workflow is as follows: + + VCPU/IOThread + | + | (1) + | + V + Offload work (2) + |-------> to threadlets -----------------------------> Helper thread + | | | + | | | + | | (3) | (4) + | | | + | Handle other Guest requests | + | | | + | | V + | | (3) Signal the I/O Thread + |(6) | | + | | / + | | / + | V / + | Do the post <---------------------------------/ + | processing (5) + | | + | | (6) + | V + |-Yes------ More async work? + | + | (7) + No + | + | + . + . + + Hence one needs to make sure that in the steps (3) and (4) which run in + parallel, any global data is accessed within only one context. + +Q.6: I have queued a threadlet which I want to cancel. How do I do that ? +A.6: Threadlets framework provides the API cancel_threadlet: + - int cancel_threadletwork(ThreadletWork *work) + + The API scans the ThreadletQueue to see if (work) is present. If it finds + work, it'll dequeue work and return 0. + + On the other hand, if it does not find the (work) in the ThreadletQueue, + then it'll return 1. This can imply two things. Either the work is being + processed by one of the helper threads or it has been processed. The + threadlet infrastructure currently _does_not_ distinguish between these + two and the onus is on the caller to do that. + +Q.7: Apart from the global pool of threads, can I have my own private Queue ? +A.7: Yes, the threadlets framework allows subsystems to create their own private + queues with associated pools of threads. + + - Define a PrivateQueue + ThreadletQueue myQueue; + + - Initialize it: + threadlet_queue_init(&myQueue, my_max_threads, my_min_threads); + where my_max_threads is the maximum number of threads that can be in the + thread pool and my_min_threads is the minimum number of active threads + that can be in the thread-pool. + + - Submit work: + submit_threadletwork_to_queue(&myQueue, &my_work); + + - Cancel work: + cancel_threadletwork_on_queue(&myQueue, &my_work); diff --git a/posix-aio-compat.c b/posix-aio-compat.c index e1812fc..d8cae6d 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -29,34 +29,11 @@ #include "block_int.h" #include "block/raw-posix-aio.h" -#include "qemu-thread.h" - -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 +#include "qemu-threadlets.h" static QemuMutex aiocb_mutex; static QemuCond aiocb_completion; -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - -static ThreadletQueue globalqueue; -static int globalqueue_init; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -83,154 +60,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - -/** - * submit_work_to_queue: Submit a new task to a private queue to be - * executed asynchronously. - * @queue: Per-subsystem private queue to which the new task needs - * to be submitted. - * @work: Contains information about the task that needs to be submitted. - */ -static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - qemu_mutex_lock(&queue->lock); - if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { - spawn_threadlet(queue); - } else { - qemu_cond_signal(&queue->cond); - } - QTAILQ_INSERT_TAIL(&queue->request_list, work, node); - qemu_mutex_unlock(&queue->lock); -} - -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads); - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ -static void submit_work(ThreadletWork *work) -{ - if (!globalqueue_init) { - threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, - MIN_GLOBAL_THREADS); - globalqueue_init = 1; - } - - submit_work_to_queue(&globalqueue, work); -} - -/** - * dequeue_work_on_queue: Cancel a task queued on a Queue. - * @queue: The queue containing the task to be cancelled. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ -static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - ThreadletWork *ret_work; - int ret = 1; - - qemu_mutex_lock(&queue->lock); - QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { - if (ret_work == work) { - QTAILQ_REMOVE(&queue->request_list, ret_work, node); - ret = 0; - break; - } - } - qemu_mutex_unlock(&queue->lock); - - return ret; -} - -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ -static int dequeue_work(ThreadletWork *work) -{ - return dequeue_work_on_queue(&globalqueue, work); -} - -/** - * threadlet_queue_init: Initialize a threadlet queue. - * @queue: The threadlet queue to be initialized. - * @max_threads: Maximum number of threads processing the queue. - * @min_threads: Minimum number of threads processing the queue. - */ -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads) -{ - queue->cur_threads = 0; - queue->idle_threads = 0; - queue->max_threads = max_threads; - queue->min_threads = min_threads; - QTAILQ_INIT(&queue->request_list); - qemu_mutex_init(&queue->lock); - qemu_cond_init(&queue->cond); -} - #ifdef CONFIG_PREADV static int preadv_present = 1; #else diff --git a/qemu-threadlets.c b/qemu-threadlets.c new file mode 100644 index 0000000..23b4ecf --- /dev/null +++ b/qemu-threadlets.c @@ -0,0 +1,168 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * Arun R Bharadwaj + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#include "qemu-threadlets.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 +static ThreadletQueue globalqueue; +static int globalqueue_init; + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work_to_queue: Submit a new task to a private queue to be + * executed asynchronously. + * @queue: Per-subsystem private queue to which the new task needs + * to be submitted. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + qemu_mutex_lock(&queue->lock); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } else { + qemu_cond_signal(&queue->cond); + } + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); + qemu_mutex_unlock(&queue->lock); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + submit_work_to_queue(&globalqueue, work); +} + +/** + * dequeue_work_on_queue: Cancel a task queued on a Queue. + * @queue: The queue containing the task to be cancelled. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + ThreadletWork *ret_work; + int ret = 1; + + qemu_mutex_lock(&queue->lock); + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&queue->request_list, ret_work, node); + ret = 0; + break; + } + } + qemu_mutex_unlock(&queue->lock); + + return ret; +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work(ThreadletWork *work) +{ + return dequeue_work_on_queue(&globalqueue, work); +} + +/** + * threadlet_queue_init: Initialize a threadlet queue. + * @queue: The threadlet queue to be initialized. + * @max_threads: Maximum number of threads processing the queue. + * @min_threads: Minimum number of threads processing the queue. + */ +void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&queue->request_list); + qemu_mutex_init(&queue->lock); + qemu_cond_init(&queue->cond); +} diff --git a/qemu-threadlets.h b/qemu-threadlets.h new file mode 100644 index 0000000..39461a5 --- /dev/null +++ b/qemu-threadlets.h @@ -0,0 +1,46 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work); +void submit_work(ThreadletWork *work); +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work); +int dequeue_work(ThreadletWork *work); +void threadlet_queue_init(ThreadletQueue *queue, int max_threads, + int min_threads); +#endif