@@ -99,6 +99,25 @@ static void spawn_threadlet(ThreadletQueue *queue)
}
/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&queue->lock);
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ } else {
+ qemu_cond_signal(&queue->cond);
+ }
+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+ qemu_mutex_unlock(&queue->lock);
+}
+
+/**
* submit_work: Submit to the global queue a new task to be executed
* asynchronously.
* @work: Contains information about the task that needs to be submitted.
@@ -106,43 +125,60 @@ static void spawn_threadlet(ThreadletQueue *queue)
void submit_work(ThreadletWork *work)
{
if (!globalqueue_init) {
- globalqueue.cur_threads = 0;
- globalqueue.idle_threads = 0;
- globalqueue.max_threads = MAX_GLOBAL_THREADS;
- globalqueue.min_threads = MIN_GLOBAL_THREADS;
- QTAILQ_INIT(&globalqueue.request_list);
- qemu_mutex_init(&globalqueue.lock);
- qemu_cond_init(&globalqueue.cond);
-
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
globalqueue_init = 1;
}
- qemu_mutex_lock(&globalqueue.lock);
- if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads) {
- spawn_threadlet(&globalqueue);
- } else {
- qemu_cond_signal(&globalqueue.cond);
- }
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
- qemu_mutex_unlock(&globalqueue.lock);
+ submit_work_to_queue(&globalqueue, work);
}
/**
- * dequeue_work: Cancel a task queued on the global queue.
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
* @work: Contains the information of the task that needs to be cancelled.
*
* Returns: 0 if the task is successfully cancelled.
* 1 otherwise.
*/
-int dequeue_work(ThreadletWork *work)
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
{
int ret = 1;
- qemu_mutex_lock(&globalqueue.lock);
- QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
ret = 0;
- qemu_mutex_unlock(&globalqueue.lock);
+ qemu_mutex_unlock(&queue->lock);
return ret;
}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&queue->request_list);
+ qemu_mutex_init(&queue->lock);
+ qemu_cond_init(&queue->cond);
+}
@@ -37,7 +37,11 @@ typedef struct ThreadletWork
void (*func)(struct ThreadletWork *work);
} ThreadletWork;
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
int dequeue_work(ThreadletWork *work);
void threadlet_init(void);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
#endif
This patch allows subsystems to create their own private queues with associated pools of threads. Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com> --- qemu-threadlets.c | 80 ++++++++++++++++++++++++++++++++++++++--------------- qemu-threadlets.h | 4 +++ 2 files changed, 62 insertions(+), 22 deletions(-)