Message ID | 20101118180715.4434.10790.stgit@localhost6.localdomain6 |
---|---|
State | New |
On 11/18/2010 10:07 AM, Arun R Bharadwaj wrote:
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.

If I read it right, this patch does nothing but reorganize the code, i.e. it moves the code out of posix-aio-compat.c into new files. Please make that explicit in the comment so that review becomes easy. :)

Thanks,
JV

>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
>
> Acked-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
> Makefile.objs | 1
> docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++
> posix-aio-compat.c | 180 ------------------------------------------------
> qemu-threadlets.c | 168 +++++++++++++++++++++++++++++++++++++++++++++
> qemu-threadlets.h | 46 ++++++++++++
> 5 files changed, 356 insertions(+), 180 deletions(-)
> create mode 100644 docs/async-support.txt
> create mode 100644 qemu-threadlets.c
> create mode 100644 qemu-threadlets.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 3b7ec27..2cf8aba 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
> block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
> block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> block-obj-$(CONFIG_POSIX) += qemu-thread.o
> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
> block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
> block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> diff --git a/docs/async-support.txt b/docs/async-support.txt
> new file mode 100644
> index 0000000..9f22b9a
> --- /dev/null
> +++ b/docs/async-support.txt
> @@ -0,0 +1,141 @@
> +== How to use the threadlets infrastructure supported in Qemu ==
> +
> +== Threadlets ==
> +
> +Q.1: What are threadlets ?
> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems > + to offload possibly blocking work to a queue to be processed by a pool > + of threads asynchronously. > + > +Q.2: When would one want to use threadlets ? > +A.2: Threadlets are useful when there are operations that can be performed > + outside the context of the VCPU/IO threads inorder to free these latter > + to service any other guest requests. > + > +Q.3: I have some work that can be executed in an asynchronous context. How > + should I go about it ? > +A.3: One could follow the steps listed below: > + > + - Define a function which would do the asynchronous work. > + static void my_threadlet_func(ThreadletWork *work) > + { > + } > + > + - Declare an object of type ThreadletWork; > + ThreadletWork work; > + > + > + - Assign a value to the "func" member of ThreadletWork object. > + work.func = my_threadlet_func; > + > + - Submit the threadlet to the global queue. > + submit_threadletwork(&work); > + > + - Continue servicing some other guest operations. > + > +Q.4: I want to my_threadlet_func to access some non-global data. How do I do > + that ? > +A.4: Suppose you want my_threadlet_func to access some non-global data-object > + of type myPrivateData. In that case one could follow the following steps. > + > + - Define a member of the type ThreadletWork within myPrivateData. 
> + typedef struct MyPrivateData { > + ...; > + ...; > + ...; > + ThreadletWork work; > + } MyPrivateData; > + > + MyPrivateData my_data; > + > + - Initialize myData.work as described in A.3 > + myData.work.func = my_threadlet_func; > + submit_threadletwork(&myData.work); > + > + - Access the myData object inside my_threadlet_func() using container_of > + primitive > + static void my_threadlet_func(ThreadletWork *work) > + { > + myPrivateData *mydata_ptr; > + mydata_ptr = container_of(work, myPrivateData, work); > + > + /* mydata_ptr now points to myData object */ > + } > + > +Q.5: Are there any precautions one must take while sharing data with the > + Asynchronous thread-pool ? > +A.5: Yes, make sure that the helper function of the type my_threadlet_func() > + does not access/modify data when it can be accessed or modified in the > + context of VCPU thread or IO thread. This is because the asynchronous > + threads in the pool can run in parallel with the VCPU/IOThreads as shown > + in the figure. > + > + A typical workflow is as follows: > + > + VCPU/IOThread > + | > + | (1) > + | > + V > + Offload work (2) > + |-------> to threadlets -----------------------------> Helper thread > + | | | > + | | | > + | | (3) | (4) > + | | | > + | Handle other Guest requests | > + | | | > + | | V > + | | (3) Signal the I/O Thread > + |(6) | | > + | | / > + | | / > + | V / > + | Do the post <---------------------------------/ > + | processing (5) > + | | > + | | (6) > + | V > + |-Yes------ More async work? > + | > + | (7) > + No > + | > + | > + . > + . > + > + Hence one needs to make sure that in the steps (3) and (4) which run in > + parallel, any global data is accessed within only one context. > + > +Q.6: I have queued a threadlet which I want to cancel. How do I do that ? > +A.6: Threadlets framework provides the API cancel_threadlet: > + - int cancel_threadletwork(ThreadletWork *work) > + > + The API scans the ThreadletQueue to see if (work) is present. 
If it finds > + work, it'll dequeue work and return 0. > + > + On the other hand, if it does not find the (work) in the ThreadletQueue, > + then it'll return 1. This can imply two things. Either the work is being > + processed by one of the helper threads or it has been processed. The > + threadlet infrastructure currently _does_not_ distinguish between these > + two and the onus is on the caller to do that. > + > +Q.7: Apart from the global pool of threads, can I have my own private Queue ? > +A.7: Yes, the threadlets framework allows subsystems to create their own private > + queues with associated pools of threads. > + > + - Define a PrivateQueue > + ThreadletQueue myQueue; > + > + - Initialize it: > + threadlet_queue_init(&myQueue, my_max_threads, my_min_threads); > + where my_max_threads is the maximum number of threads that can be in the > + thread pool and my_min_threads is the minimum number of active threads > + that can be in the thread-pool. > + > + - Submit work: > + submit_threadletwork_to_queue(&myQueue, &my_work); > + > + - Cancel work: > + cancel_threadletwork_on_queue(&myQueue, &my_work); > diff --git a/posix-aio-compat.c b/posix-aio-compat.c > index eb82fa4..20f53c7 100644 > --- a/posix-aio-compat.c > +++ b/posix-aio-compat.c > @@ -27,36 +27,9 @@ > #include "qemu-common.h" > #include "trace.h" > #include "block_int.h" > -#include "qemu-thread.h" > > #include "block/raw-posix-aio.h" > > -#define MAX_GLOBAL_THREADS 64 > -#define MIN_GLOBAL_THREADS 8 > - > -static QemuMutex aiocb_mutex; > -static QemuCond aiocb_completion; > - > -typedef struct ThreadletQueue > -{ > - QemuMutex lock; > - QemuCond cond; > - int max_threads; > - int min_threads; > - int cur_threads; > - int idle_threads; > - QTAILQ_HEAD(, ThreadletWork) request_list; > -} ThreadletQueue; > - > -typedef struct ThreadletWork > -{ > - QTAILQ_ENTRY(ThreadletWork) node; > - void (*func)(struct ThreadletWork *work); > -} ThreadletWork; > - > -static ThreadletQueue globalqueue; > -static int 
globalqueue_init; > - > struct qemu_paiocb { > BlockDriverAIOCB common; > int aio_fildes; > @@ -83,159 +56,6 @@ typedef struct PosixAioState { > struct qemu_paiocb *first_aio; > } PosixAioState; > > -static void *threadlet_worker(void *data) > -{ > - ThreadletQueue *queue = data; > - > - qemu_mutex_lock(&queue->lock); > - while (1) { > - ThreadletWork *work; > - int ret = 0; > - > - while (QTAILQ_EMPTY(&queue->request_list) && > - (ret != ETIMEDOUT)) { > - /* wait for cond to be signalled or broadcast for 1000s */ > - ret = qemu_cond_timedwait((&queue->cond), > - &(queue->lock), 10*100000); > - } > - > - assert(queue->idle_threads != 0); > - if (QTAILQ_EMPTY(&queue->request_list)) { > - if (queue->cur_threads > queue->min_threads) { > - /* We retain the minimum number of threads */ > - break; > - } > - } else { > - work = QTAILQ_FIRST(&queue->request_list); > - QTAILQ_REMOVE(&queue->request_list, work, node); > - > - queue->idle_threads--; > - qemu_mutex_unlock(&queue->lock); > - > - /* execute the work function */ > - work->func(work); > - > - qemu_mutex_lock(&queue->lock); > - queue->idle_threads++; > - } > - } > - > - queue->idle_threads--; > - queue->cur_threads--; > - qemu_mutex_unlock(&queue->lock); > - > - return NULL; > -} > - > -static void spawn_threadlet(ThreadletQueue *queue) > -{ > - QemuThread thread; > - > - queue->cur_threads++; > - queue->idle_threads++; > - > - qemu_thread_create(&thread, threadlet_worker, queue); > -} > - > -/** > - * submit_work_to_queue: Submit a new task to a private queue to be > - * executed asynchronously. > - * @queue: Per-subsystem private queue to which the new task needs > - * to be submitted. > - * @work: Contains information about the task that needs to be submitted. 
> - */ > - > -static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) > -{ > - qemu_mutex_lock(&queue->lock); > - if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { > - spawn_threadlet(queue); > - } else { > - qemu_cond_signal(&queue->cond); > - } > - QTAILQ_INSERT_TAIL(&queue->request_list, work, node); > - qemu_mutex_unlock(&queue->lock); > -} > - > -static void threadlet_queue_init(ThreadletQueue *queue, > - int max_threads, int min_threads); > - > -/** > - * submit_work: Submit to the global queue a new task to be executed > - * asynchronously. > - * @work: Contains information about the task that needs to be submitted. > - */ > - > -static void submit_work(ThreadletWork *work) > -{ > - if (!globalqueue_init) { > - threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, > - MIN_GLOBAL_THREADS); > - globalqueue_init = 1; > - } > - > - submit_work_to_queue(&globalqueue, work); > -} > - > -/** > - * dequeue_work_on_queue: Cancel a task queued on a Queue. > - * @queue: The queue containing the task to be cancelled. > - * @work: Contains the information of the task that needs to be cancelled. > - * > - * Returns: 0 if the task is successfully cancelled. > - * 1 otherwise. > - */ > - > -static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) > -{ > - ThreadletWork *ret_work; > - int ret = 1; > - > - qemu_mutex_lock(&queue->lock); > - QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { > - if (ret_work == work) { > - QTAILQ_REMOVE(&queue->request_list, ret_work, node); > - ret = 0; > - break; > - } > - } > - qemu_mutex_unlock(&queue->lock); > - > - return ret; > -} > - > -/** > - * dequeue_work: Cancel a task queued on the global queue. > - * @work: Contains the information of the task that needs to be cancelled. > - * > - * Returns: 0 if the task is successfully cancelled. > - * 1 otherwise. 
> - */ > - > -static int dequeue_work(ThreadletWork *work) > -{ > - return dequeue_work_on_queue(&globalqueue, work); > -} > - > -/** > - * threadlet_queue_init: Initialize a threadlet queue. > - * @queue: The threadlet queue to be initialized. > - * @max_threads: Maximum number of threads processing the queue. > - * @min_threads: Minimum number of threads processing the queue. > - */ > - > -static void threadlet_queue_init(ThreadletQueue *queue, > - int max_threads, int min_threads) > -{ > - queue->cur_threads = 0; > - queue->idle_threads = 0; > - queue->max_threads = max_threads; > - queue->min_threads = min_threads; > - QTAILQ_INIT(&queue->request_list); > - qemu_mutex_init(&queue->lock); > - qemu_cond_init(&queue->cond); > -} > - > #ifdef CONFIG_PREADV > static int preadv_present = 1; > #else > diff --git a/qemu-threadlets.c b/qemu-threadlets.c > new file mode 100644 > index 0000000..23b4ecf > --- /dev/null > +++ b/qemu-threadlets.c > @@ -0,0 +1,168 @@ > +/* > + * Threadlet support for offloading tasks to be executed asynchronously > + * > + * Copyright IBM, Corp. 2008 > + * Copyright IBM, Corp. 2010 > + * > + * Authors: > + * Anthony Liguori <aliguori@us.ibm.com> > + * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com> > + * Gautham R Shenoy <gautham.shenoy@gmail.com> > + * Arun R Bharadwaj <arun@linux.vnet.ibm.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. 
> + */ > + > +#include "qemu-threadlets.h" > +#include "osdep.h" > + > +#define MAX_GLOBAL_THREADS 64 > +#define MIN_GLOBAL_THREADS 8 > +static ThreadletQueue globalqueue; > +static int globalqueue_init; > + > +static void *threadlet_worker(void *data) > +{ > + ThreadletQueue *queue = data; > + > + qemu_mutex_lock(&queue->lock); > + while (1) { > + ThreadletWork *work; > + int ret = 0; > + > + while (QTAILQ_EMPTY(&queue->request_list) && > + (ret != ETIMEDOUT)) { > + /* wait for cond to be signalled or broadcast for 1000s */ > + ret = qemu_cond_timedwait((&queue->cond), > + &(queue->lock), 10*100000); > + } > + > + assert(queue->idle_threads != 0); > + if (QTAILQ_EMPTY(&queue->request_list)) { > + if (queue->cur_threads > queue->min_threads) { > + /* We retain the minimum number of threads */ > + break; > + } > + } else { > + work = QTAILQ_FIRST(&queue->request_list); > + QTAILQ_REMOVE(&queue->request_list, work, node); > + > + queue->idle_threads--; > + qemu_mutex_unlock(&queue->lock); > + > + /* execute the work function */ > + work->func(work); > + > + qemu_mutex_lock(&queue->lock); > + queue->idle_threads++; > + } > + } > + > + queue->idle_threads--; > + queue->cur_threads--; > + qemu_mutex_unlock(&queue->lock); > + > + return NULL; > +} > + > +static void spawn_threadlet(ThreadletQueue *queue) > +{ > + QemuThread thread; > + > + queue->cur_threads++; > + queue->idle_threads++; > + > + qemu_thread_create(&thread, threadlet_worker, queue); > +} > + > +/** > + * submit_work_to_queue: Submit a new task to a private queue to be > + * executed asynchronously. > + * @queue: Per-subsystem private queue to which the new task needs > + * to be submitted. > + * @work: Contains information about the task that needs to be submitted. 
> + */ > +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) > +{ > + qemu_mutex_lock(&queue->lock); > + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { > + spawn_threadlet(queue); > + } else { > + qemu_cond_signal(&queue->cond); > + } > + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); > + qemu_mutex_unlock(&queue->lock); > +} > + > +/** > + * submit_work: Submit to the global queue a new task to be executed > + * asynchronously. > + * @work: Contains information about the task that needs to be submitted. > + */ > +void submit_work(ThreadletWork *work) > +{ > + if (!globalqueue_init) { > + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, > + MIN_GLOBAL_THREADS); > + globalqueue_init = 1; > + } > + > + submit_work_to_queue(&globalqueue, work); > +} > + > +/** > + * dequeue_work_on_queue: Cancel a task queued on a Queue. > + * @queue: The queue containing the task to be cancelled. > + * @work: Contains the information of the task that needs to be cancelled. > + * > + * Returns: 0 if the task is successfully cancelled. > + * 1 otherwise. > + */ > +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) > +{ > + ThreadletWork *ret_work; > + int ret = 1; > + > + qemu_mutex_lock(&queue->lock); > + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { > + if (ret_work == work) { > + QTAILQ_REMOVE(&queue->request_list, ret_work, node); > + ret = 0; > + break; > + } > + } > + qemu_mutex_unlock(&queue->lock); > + > + return ret; > +} > + > +/** > + * dequeue_work: Cancel a task queued on the global queue. > + * @work: Contains the information of the task that needs to be cancelled. > + * > + * Returns: 0 if the task is successfully cancelled. > + * 1 otherwise. > + */ > +int dequeue_work(ThreadletWork *work) > +{ > + return dequeue_work_on_queue(&globalqueue, work); > +} > + > +/** > + * threadlet_queue_init: Initialize a threadlet queue. > + * @queue: The threadlet queue to be initialized. 
> + * @max_threads: Maximum number of threads processing the queue. > + * @min_threads: Minimum number of threads processing the queue. > + */ > +void threadlet_queue_init(ThreadletQueue *queue, > + int max_threads, int min_threads) > +{ > + queue->cur_threads = 0; > + queue->idle_threads = 0; > + queue->max_threads = max_threads; > + queue->min_threads = min_threads; > + QTAILQ_INIT(&queue->request_list); > + qemu_mutex_init(&queue->lock); > + qemu_cond_init(&queue->cond); > +} > diff --git a/qemu-threadlets.h b/qemu-threadlets.h > new file mode 100644 > index 0000000..39461a5 > --- /dev/null > +++ b/qemu-threadlets.h > @@ -0,0 +1,46 @@ > +/* > + * Threadlet support for offloading tasks to be executed asynchronously > + * > + * Copyright IBM, Corp. 2008 > + * Copyright IBM, Corp. 2010 > + * > + * Authors: > + * Anthony Liguori <aliguori@us.ibm.com> > + * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com> > + * Gautham R Shenoy <gautham.shenoy@gmail.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. > + */ > + > +#ifndef QEMU_ASYNC_WORK_H > +#define QEMU_ASYNC_WORK_H > + > +#include "qemu-queue.h" > +#include "qemu-common.h" > +#include "qemu-thread.h" > + > +typedef struct ThreadletQueue > +{ > + QemuMutex lock; > + QemuCond cond; > + int max_threads; > + int min_threads; > + int cur_threads; > + int idle_threads; > + QTAILQ_HEAD(, ThreadletWork) request_list; > +} ThreadletQueue; > + > +typedef struct ThreadletWork > +{ > + QTAILQ_ENTRY(ThreadletWork) node; > + void (*func)(struct ThreadletWork *work); > +} ThreadletWork; > + > +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work); > +void submit_work(ThreadletWork *work); > +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work); > +int dequeue_work(ThreadletWork *work); > +void threadlet_queue_init(ThreadletQueue *queue, int max_threads, > + int min_threads); > +#endif > >
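
For reference while reviewing A.4 of the quoted docs/async-support.txt, the container_of pattern it describes can be sketched standalone. This is only a minimal sketch, not code from the patch: the ThreadletWork below is a stand-in that omits the QTAILQ_ENTRY node, container_of is defined locally, and the request_id/result fields and run_inline() helper are hypothetical. Note also that the docs refer to submit_threadletwork() and cancel_threadletwork(), while qemu-threadlets.h in this patch declares submit_work() and dequeue_work(); the sketch calls the work function directly instead of using either.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for ThreadletWork from qemu-threadlets.h (queue node omitted). */
typedef struct ThreadletWork {
    void (*func)(struct ThreadletWork *work);
} ThreadletWork;

/* Local definition: recover the enclosing struct from a member pointer. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical per-request data that embeds the work item. */
typedef struct MyPrivateData {
    int request_id;
    int result;
    ThreadletWork work;
} MyPrivateData;

/* Work function: only receives the ThreadletWork pointer. */
static void my_threadlet_func(ThreadletWork *work)
{
    MyPrivateData *data = container_of(work, MyPrivateData, work);

    /* data now points to the object that embeds 'work'. */
    data->result = data->request_id * 2;
}

/*
 * Hypothetical helper: in QEMU the work item would be handed to a
 * helper thread via the submit call; here it is invoked inline so the
 * sketch stays single-threaded.
 */
static int run_inline(int request_id)
{
    MyPrivateData my_data = { .request_id = request_id, .result = 0 };

    my_data.work.func = my_threadlet_func;
    my_data.work.func(&my_data.work);
    return my_data.result;
}
```

Embedding the work item in the private structure avoids a separate allocation and a void * argument; the cost is that the private object must stay alive until the work function has finished.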
diff --git a/Makefile.objs b/Makefile.objs index 3b7ec27..2cf8aba 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += qemu-thread.o +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o diff --git a/docs/async-support.txt b/docs/async-support.txt new file mode 100644 index 0000000..9f22b9a --- /dev/null +++ b/docs/async-support.txt @@ -0,0 +1,141 @@ +== How to use the threadlets infrastructure supported in Qemu == + +== Threadlets == + +Q.1: What are threadlets ? +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems + to offload possibly blocking work to a queue to be processed by a pool + of threads asynchronously. + +Q.2: When would one want to use threadlets ? +A.2: Threadlets are useful when there are operations that can be performed + outside the context of the VCPU/IO threads inorder to free these latter + to service any other guest requests. + +Q.3: I have some work that can be executed in an asynchronous context. How + should I go about it ? +A.3: One could follow the steps listed below: + + - Define a function which would do the asynchronous work. + static void my_threadlet_func(ThreadletWork *work) + { + } + + - Declare an object of type ThreadletWork; + ThreadletWork work; + + + - Assign a value to the "func" member of ThreadletWork object. + work.func = my_threadlet_func; + + - Submit the threadlet to the global queue. + submit_threadletwork(&work); + + - Continue servicing some other guest operations. + +Q.4: I want to my_threadlet_func to access some non-global data. How do I do + that ? +A.4: Suppose you want my_threadlet_func to access some non-global data-object + of type myPrivateData. In that case one could follow the following steps. 
+ + - Define a member of the type ThreadletWork within myPrivateData. + typedef struct MyPrivateData { + ...; + ...; + ...; + ThreadletWork work; + } MyPrivateData; + + MyPrivateData my_data; + + - Initialize myData.work as described in A.3 + myData.work.func = my_threadlet_func; + submit_threadletwork(&myData.work); + + - Access the myData object inside my_threadlet_func() using container_of + primitive + static void my_threadlet_func(ThreadletWork *work) + { + myPrivateData *mydata_ptr; + mydata_ptr = container_of(work, myPrivateData, work); + + /* mydata_ptr now points to myData object */ + } + +Q.5: Are there any precautions one must take while sharing data with the + Asynchronous thread-pool ? +A.5: Yes, make sure that the helper function of the type my_threadlet_func() + does not access/modify data when it can be accessed or modified in the + context of VCPU thread or IO thread. This is because the asynchronous + threads in the pool can run in parallel with the VCPU/IOThreads as shown + in the figure. + + A typical workflow is as follows: + + VCPU/IOThread + | + | (1) + | + V + Offload work (2) + |-------> to threadlets -----------------------------> Helper thread + | | | + | | | + | | (3) | (4) + | | | + | Handle other Guest requests | + | | | + | | V + | | (3) Signal the I/O Thread + |(6) | | + | | / + | | / + | V / + | Do the post <---------------------------------/ + | processing (5) + | | + | | (6) + | V + |-Yes------ More async work? + | + | (7) + No + | + | + . + . + + Hence one needs to make sure that in the steps (3) and (4) which run in + parallel, any global data is accessed within only one context. + +Q.6: I have queued a threadlet which I want to cancel. How do I do that ? +A.6: Threadlets framework provides the API cancel_threadlet: + - int cancel_threadletwork(ThreadletWork *work) + + The API scans the ThreadletQueue to see if (work) is present. If it finds + work, it'll dequeue work and return 0. 
+ + On the other hand, if it does not find the (work) in the ThreadletQueue, + then it'll return 1. This can imply two things. Either the work is being + processed by one of the helper threads or it has been processed. The + threadlet infrastructure currently _does_not_ distinguish between these + two and the onus is on the caller to do that. + +Q.7: Apart from the global pool of threads, can I have my own private Queue ? +A.7: Yes, the threadlets framework allows subsystems to create their own private + queues with associated pools of threads. + + - Define a PrivateQueue + ThreadletQueue myQueue; + + - Initialize it: + threadlet_queue_init(&myQueue, my_max_threads, my_min_threads); + where my_max_threads is the maximum number of threads that can be in the + thread pool and my_min_threads is the minimum number of active threads + that can be in the thread-pool. + + - Submit work: + submit_threadletwork_to_queue(&myQueue, &my_work); + + - Cancel work: + cancel_threadletwork_on_queue(&myQueue, &my_work); diff --git a/posix-aio-compat.c b/posix-aio-compat.c index eb82fa4..20f53c7 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -27,36 +27,9 @@ #include "qemu-common.h" #include "trace.h" #include "block_int.h" -#include "qemu-thread.h" #include "block/raw-posix-aio.h" -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 - -static QemuMutex aiocb_mutex; -static QemuCond aiocb_completion; - -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - -static ThreadletQueue globalqueue; -static int globalqueue_init; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -83,159 +56,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } 
PosixAioState; -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - -/** - * submit_work_to_queue: Submit a new task to a private queue to be - * executed asynchronously. - * @queue: Per-subsystem private queue to which the new task needs - * to be submitted. - * @work: Contains information about the task that needs to be submitted. 
- */ - -static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - qemu_mutex_lock(&queue->lock); - if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { - spawn_threadlet(queue); - } else { - qemu_cond_signal(&queue->cond); - } - QTAILQ_INSERT_TAIL(&queue->request_list, work, node); - qemu_mutex_unlock(&queue->lock); -} - -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads); - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ - -static void submit_work(ThreadletWork *work) -{ - if (!globalqueue_init) { - threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, - MIN_GLOBAL_THREADS); - globalqueue_init = 1; - } - - submit_work_to_queue(&globalqueue, work); -} - -/** - * dequeue_work_on_queue: Cancel a task queued on a Queue. - * @queue: The queue containing the task to be cancelled. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ - -static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) -{ - ThreadletWork *ret_work; - int ret = 1; - - qemu_mutex_lock(&queue->lock); - QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { - if (ret_work == work) { - QTAILQ_REMOVE(&queue->request_list, ret_work, node); - ret = 0; - break; - } - } - qemu_mutex_unlock(&queue->lock); - - return ret; -} - -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ - -static int dequeue_work(ThreadletWork *work) -{ - return dequeue_work_on_queue(&globalqueue, work); -} - -/** - * threadlet_queue_init: Initialize a threadlet queue. - * @queue: The threadlet queue to be initialized. 
- * @max_threads: Maximum number of threads processing the queue. - * @min_threads: Minimum number of threads processing the queue. - */ - -static void threadlet_queue_init(ThreadletQueue *queue, - int max_threads, int min_threads) -{ - queue->cur_threads = 0; - queue->idle_threads = 0; - queue->max_threads = max_threads; - queue->min_threads = min_threads; - QTAILQ_INIT(&queue->request_list); - qemu_mutex_init(&queue->lock); - qemu_cond_init(&queue->cond); -} - #ifdef CONFIG_PREADV static int preadv_present = 1; #else diff --git a/qemu-threadlets.c b/qemu-threadlets.c new file mode 100644 index 0000000..23b4ecf --- /dev/null +++ b/qemu-threadlets.c @@ -0,0 +1,168 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com> + * Gautham R Shenoy <gautham.shenoy@gmail.com> + * Arun R Bharadwaj <arun@linux.vnet.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. 
+ */ + +#include "qemu-threadlets.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 +static ThreadletQueue globalqueue; +static int globalqueue_init; + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work_to_queue: Submit a new task to a private queue to be + * executed asynchronously. + * @queue: Per-subsystem private queue to which the new task needs + * to be submitted. + * @work: Contains information about the task that needs to be submitted. 
+ */ +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + qemu_mutex_lock(&queue->lock); + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) { + spawn_threadlet(queue); + } else { + qemu_cond_signal(&queue->cond); + } + QTAILQ_INSERT_TAIL(&queue->request_list, work, node); + qemu_mutex_unlock(&queue->lock); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS, + MIN_GLOBAL_THREADS); + globalqueue_init = 1; + } + + submit_work_to_queue(&globalqueue, work); +} + +/** + * dequeue_work_on_queue: Cancel a task queued on a Queue. + * @queue: The queue containing the task to be cancelled. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work) +{ + ThreadletWork *ret_work; + int ret = 1; + + qemu_mutex_lock(&queue->lock); + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { + if (ret_work == work) { + QTAILQ_REMOVE(&queue->request_list, ret_work, node); + ret = 0; + break; + } + } + qemu_mutex_unlock(&queue->lock); + + return ret; +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work(ThreadletWork *work) +{ + return dequeue_work_on_queue(&globalqueue, work); +} + +/** + * threadlet_queue_init: Initialize a threadlet queue. + * @queue: The threadlet queue to be initialized. + * @max_threads: Maximum number of threads processing the queue. + * @min_threads: Minimum number of threads processing the queue. 
+ */ +void threadlet_queue_init(ThreadletQueue *queue, + int max_threads, int min_threads) +{ + queue->cur_threads = 0; + queue->idle_threads = 0; + queue->max_threads = max_threads; + queue->min_threads = min_threads; + QTAILQ_INIT(&queue->request_list); + qemu_mutex_init(&queue->lock); + qemu_cond_init(&queue->cond); +} diff --git a/qemu-threadlets.h b/qemu-threadlets.h new file mode 100644 index 0000000..39461a5 --- /dev/null +++ b/qemu-threadlets.h @@ -0,0 +1,46 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com> + * Gautham R Shenoy <gautham.shenoy@gmail.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work); +void submit_work(ThreadletWork *work); +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work); +int dequeue_work(ThreadletWork *work); +void threadlet_queue_init(ThreadletQueue *queue, int max_threads, + int min_threads); +#endif
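
The return-value contract of dequeue_work_on_queue() above (0 if the item was still queued and has been removed, 1 otherwise) can be illustrated with a small single-threaded sketch. It is an illustration under stated assumptions, not the patch's code: it uses the TAILQ macros from <sys/queue.h> as a stand-in for QEMU's QTAILQ, it takes no lock because nothing runs concurrently, and DemoWork, DemoQueue and the demo_* functions are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

/* Hypothetical stand-ins for ThreadletWork and ThreadletQueue. */
typedef struct DemoWork {
    TAILQ_ENTRY(DemoWork) node;
    int id;
} DemoWork;

typedef struct DemoQueue {
    TAILQ_HEAD(, DemoWork) request_list;
} DemoQueue;

static void demo_queue_init(DemoQueue *queue)
{
    TAILQ_INIT(&queue->request_list);
}

static void demo_submit(DemoQueue *queue, DemoWork *work)
{
    TAILQ_INSERT_TAIL(&queue->request_list, work, node);
}

/* What a worker does before running an item: take the head off the list. */
static DemoWork *demo_pop(DemoQueue *queue)
{
    DemoWork *work = TAILQ_FIRST(&queue->request_list);

    if (work != NULL) {
        TAILQ_REMOVE(&queue->request_list, work, node);
    }
    return work;
}

/* Same contract as the patch: 0 if found and removed, 1 otherwise. */
static int demo_dequeue(DemoQueue *queue, DemoWork *work)
{
    DemoWork *it;
    int ret = 1;

    TAILQ_FOREACH(it, &queue->request_list, node) {
        if (it == work) {
            TAILQ_REMOVE(&queue->request_list, it, node);
            ret = 0;
            break;
        }
    }
    return ret;
}

/* Walks through the three cases a caller can observe. */
static int demo_cancel_cases(void)
{
    DemoQueue queue;
    DemoWork a = { .id = 1 }, b = { .id = 2 };

    demo_queue_init(&queue);
    demo_submit(&queue, &a);
    demo_submit(&queue, &b);

    assert(demo_dequeue(&queue, &a) == 0);  /* still queued: cancelled */
    assert(demo_dequeue(&queue, &a) == 1);  /* already removed */
    assert(demo_pop(&queue) == &b);         /* a worker picks up b */
    assert(demo_dequeue(&queue, &b) == 1);  /* running or done: not found */
    return 0;
}
```

As A.6 of the documentation says, a return of 1 does not tell the caller whether the item is still running or has completed, so a caller that needs to know would have to track completion itself, for example with a flag in its own private data.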