[1/3] Make paio subsystem use threadlets infrastructure

Message ID 20101110131946.29714.98218.stgit@localhost6.localdomain6
State New

Commit Message

Arun Bharadwaj Nov. 10, 2010, 1:19 p.m. UTC
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets.

The patch creates a global queue onto which subsystems can queue tasks to be
executed asynchronously.

The patch also provides APIs that allow a subsystem to create a private queue
with an associated pool of threads.

The patch has been tested with fstress.
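
For illustration, here is a minimal sketch of the intended usage pattern. In
this patch the ThreadletWork type, submit_work() and friends are still local
to posix-aio-compat.c, so the caller below (MyRequest, my_request_func) is
hypothetical and would have to live alongside that code:

    /* Hypothetical caller embedding a work item in its request structure. */
    typedef struct MyRequest {
        ssize_t ret;             /* result filled in by the worker */
        ThreadletWork work;      /* embedded work item */
    } MyRequest;

    /* Runs in one of the pool threads. */
    static void my_request_func(ThreadletWork *work)
    {
        MyRequest *req = container_of(work, MyRequest, work);
        req->ret = 0;            /* ... perform the blocking operation ... */
    }

    static void my_submit(MyRequest *req)
    {
        req->work.func = my_request_func;
        submit_work(&req->work); /* queue on the global threadlet queue */
    }

A subsystem that wants its own thread pool would instead call
threadlet_queue_init() once and then submit with submit_work_to_queue().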

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 Makefile.objs      |    2 
 configure          |    2 
 posix-aio-compat.c |  353 +++++++++++++++++++++++++++++++---------------------
 3 files changed, 210 insertions(+), 147 deletions(-)

Comments

Stefan Hajnoczi Nov. 10, 2010, 1:45 p.m. UTC | #1
On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>     return nbytes;
>  }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>     pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>
>     pid = getpid();
>
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>     }
>
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> -
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> +    qemu_mutex_lock(&aiocb_mutex);
> +    aiocb->ret = ret;

This is where qemu_cond_broadcast() is needed.
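
Something along these lines (just a sketch, using the aiocb_mutex and
aiocb_completion variables introduced by this patch):

    qemu_mutex_lock(&aiocb_mutex);
    aiocb->ret = ret;
    /* wake anyone blocked in paio_cancel() waiting for this request */
    qemu_cond_broadcast(&aiocb_completion);
    qemu_mutex_unlock(&aiocb_mutex);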

> +    qemu_mutex_unlock(&aiocb_mutex);
>
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) {
> +        die("kill failed");
> +    }
>  }
>
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
> +    qemu_mutex_lock(&aiocb_mutex);
>     aiocb->ret = -EINPROGRESS;
> -    aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +    qemu_mutex_unlock(&aiocb_mutex);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_work(&aiocb->work);
>  }
>
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
> +    qemu_mutex_lock(&aiocb_mutex);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
> +    qemu_mutex_unlock(&aiocb_mutex);
> +    qemu_cond_broadcast(&aiocb_completion);

qemu_paio_return() gets the return value from qemu_paiocb.  It
shouldn't have side effects and is the wrong place to broadcast
completion.
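
With the broadcast moved to the completion path in aio_thread(), this could
stay a pure accessor, e.g. (sketch):

    static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
    {
        ssize_t ret;

        qemu_mutex_lock(&aiocb_mutex);
        ret = aiocb->ret;
        qemu_mutex_unlock(&aiocb_mutex);
        return ret;
    }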

>     return ret;
>  }
>
> @@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
>  static void paio_cancel(BlockDriverAIOCB *blockacb)
>  {
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> -    int active = 0;
> -
> -    mutex_lock(&lock);
> -    if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> -    } else if (acb->ret == -EINPROGRESS) {
> -        active = 1;
> -    }
> -    mutex_unlock(&lock);
>
> -    if (active) {
> -        /* fail safe: if the aio could not be canceled, we wait for
> -           it */
> -        while (qemu_paio_error(acb) == EINPROGRESS)
> -            ;
> +    if (dequeue_work(&acb->work) != 0) {
> +        /* Wait for running work item to complete */
> +        qemu_mutex_lock(&aiocb_mutex);
> +        while (acb->ret == -EINPROGRESS) {
> +            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +        }
> +        qemu_mutex_unlock(&aiocb_mutex);

I wonder if the condition variable has a measurable performance
overhead.  We unconditionally broadcast on paiocb completion.  One
idea would be to keep a counter of waiters (should only ever be 0 or
1) protected by aiocb_mutex and broadcast only when there is a waiter.
 I just want to share this idea, I don't know if it's necessary to
implement it or if it could even work without a race condition.
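
Roughly like this (only a sketch; aiocb_waiters is a hypothetical counter
protected by aiocb_mutex, so the check and the wait cannot race):

    /* completion side, in aio_thread() */
    qemu_mutex_lock(&aiocb_mutex);
    aiocb->ret = ret;
    if (aiocb_waiters > 0) {
        qemu_cond_broadcast(&aiocb_completion);
    }
    qemu_mutex_unlock(&aiocb_mutex);

    /* cancellation side, in paio_cancel() */
    qemu_mutex_lock(&aiocb_mutex);
    aiocb_waiters++;
    while (acb->ret == -EINPROGRESS) {
        qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
    }
    aiocb_waiters--;
    qemu_mutex_unlock(&aiocb_mutex);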

>     }
>
>     paio_remove(acb);
> @@ -618,11 +692,12 @@ int paio_init(void)
>     struct sigaction act;
>     PosixAioState *s;
>     int fds[2];
> -    int ret;
>
>     if (posix_aio_state)
>         return 0;
>
> +    qemu_mutex_init(&aiocb_mutex);

Also needs qemu_cond_init(&aiocb_completion).
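
That is, paio_init() would initialize the pair together:

    qemu_mutex_init(&aiocb_mutex);
    qemu_cond_init(&aiocb_completion);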

Stefan
Arun Bharadwaj Nov. 10, 2010, 5:54 p.m. UTC | #2
* Stefan Hajnoczi <stefanha@gmail.com> [2010-11-10 13:45:29]:

> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> >     return nbytes;
> >  }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> >  {
> >     pid_t pid;
> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > +    ssize_t ret = 0;
> >
> >     pid = getpid();
> >
> > -    while (1) {
> > -        struct qemu_paiocb *aiocb;
> > -        ssize_t ret = 0;
> > -        qemu_timeval tv;
> > -        struct timespec ts;
> > -
> > -        qemu_gettimeofday(&tv);
> > -        ts.tv_sec = tv.tv_sec + 10;
> > -        ts.tv_nsec = 0;
> > -
> > -        mutex_lock(&lock);
> > -
> > -        while (QTAILQ_EMPTY(&request_list) &&
> > -               !(ret == ETIMEDOUT)) {
> > -            ret = cond_timedwait(&cond, &lock, &ts);
> > -        }
> > -
> > -        if (QTAILQ_EMPTY(&request_list))
> > -            break;
> > -
> > -        aiocb = QTAILQ_FIRST(&request_list);
> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
> > -        aiocb->active = 1;
> > -        idle_threads--;
> > -        mutex_unlock(&lock);
> > -
> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > -        case QEMU_AIO_READ:
> > -        case QEMU_AIO_WRITE:
> > -            ret = handle_aiocb_rw(aiocb);
> > -            break;
> > -        case QEMU_AIO_FLUSH:
> > -            ret = handle_aiocb_flush(aiocb);
> > -            break;
> > -        case QEMU_AIO_IOCTL:
> > -            ret = handle_aiocb_ioctl(aiocb);
> > -            break;
> > -        default:
> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > -            ret = -EINVAL;
> > -            break;
> > -        }
> > -
> > -        mutex_lock(&lock);
> > -        aiocb->ret = ret;
> > -        idle_threads++;
> > -        mutex_unlock(&lock);
> > -
> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > +    case QEMU_AIO_READ:
> > +    case QEMU_AIO_WRITE:
> > +        ret = handle_aiocb_rw(aiocb);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        ret = handle_aiocb_flush(aiocb);
> > +        break;
> > +    case QEMU_AIO_IOCTL:
> > +        ret = handle_aiocb_ioctl(aiocb);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > +        ret = -EINVAL;
> > +        break;
> >     }
> >
> > -    idle_threads--;
> > -    cur_threads--;
> > -    mutex_unlock(&lock);
> > -
> > -    return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > -    sigset_t set, oldset;
> > -
> > -    cur_threads++;
> > -    idle_threads++;
> > +    qemu_mutex_lock(&aiocb_mutex);
> > +    aiocb->ret = ret;
> 
> This is where qemu_cond_broadcast() is needed.
> 
> > +    qemu_mutex_unlock(&aiocb_mutex);
> >
> > -    /* block all signals */
> > -    if (sigfillset(&set)) die("sigfillset");
> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > +    if (kill(pid, aiocb->ev_signo)) {
> > +        die("kill failed");
> > +    }
> >  }
> >
> 
> I wonder if the condition variable has a measurable performance
> overhead.  We unconditionally broadcast on paiocb completion.  One
> idea would be to keep a counter of waiters (should only ever be 0 or
> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>  I just want to share this idea, I don't know if it's necessary to
> implement it or if it could even work without a race condition.
> 

I did not understand exactly why we are going to see a performance hit.
We will be doing a broadcast only after aio_thread has finished the
work, right? So how is this going to affect performance even if we do
a useless broadcast?

-arun
> >     }
> >
> >     paio_remove(acb);
> > @@ -618,11 +692,12 @@ int paio_init(void)
> >     struct sigaction act;
> >     PosixAioState *s;
> >     int fds[2];
> > -    int ret;
> >
> >     if (posix_aio_state)
> >         return 0;
> >
> > +    qemu_mutex_init(&aiocb_mutex);
> 
> Also needs qemu_cond_init(&aiocb_completion).
> 
> Stefan
>
Stefan Hajnoczi Nov. 10, 2010, 8:13 p.m. UTC | #3
On Wed, Nov 10, 2010 at 5:54 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@gmail.com> [2010-11-10 13:45:29]:
>> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>> I wonder if the condition variable has a measurable performance
>> overhead.  We unconditionally broadcast on paiocb completion.  One
>> idea would be to keep a counter of waiters (should only ever be 0 or
>> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>>  I just want to share this idea, I don't know if it's necessary to
>> implement it or if it could even work without a race condition.
>>
>
> I did not understand exactly why we are going to see a performance hit.
> We will be doing a broadcast only after aio_thread has finished the
> work, right? So how is this going to affect performance even if we do
> a useless broadcast?

If aio_thread() broadcasts before raising the signal, then POSIX aio
request completion is delayed by the time it takes to broadcast.  I
don't know whether it matters, though.

Stefan
Paolo Bonzini Nov. 11, 2010, 8:41 a.m. UTC | #4
On 11/10/2010 02:45 PM, Stefan Hajnoczi wrote:
> I wonder if the condition variable has a measurable performance
> overhead.  We unconditionally broadcast on paiocb completion.  One
> idea would be to keep a counter of waiters (should only ever be 0 or
> 1) protected by aiocb_mutex and broadcast only when there is a waiter.

This should be handled anyway by the pthreads library.  If we are sure
there is at most one waiter, you can change broadcast to signal, but
that would already be a micro-optimization.

Alternatively, you can use futexes to implement a completion signal
(similar to Win32 events), but that seems like too much effort.

Paolo
Stefan Hajnoczi Nov. 11, 2010, 9:26 a.m. UTC | #5
On Thu, Nov 11, 2010 at 8:41 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 11/10/2010 02:45 PM, Stefan Hajnoczi wrote:
>>
>> I wonder if the condition variable has a measurable performance
>> overhead.  We unconditionally broadcast on paiocb completion.  One
>> idea would be to keep a counter of waiters (should only ever be 0 or
>> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>
> This should be handled anyway by the pthreads library.  If we are sure
> there is at most one waiter, you can change broadcast to signal, but
> that would already be a micro-optimization.
>
> Alternatively, you can use futexes to implement a completion signal
> (similar to Win32 events), but that seems like too much effort.

Yes, let's use standard cond vars for now.

Stefan

Patch

diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@  qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -124,7 +125,6 @@  endif
 common-obj-y += $(addprefix ui/, $(ui-obj-y))
 
 common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@  if test "$vnc_png" != "no" ; then
 fi
 if test "$vnc_thread" != "no" ; then
   echo "CONFIG_VNC_THREAD=y" >> $config_host_mak
-  echo "CONFIG_THREAD=y" >> $config_host_mak
 fi
 if test "$fnmatch" = "yes" ; then
   echo "CONFIG_FNMATCH=y" >> $config_host_mak
@@ -2534,7 +2533,6 @@  if test "$xen" = "yes" ; then
 fi
 if test "$io_thread" = "yes" ; then
   echo "CONFIG_IOTHREAD=y" >> $config_host_mak
-  echo "CONFIG_THREAD=y" >> $config_host_mak
 fi
 if test "$linux_aio" = "yes" ; then
   echo "CONFIG_LINUX_AIO=y" >> $config_host_mak
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..dc5f8ec 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,7 +29,33 @@ 
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-thread.h"
 
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
 
 struct qemu_paiocb {
     BlockDriverAIOCB common;
@@ -44,13 +70,12 @@  struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
-    int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -58,64 +83,169 @@  typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
 
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
 
-static void die2(int err, const char *what)
-{
-    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
-    abort();
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
 }
 
-static void die(const char *what)
+static void spawn_threadlet(ThreadletQueue *queue)
 {
-    die2(errno, what);
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
 }
 
-static void mutex_unlock(pthread_mutex_t *mutex)
+static void threadlet_queue_init(ThreadletQueue *queue,
+                          int max_threads, int min_threads);
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
 {
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
 }
 
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
     return ret;
 }
 
-static void cond_signal(pthread_cond_t *cond)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+static void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present;
+#endif
+
+static void die2(int err, const char *what)
 {
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
 }
 
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
+static void die(const char *what)
 {
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
+    die2(errno, what);
 }
 
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
@@ -301,106 +431,58 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
+    qemu_mutex_lock(&aiocb_mutex);
+    aiocb->ret = ret;
+    qemu_mutex_unlock(&aiocb_mutex);
 
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+        die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
+    qemu_mutex_lock(&aiocb_mutex);
     aiocb->ret = -EINPROGRESS;
-    aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+    qemu_mutex_unlock(&aiocb_mutex);
+
+    aiocb->work.func = aio_thread;
+    submit_work(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
+    qemu_mutex_lock(&aiocb_mutex);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
+    qemu_mutex_unlock(&aiocb_mutex);
+    qemu_cond_broadcast(&aiocb_completion);
     return ret;
 }
 
@@ -534,22 +616,14 @@  static void paio_remove(struct qemu_paiocb *acb)
 static void paio_cancel(BlockDriverAIOCB *blockacb)
 {
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
-
-    mutex_lock(&lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
-    mutex_unlock(&lock);
 
-    if (active) {
-        /* fail safe: if the aio could not be canceled, we wait for
-           it */
-        while (qemu_paio_error(acb) == EINPROGRESS)
-            ;
+    if (dequeue_work(&acb->work) != 0) {
+        /* Wait for running work item to complete */
+        qemu_mutex_lock(&aiocb_mutex);
+        while (acb->ret == -EINPROGRESS) {
+            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+        }
+        qemu_mutex_unlock(&aiocb_mutex);
     }
 
     paio_remove(acb);
@@ -618,11 +692,12 @@  int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
 
+    qemu_mutex_init(&aiocb_mutex);
+
     s = qemu_malloc(sizeof(PosixAioState));
 
     sigfillset(&act.sa_mask);
@@ -645,16 +720,6 @@  int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }