
[RFC,0/7] Introduce hard dependency on glib

Message ID 4D3E182D.7080308@codemonkey.ws
State New

Commit Message

Anthony Liguori Jan. 25, 2011, 12:24 a.m. UTC
On 01/24/2011 03:00 PM, Anthony Liguori wrote:
> Both the recent I/O loop and threadlet series have me concerned that we're
> digging ourselves deeper into the NIH hole.  I think it's time we look at
> something radical to let us borrow more code from existing projects instead of
> reinventing everything through trial and error.
>
> This series introduces a hard dependency on glib.  The initial use is portable
> threads, but I see this as just the beginning.  GLib/GObject offer many nice
> things including:
>
>   - portable threads
>   - rich data structure support
>   - INI parser
>   - JSON parser
>   - generic type system
>   - object oriented infrastructure
>   - IO library
>   - module system
>   - introspection to enable support for dynamic language bindings
>
> I see this series as the first step, followed by converting the I/O loop to
> a GMainLoop instance.  Once we're there, we can start making deeper use of
> GObjects including converting QDev to a GObject hierarchy.
>
> I've spent the past few months working on C++ integration for QEMU.  I'm more
> convinced than ever that we are desperately in need of structured object oriented
> mechanisms to succeed, but am pretty strongly convinced that incremental
> addition of C++ is not going to be successful.
>
> On the other hand, while GObjects are uglier and require a lot of boilerplate code,
> there's more than enough structure that I think it can guide us into a much
> better object model implementation.
>
> There is some ugliness.  GLib does not abstract Unix signals because they're very
> non-portable, but QEMU makes extensive use of signaling.  I don't think it's
> a major issue, but some of the ugliness in this series is due to that fact.
>
> This series is only lightly tested but also mostly mechanical.  I'm pretty
> confused by the way tcg_halt_cond and friends work, so I'm fairly sure I broke
> that (for non-threaded TCG).
>    

Just to share where this is going, the attached patch removes the posix-aio 
thread pool and replaces it with a GThreadPool.
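
For anyone who hasn't used the API before, the conversion centers on GLib's
thread pool interface.  A minimal, self-contained sketch (the 64-thread cap
mirrors the old max_threads; the worker body and request numbers are purely
illustrative):

    #include <glib.h>

    /* Worker body: runs on a pool thread.  'data' is whatever was passed
     * to g_thread_pool_push(); 'user_data' is shared, pool-wide context. */
    static void worker(gpointer data, gpointer user_data)
    {
        g_print("handling request %d\n", GPOINTER_TO_INT(data));
    }

    int main(void)
    {
        GThreadPool *pool;
        int i;

        g_thread_init(NULL);  /* required before GLib 2.32 */

        /* up to 64 worker threads, non-exclusive */
        pool = g_thread_pool_new(worker, NULL, 64, FALSE, NULL);

        for (i = 1; i <= 4; i++) {
            g_thread_pool_push(pool, GINT_TO_POINTER(i), NULL);
        }

        /* immediate=FALSE, wait=TRUE: let queued work finish first */
        g_thread_pool_free(pool, FALSE, TRUE);
        return 0;
    }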

Need to do a lot of functional and performance testing before making a 
change like this, so I'll keep this in a separate series, but thought it 
might be interesting.

Regards,

Anthony Liguori

Comments

Edgar E. Iglesias Jan. 25, 2011, 6:51 a.m. UTC | #1
On Mon, Jan 24, 2011 at 06:24:13PM -0600, Anthony Liguori wrote:
> On 01/24/2011 03:00 PM, Anthony Liguori wrote:
> > Both the recent I/O loop and threadlet series have me concerned that we're
> > digging ourselves deeper into the NIH hole.  I think it's time we look at
> > something radical to let us borrow more code from existing projects instead of
> > reinventing everything through trial and error.
> >
> > This series introduces a hard dependency on glib.  The initial use is portable
> > threads, but I see this as just the beginning.  GLib/GObject offer many nice
> > things including:
> >
> >   - portable threads
> >   - rich data structure support
> >   - INI parser
> >   - JSON parser
> >   - generic type system
> >   - object oriented infrastructure
> >   - IO library
> >   - module system
> >   - introspection to enable support for dynamic language bindings
> >
> > I see this series as the first step, followed by converting the I/O loop to
> > a GMainLoop instance.  Once we're there, we can start making deeper use of
> > GObjects including converting QDev to a GObject hierarchy.
> >
> > I've spent the past few months working on C++ integration for QEMU.  I'm more
> > convinced than ever that we are desperately in need of structured object oriented
> > mechanisms to succeed, but am pretty strongly convinced that incremental
> > addition of C++ is not going to be successful.
> >
> > On the other hand, while GObjects are uglier and require a lot of boilerplate code,
> > there's more than enough structure that I think it can guide us into a much
> > better object model implementation.
> >
> > There is some ugliness.  GLib does not abstract Unix signals because they're very
> > non-portable, but QEMU makes extensive use of signaling.  I don't think it's
> > a major issue, but some of the ugliness in this series is due to that fact.
> >
> > This series is only lightly tested but also mostly mechanical.  I'm pretty
> > confused by the way tcg_halt_cond and friends work, so I'm fairly sure I broke
> > that (for non-threaded TCG).
> >    
> 
> Just to share where this is going, the attached patch removes the posix-aio 
> thread pool and replaces it with a GThreadPool.
> 
> Need to do a lot of functional and performance testing before making a 
> change like this, so I'll keep this in a separate series, but thought it 
> might be interesting.
> 
> Regards,
> 
> Anthony Liguori
> 

> From 5fdc51b2aac307c0219e1489b80bc18e9a3db0d1 Mon Sep 17 00:00:00 2001
> From: Anthony Liguori <aliguori@us.ibm.com>
> Date: Mon, 24 Jan 2011 18:19:08 -0600
> Subject: [PATCH 8/7] posix-aio: convert to glib based thread pool
> 
> This removes the custom pthread-based thread pool in favor of a GThreadPool.
> I believe this patch implements all of the necessary functionality but it needs
> quite a lot more testing and performance analysis.
> 
> One thing I'm sure will break: we used to deliver a signal on every I/O
> completion, which just slows down the I/O path.  The reason we did this was
> that, at the time, I believe CRIS depended on that signal to break out of
> QEMU because it did I/O without a periodic timer installed.

Hi Anthony,

I have no memory of any such issues. Anyway, if you've got a tree I can
clone, I'll be happy to give it a go and let you know if CRIS works OK.
There's also a bootable CRIS Linux guest image on the wiki's download
page if you want to try it yourself.

Cheers

Patch

From 5fdc51b2aac307c0219e1489b80bc18e9a3db0d1 Mon Sep 17 00:00:00 2001
From: Anthony Liguori <aliguori@us.ibm.com>
Date: Mon, 24 Jan 2011 18:19:08 -0600
Subject: [PATCH 8/7] posix-aio: convert to glib based thread pool

This removes the custom pthread-based thread pool in favor of a GThreadPool.
I believe this patch implements all of the necessary functionality but it needs
quite a lot more testing and performance analysis.

One thing I'm sure will break: we used to deliver a signal on every I/O
completion, which just slows down the I/O path.  The reason we did this was
that, at the time, I believe CRIS depended on that signal to break out of
QEMU because it did I/O without a periodic timer installed.

At this point in time, I think any architecture that requires signals needs to
do so with a periodic timer or some other mechanism.
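
For illustration, completion is now signalled with the usual self-pipe trick
instead; schematically (the names here are illustrative, the real code is in
aio_routine() and paio_complete() below):

    #include <unistd.h>
    #include <errno.h>

    /* worker thread, after finishing a request: poke the main loop */
    static void notify_completion(int wfd)
    {
        char ch = 0;
        ssize_t len;
        do {
            len = write(wfd, &ch, sizeof(ch));  /* wfd is O_NONBLOCK */
        } while (len == -1 && errno == EINTR);
    }

    /* main loop, read handler for the pipe's read end: drain every
     * pending poke, then dispatch the completed requests */
    static void on_completion(int rfd)
    {
        char buf[1024];
        ssize_t len;
        do {
            len = read(rfd, buf, sizeof(buf));
        } while (len > 0 || (len == -1 && errno == EINTR));
        /* walk the request list and run completion callbacks here */
    }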

Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>

diff --git a/configure b/configure
index 820fde9..1af6b44 100755
--- a/configure
+++ b/configure
@@ -1663,6 +1663,7 @@  if $pkg_config --modversion gthread-2.0 > /dev/null 2>&1 ; then
     glib_cflags=`$pkg_config --cflags gthread-2.0 2>/dev/null`
     glib_libs=`$pkg_config --libs gthread-2.0 2>/dev/null`
     libs_softmmu="$glib_libs $libs_softmmu"
+    libs_tools="$glib_libs $libs_tools"
 else
     echo "glib-2.0 required to compile QEMU"
     exit 1
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index fa5494d..4d65396 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -12,27 +12,24 @@ 
  */
 
 #include <sys/ioctl.h>
-#include <sys/types.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <signal.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "qemu-queue.h"
+
 #include "osdep.h"
 #include "sysemu.h"
 #include "qemu-common.h"
 #include "trace.h"
 #include "block_int.h"
+#include "qemu-thread.h"
 
 #include "block/raw-posix-aio.h"
 
+typedef enum AioState {
+    INACTIVE,
+    CANCELLED,
+    ACTIVE,
+    COMPLETED
+} AioState;
 
-struct qemu_paiocb {
+typedef struct AioAiocb {
     BlockDriverAIOCB common;
     int aio_fildes;
     union {
@@ -40,34 +37,29 @@  struct qemu_paiocb {
         void *aio_ioctl_buf;
     };
     int aio_niov;
-    size_t aio_nbytes;
-#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
-    int ev_signo;
+    union {
+        size_t aio_nbytes;
+        long aio_ioctl_cmd;
+    };
     off_t aio_offset;
-
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
-    int active;
-    struct qemu_paiocb *next;
-
     int async_context_id;
-};
 
-typedef struct PosixAioState {
+    /* This state can only be set/get when the aio pool lock is held */
+    AioState state;
+} AioAiocb;
+
+typedef struct AioPool {
+    GThreadPool *pool;
     int rfd, wfd;
-    struct qemu_paiocb *first_aio;
-} PosixAioState;
+    GList *requests;
 
+    /* If this turns out to be contended, push to a per-request lock */
+    GMutex *lock;
+} AioPool;
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static AioPool aio_pool;
 
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
@@ -75,51 +67,7 @@  static int preadv_present = 1;
 static int preadv_present = 0;
 #endif
 
-static void die2(int err, const char *what)
-{
-    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
-    abort();
-}
-
-static void die(const char *what)
-{
-    die2(errno, what);
-}
-
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
-static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_ioctl(AioAiocb *aiocb)
 {
     int ret;
 
@@ -138,7 +86,7 @@  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
     return aiocb->aio_nbytes;
 }
 
-static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_flush(AioAiocb *aiocb)
 {
     int ret;
 
@@ -178,7 +126,7 @@  qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
 
 #endif
 
-static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw_vector(AioAiocb *aiocb)
 {
     size_t offset = 0;
     ssize_t len;
@@ -201,7 +149,7 @@  static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
     return len;
 }
 
-static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+static ssize_t handle_aiocb_rw_linear(AioAiocb *aiocb, char *buf)
 {
     ssize_t offset = 0;
     ssize_t len;
@@ -232,7 +180,7 @@  static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
     return offset;
 }
 
-static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw(AioAiocb *aiocb)
 {
     ssize_t nbytes;
     char *buf;
@@ -302,278 +250,94 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_routine(gpointer data, gpointer user_data)
 {
-    pid_t pid;
-
-    pid = getpid();
-
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
+    AioPool *s = user_data;
+    AioAiocb *aiocb = data;
+    ssize_t ret = 0;
+    char ch = 0;
+    AioState state;
+    ssize_t len;
 
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    g_mutex_lock(s->lock);
+    if (aiocb->state != CANCELLED) {
+        aiocb->state = ACTIVE;
     }
-
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
-}
-
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
-{
-    aiocb->ret = -EINPROGRESS;
-    aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
-}
-
-static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
-{
-    ssize_t ret;
-
-    mutex_lock(&lock);
-    ret = aiocb->ret;
-    mutex_unlock(&lock);
-
-    return ret;
-}
-
-static int qemu_paio_error(struct qemu_paiocb *aiocb)
-{
-    ssize_t ret = qemu_paio_return(aiocb);
-
-    if (ret < 0)
-        ret = -ret;
-    else
-        ret = 0;
-
-    return ret;
-}
-
-static int posix_aio_process_queue(void *opaque)
-{
-    PosixAioState *s = opaque;
-    struct qemu_paiocb *acb, **pacb;
-    int ret;
-    int result = 0;
-    int async_context_id = get_async_context_id();
-
-    for(;;) {
-        pacb = &s->first_aio;
-        for(;;) {
-            acb = *pacb;
-            if (!acb)
-                return result;
-
-            /* we're only interested in requests in the right context */
-            if (acb->async_context_id != async_context_id) {
-                pacb = &acb->next;
-                continue;
-            }
-
-            ret = qemu_paio_error(acb);
-            if (ret == ECANCELED) {
-                /* remove the request */
-                *pacb = acb->next;
-                qemu_aio_release(acb);
-                result = 1;
-            } else if (ret != EINPROGRESS) {
-                /* end of aio */
-                if (ret == 0) {
-                    ret = qemu_paio_return(acb);
-                    if (ret == acb->aio_nbytes)
-                        ret = 0;
-                    else
-                        ret = -EINVAL;
-                } else {
-                    ret = -ret;
-                }
-                /* remove the request */
-                *pacb = acb->next;
-                /* call the callback */
-                acb->common.cb(acb->common.opaque, ret);
-                qemu_aio_release(acb);
-                result = 1;
-                break;
-            } else {
-                pacb = &acb->next;
-            }
-        }
+    state = aiocb->state;
+    g_mutex_unlock(s->lock);
+        
+    if (state == CANCELLED) {
+        return;
     }
 
-    return result;
-}
-
-static void posix_aio_read(void *opaque)
-{
-    PosixAioState *s = opaque;
-    ssize_t len;
-
-    /* read all bytes from signal pipe */
-    for (;;) {
-        char bytes[16];
-
-        len = read(s->rfd, bytes, sizeof(bytes));
-        if (len == -1 && errno == EINTR)
-            continue; /* try again */
-        if (len == sizeof(bytes))
-            continue; /* more to read */
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
         break;
     }
 
-    posix_aio_process_queue(s);
-}
-
-static int posix_aio_flush(void *opaque)
-{
-    PosixAioState *s = opaque;
-    return !!s->first_aio;
-}
-
-static PosixAioState *posix_aio_state;
+    aiocb->ret = ret;
+    g_mutex_lock(s->lock);
+    aiocb->state = COMPLETED;
+    g_mutex_unlock(s->lock);
 
-static void aio_signal_handler(int signum)
-{
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
+    do {
+        len = write(s->wfd, &ch, sizeof(ch));
+    } while (len == -1 && errno == EINTR);
 
-    qemu_service_io();
+    return;
 }
 
-static void paio_remove(struct qemu_paiocb *acb)
+
+static void qemu_paio_submit(AioAiocb *aiocb)
 {
-    struct qemu_paiocb **pacb;
-
-    /* remove the callback from the queue */
-    pacb = &posix_aio_state->first_aio;
-    for(;;) {
-        if (*pacb == NULL) {
-            fprintf(stderr, "paio_remove: aio request not found!\n");
-            break;
-        } else if (*pacb == acb) {
-            *pacb = acb->next;
-            qemu_aio_release(acb);
-            break;
-        }
-        pacb = &(*pacb)->next;
-    }
+    AioPool *s = &aio_pool;
+    aiocb->state = INACTIVE;
+    aiocb->async_context_id = get_async_context_id();
+    s->requests = g_list_append(s->requests, aiocb);
+    g_thread_pool_push(s->pool, aiocb, NULL);
 }
 
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+static void qemu_paio_cancel(BlockDriverAIOCB *acb)
 {
-    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
-
-    mutex_lock(&lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
-    mutex_unlock(&lock);
+    AioAiocb *aiocb = container_of(acb, AioAiocb, common);
+    AioPool *s = &aio_pool;
 
-    if (active) {
-        /* fail safe: if the aio could not be canceled, we wait for
-           it */
-        while (qemu_paio_error(acb) == EINPROGRESS)
-            ;
+    g_mutex_lock(s->lock);
+    if (aiocb->state == INACTIVE) {
+        aiocb->state = CANCELLED;
     }
-
-    paio_remove(acb);
+    g_mutex_unlock(s->lock);
+    
 }
 
 static AIOPool raw_aio_pool = {
-    .aiocb_size         = sizeof(struct qemu_paiocb),
-    .cancel             = paio_cancel,
+    .aiocb_size         = sizeof(AioAiocb),
+    .cancel             = qemu_paio_cancel,
 };
 
 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
-    struct qemu_paiocb *acb;
+    AioAiocb *acb;
 
     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
     if (!acb)
         return NULL;
     acb->aio_type = type;
     acb->aio_fildes = fd;
-    acb->ev_signo = SIGUSR2;
-    acb->async_context_id = get_async_context_id();
 
     if (qiov) {
         acb->aio_iov = qiov->iov;
@@ -582,9 +346,6 @@  BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     acb->aio_nbytes = nb_sectors * 512;
     acb->aio_offset = sector_num * 512;
 
-    acb->next = posix_aio_state->first_aio;
-    posix_aio_state->first_aio = acb;
-
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
     qemu_paio_submit(acb);
     return &acb->common;
@@ -594,68 +355,114 @@  BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
         unsigned long int req, void *buf,
         BlockDriverCompletionFunc *cb, void *opaque)
 {
-    struct qemu_paiocb *acb;
+    AioAiocb *acb;
 
     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
     if (!acb)
         return NULL;
     acb->aio_type = QEMU_AIO_IOCTL;
     acb->aio_fildes = fd;
-    acb->ev_signo = SIGUSR2;
-    acb->async_context_id = get_async_context_id();
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
 
-    acb->next = posix_aio_state->first_aio;
-    posix_aio_state->first_aio = acb;
-
     qemu_paio_submit(acb);
     return &acb->common;
 }
 
-int paio_init(void)
+static int paio_process_queue(void *opaque)
 {
-    struct sigaction act;
-    PosixAioState *s;
-    int fds[2];
-    int ret;
+    AioPool *s = opaque;
+    GList *i, *next_i;
+    GList *completed_requests = NULL;
+    int async_context_id = get_async_context_id();
+    int did_work = 0;
 
-    if (posix_aio_state)
-        return 0;
+    /* Search the list to build a list of completed requests; we do
+     * this as its own pass so that we minimize the time we're holding
+     * the shared lock.
+     */
+    g_mutex_lock(s->lock);
+    for (i = s->requests; i != NULL; i = next_i) {
+        AioAiocb *aiocb = i->data;
+        next_i = g_list_next(i);
+
+        /* don't complete a request that isn't part of this async context */
+        if (aiocb->async_context_id != async_context_id) {
+            continue;
+        }
 
-    s = qemu_malloc(sizeof(PosixAioState));
+        if (aiocb->state == CANCELLED || aiocb->state == COMPLETED) {
+            s->requests = g_list_remove_link(s->requests, i);
+            completed_requests = g_list_concat(completed_requests, i);
+        }
+    }
+    g_mutex_unlock(s->lock);
+
+    /* Dispatch any completed requests */
+    for (i = completed_requests; i != NULL; i = g_list_next(i)) {
+        AioAiocb *aiocb = i->data;
+        if (aiocb->state == COMPLETED) {
+            if (aiocb->ret == aiocb->aio_nbytes) {
+                aiocb->ret = 0;
+            }
+            aiocb->common.cb(aiocb->common.opaque, aiocb->ret);
+            did_work = 1;
+        }
+        qemu_aio_release(aiocb);
+    }
 
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
+    g_list_free(completed_requests);
+
+    return did_work;
+}
 
-    s->first_aio = NULL;
-    if (qemu_pipe(fds) == -1) {
-        fprintf(stderr, "failed to create pipe\n");
-        return -1;
+static int paio_io_flush(void *opaque)
+{
+    AioPool *s = opaque;
+    if (s->requests == NULL) {
+        return 0;
     }
+    return 1;
+}
 
-    s->rfd = fds[0];
-    s->wfd = fds[1];
+static void paio_complete(void *opaque)
+{
+    AioPool *s = opaque;
+    char buffer[1024];
+    ssize_t len;
 
-    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
-    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+    /* Drain event queue */
+    do {
+        len = read(s->rfd, buffer, sizeof(buffer));
+    } while (len == -1 && errno == EINTR);
+
+    if (len == -1 && errno == EAGAIN) {
+        return;
+    }
+
+    paio_process_queue(s);
+}
 
-    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
-        posix_aio_process_queue, s);
+int paio_init(void)
+{
+    AioPool *s = &aio_pool;
+    int fds[2];
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
+    if (pipe(fds) == -1) {
+        return -errno;
+    }
 
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
+    s->pool = g_thread_pool_new(aio_routine, s, 64, FALSE, NULL);
+    s->rfd = fds[0];
+    s->wfd = fds[1];
+    s->requests = NULL;
+    s->lock = g_mutex_new();
 
-    QTAILQ_INIT(&request_list);
+    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->rfd, paio_complete, NULL,
+                            paio_io_flush, paio_process_queue, s);
 
-    posix_aio_state = s;
     return 0;
 }
diff --git a/qemu-tool.c b/qemu-tool.c
index 392e1c9..ca9c711 100644
--- a/qemu-tool.c
+++ b/qemu-tool.c
@@ -111,3 +111,11 @@  int qemu_set_fd_handler2(int fd,
 {
     return 0;
 }
+
+int qemu_set_fd_handler(int fd,
+                        IOHandler *fd_read,
+                        IOHandler *fd_write,
+                        void *opaque)
+{
+    return 0;
+}
-- 
1.7.0.4