diff mbox

[11/12] Threadlets: Add functionality to create private queues.

Message ID 20110120123416.17667.43009.stgit@localhost6.localdomain6
State New
Headers show

Commit Message

Arun Bharadwaj Jan. 20, 2011, 12:34 p.m. UTC
This patch allows subsystems to create their own private
queues with associated pools of threads.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 qemu-threadlet.c |   85 +++++++++++++++++++++++++++++++++++++-----------------
 qemu-threadlet.h |    4 +++
 2 files changed, 63 insertions(+), 26 deletions(-)
diff mbox

Patch

diff --git a/qemu-threadlet.c b/qemu-threadlet.c
index 6523f08..4bad8c6 100644
--- a/qemu-threadlet.c
+++ b/qemu-threadlet.c
@@ -99,6 +99,25 @@  static void spawn_threadlet(ThreadletQueue *queue)
 }
 
 /**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/**
  * submit_work: Submit to the global queue a new task to be executed
  *                   asynchronously.
  * @work: Contains information about the task that needs to be submitted.
@@ -106,49 +125,63 @@  static void spawn_threadlet(ThreadletQueue *queue)
 void submit_work(ThreadletWork *work)
 {
     if (!globalqueue_init) {
-        globalqueue.cur_threads  = 0;
-        globalqueue.idle_threads = 0;
-        globalqueue.max_threads = MAX_GLOBAL_THREADS;
-        globalqueue.min_threads = MIN_GLOBAL_THREADS;
-        QTAILQ_INIT(&globalqueue.request_list);
-        qemu_mutex_init(&globalqueue.lock);
-        qemu_cond_init(&globalqueue.cond);
-
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
         globalqueue_init = 1;
     }
 
-    qemu_mutex_lock(&globalqueue.lock);
-    if (globalqueue.idle_threads == 0 &&
-        globalqueue.cur_threads < globalqueue.max_threads) {
-        spawn_threadlet(&globalqueue);
-    } else {
-        qemu_cond_signal(&globalqueue.cond);
-    }
-    QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
-    qemu_mutex_unlock(&globalqueue.lock);
+    submit_work_to_queue(&globalqueue, work);
 }
 
 /**
- * dequeue_work: Cancel a task queued on the global queue.
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
  * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns:     0 if successfully dequeued work.
- *              1 otherwise.
  */
-int dequeue_work(ThreadletWork *work)
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
     int ret = 1;
     ThreadletWork *ret_work;
 
-    qemu_mutex_lock(&globalqueue.lock);
-    QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) {
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
         if (ret_work == work) {
-            QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
             ret = 0;
             break;
         }
     }
-    qemu_mutex_unlock(&globalqueue.lock);
+    qemu_mutex_unlock(&queue->lock);
 
     return ret;
 }
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns:     0 if successfully dequeued work.
+ *              1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlet.h b/qemu-threadlet.h
index 6b11c86..25e77f2 100644
--- a/qemu-threadlet.h
+++ b/qemu-threadlet.h
@@ -38,7 +38,11 @@  typedef struct ThreadletWork
     void (*func)(struct ThreadletWork *work);
 } ThreadletWork;
 
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
 void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
 int dequeue_work(ThreadletWork *work);
 void threadlet_init(void);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                          int min_threads);
 #endif