Patchwork [12/13] Threadlets: Add functionality to create private queues.

login
register
mail settings
Submitter Arun Bharadwaj
Date Jan. 4, 2011, 5:28 a.m.
Message ID <20110104052823.15887.3046.stgit@localhost6.localdomain6>
Download mbox | patch
Permalink /patch/77385/
State New
Headers show

Comments

Arun Bharadwaj - Jan. 4, 2011, 5:28 a.m.
This patch allows subsystems to create their own private
queues with associated pools of threads.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 qemu-threadlets.c |   80 ++++++++++++++++++++++++++++++++++++++---------------
 qemu-threadlets.h |    4 +++
 2 files changed, 62 insertions(+), 22 deletions(-)

Patch

diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index 4702c02..7075331 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -99,6 +99,25 @@  static void spawn_threadlet(ThreadletQueue *queue)
 }
 
 /**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/**
  * submit_work: Submit to the global queue a new task to be executed
  *                   asynchronously.
  * @work: Contains information about the task that needs to be submitted.
@@ -106,43 +125,60 @@  static void spawn_threadlet(ThreadletQueue *queue)
 void submit_work(ThreadletWork *work)
 {
     if (!globalqueue_init) {
-        globalqueue.cur_threads  = 0;
-        globalqueue.idle_threads = 0;
-        globalqueue.max_threads = MAX_GLOBAL_THREADS;
-        globalqueue.min_threads = MIN_GLOBAL_THREADS;
-        QTAILQ_INIT(&globalqueue.request_list);
-        qemu_mutex_init(&globalqueue.lock);
-        qemu_cond_init(&globalqueue.cond);
-
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
         globalqueue_init = 1;
     }
 
-    qemu_mutex_lock(&globalqueue.lock);
-    if (globalqueue.idle_threads == 0 &&
-        globalqueue.cur_threads < globalqueue.max_threads) {
-        spawn_threadlet(&globalqueue);
-    } else {
-        qemu_cond_signal(&globalqueue.cond);
-    }
-    QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
-    qemu_mutex_unlock(&globalqueue.lock);
+    submit_work_to_queue(&globalqueue, work);
 }
 
 /**
- * dequeue_work: Cancel a task queued on the global queue.
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
  * @work: Contains the information of the task that needs to be cancelled.
  *
  * Returns: 0 if the task is successfully cancelled.
  *          1 otherwise.
  */
-int dequeue_work(ThreadletWork *work)
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
     int ret = 1;
 
-    qemu_mutex_lock(&globalqueue.lock);
-    QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_REMOVE(&queue->request_list, work, node);
     ret = 0;
-    qemu_mutex_unlock(&globalqueue.lock);
+    qemu_mutex_unlock(&queue->lock);
 
     return ret;
 }
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 03bb86b..993d7ab 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -37,7 +37,11 @@  typedef struct ThreadletWork
     void (*func)(struct ThreadletWork *work);
 } ThreadletWork;
 
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
 void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
 int dequeue_work(ThreadletWork *work);
 void threadlet_init(void);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                          int min_threads);
 #endif