From patchwork Tue Jan 25 00:24:13 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anthony Liguori X-Patchwork-Id: 80296 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 51D94B713D for ; Tue, 25 Jan 2011 11:28:06 +1100 (EST) Received: from localhost ([127.0.0.1]:55704 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PhWjN-0004kp-PK for incoming@patchwork.ozlabs.org; Mon, 24 Jan 2011 19:25:57 -0500 Received: from [140.186.70.92] (port=48033 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PhWhz-0003vS-JZ for qemu-devel@nongnu.org; Mon, 24 Jan 2011 19:24:37 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PhWhx-0001GS-G7 for qemu-devel@nongnu.org; Mon, 24 Jan 2011 19:24:31 -0500 Received: from mail-qw0-f45.google.com ([209.85.216.45]:49267) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PhWhx-0001GM-9c for qemu-devel@nongnu.org; Mon, 24 Jan 2011 19:24:29 -0500 Received: by qwk4 with SMTP id 4so4697819qwk.4 for ; Mon, 24 Jan 2011 16:24:28 -0800 (PST) Received: by 10.224.6.83 with SMTP id 19mr4705294qay.36.1295915068368; Mon, 24 Jan 2011 16:24:28 -0800 (PST) Received: from [192.168.0.101] (cpe-70-123-132-139.austin.res.rr.com [70.123.132.139]) by mx.google.com with ESMTPS id t7sm9802196qcs.40.2011.01.24.16.24.27 (version=TLSv1/SSLv3 cipher=RC4-MD5); Mon, 24 Jan 2011 16:24:27 -0800 (PST) Message-ID: <4D3E182D.7080308@codemonkey.ws> Date: Mon, 24 Jan 2011 18:24:13 -0600 From: Anthony Liguori User-Agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.15) Gecko/20101027 Lightning/1.0b1 Thunderbird/3.0.10 MIME-Version: 1.0 Subject: Re: [Qemu-devel] [RFC 0/7] Introduce hard dependency on glib References: <1295902845-29807-1-git-send-email-aliguori@us.ibm.com> In-Reply-To: <1295902845-29807-1-git-send-email-aliguori@us.ibm.com> X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6 (newer, 2) Cc: Stefan Hajnoczi , Marcelo Tosatti , qemu-devel@nongnu.org, Paul Brook , Paulo Bonzini , Arun Bharadwaj X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org On 01/24/2011 03:00 PM, Anthony Liguori wrote: > Both the recent I/O loop and threadlet series have me concerned that we're > digging ourselves deeper into the NIH hole. I think it's time we look at > something radical to let us borrow more code from existing projects instead of > reinventing everything through trial and error. > > This series introduces a hard dependency on glib. The initial use is portable > threads but I see this as just the beginning. Glib/Gobject offer many nice > things including: > > - portable threads > - rich data structure support > - INI parser > - JSON parser > - generic type system > - object oriented infrastructure > - IO library > - module system > - introspection to enable support for dynamic language bindings > > I see this series as the first step, followed by converting the I/O loop to > a GMainLoop instance. Once we're there, we can start making deeper use of > GObjects including converting QDev to a GObject hierarchy. > > I've spent the past few months working on C++ integration for QEMU. I'm more > convinced than ever that we desperately in need of structured object oriented > mechanisms to be successful but am pretty strongly convinced that incremental > additional of C++ is not going to be successful. > > On the other hand, while GObjects are uglier and require a lot of template code, > there's more than enough structure that I think it can guide us into a much > better object model implementation. > > There is some ugliness. GLib does not abstract signals because they're very > non-portable but QEMU makes extensive use of signaling. I don't think it's > a major issue but some of the ugliness in this series is due to that fact. > > This series is only lightly tested but also mostly mechanical. I'm pretty > confused by the way tcg_halt_cond and friends works so I'm fairly sure I broke > that (for non-threaded TCG). > Just to share where this is going, attached patch removes the posix-aio thread pool and replaces it with a GThreadPool. Need to do a lot of functional and performance testing before making a change like this so I'll keep this in a separate series, but thought it might be interesting. Regards, Anthony Liguori From 5fdc51b2aac307c0219e1489b80bc18e9a3db0d1 Mon Sep 17 00:00:00 2001 From: Anthony Liguori Date: Mon, 24 Jan 2011 18:19:08 -0600 Subject: [PATCH 8/7] posix-aio: convert to glib based thread pool This removes the custom pthread based thread pool in favor of a GThreadPool. I believe this patch implements all of the necessary functionality but it needs quite a lot more testing and performance analysis. One thing I'm sure will break--we used to deliver a signal on every I/O completion. This just slows down the I/O path. The reason we did this was because at the time, I believe Cris depended on that signal to break out of QEMU because it did I/O without a periodic timer installed. At this point in time, I think any architecture that requires signals needs to do so with a periodic timer or some other mechanism. Signed-off-by: Anthony Liguori diff --git a/configure b/configure index 820fde9..1af6b44 100755 --- a/configure +++ b/configure @@ -1663,6 +1663,7 @@ if $pkg_config --modversion gthread-2.0 > /dev/null 2>&1 ; then glib_cflags=`$pkg_config --cflags gthread-2.0 2>/dev/null` glib_libs=`$pkg_config --libs gthread-2.0 2>/dev/null` libs_softmmu="$glib_libs $libs_softmmu" + libs_tools="$glib_libs $libs_tools" else echo "glib-2.0 required to compile QEMU" exit 1 diff --git a/posix-aio-compat.c b/posix-aio-compat.c index fa5494d..4d65396 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -12,27 +12,24 @@ */ #include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "qemu-queue.h" + #include "osdep.h" #include "sysemu.h" #include "qemu-common.h" #include "trace.h" #include "block_int.h" +#include "qemu-thread.h" #include "block/raw-posix-aio.h" +typedef enum AioState { + INACTIVE, + CANCELLED, + ACTIVE, + COMPLETED +} AioState; -struct qemu_paiocb { +typedef struct AioAiocb { BlockDriverAIOCB common; int aio_fildes; union { @@ -40,34 +37,29 @@ struct qemu_paiocb { void *aio_ioctl_buf; }; int aio_niov; - size_t aio_nbytes; -#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */ - int ev_signo; + union { + size_t aio_nbytes; + long aio_ioctl_cmd; + }; off_t aio_offset; - - QTAILQ_ENTRY(qemu_paiocb) node; int aio_type; ssize_t ret; - int active; - struct qemu_paiocb *next; - int async_context_id; -}; -typedef struct PosixAioState { + /* This state can only be set/get when the aio pool lock is held */ + AioState state; +} AioAiocb; + +typedef struct AioPool { + GThreadPool *pool; int rfd, wfd; - struct qemu_paiocb *first_aio; -} PosixAioState; + GList *requests; + /* If this turns out to be contended, push to a per-request lock */ + GMutex *lock; +} AioPool; -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static pthread_t thread_id; -static pthread_attr_t attr; -static int max_threads = 64; -static int cur_threads = 0; -static int idle_threads = 0; -static QTAILQ_HEAD(, qemu_paiocb) request_list; +static AioPool aio_pool; #ifdef CONFIG_PREADV static int preadv_present = 1; @@ -75,51 +67,7 @@ static int preadv_present = 1; static int preadv_present = 0; #endif -static void die2(int err, const char *what) -{ - fprintf(stderr, "%s failed: %s\n", what, strerror(err)); - abort(); -} - -static void die(const char *what) -{ - die2(errno, what); -} - -static void mutex_lock(pthread_mutex_t *mutex) -{ - int ret = pthread_mutex_lock(mutex); - if (ret) die2(ret, "pthread_mutex_lock"); -} - -static void mutex_unlock(pthread_mutex_t *mutex) -{ - int ret = pthread_mutex_unlock(mutex); - if (ret) die2(ret, "pthread_mutex_unlock"); -} - -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, - struct timespec *ts) -{ - int ret = pthread_cond_timedwait(cond, mutex, ts); - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait"); - return ret; -} - -static void cond_signal(pthread_cond_t *cond) -{ - int ret = pthread_cond_signal(cond); - if (ret) die2(ret, "pthread_cond_signal"); -} - -static void thread_create(pthread_t *thread, pthread_attr_t *attr, - void *(*start_routine)(void*), void *arg) -{ - int ret = pthread_create(thread, attr, start_routine, arg); - if (ret) die2(ret, "pthread_create"); -} - -static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_ioctl(AioAiocb *aiocb) { int ret; @@ -138,7 +86,7 @@ static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) return aiocb->aio_nbytes; } -static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_flush(AioAiocb *aiocb) { int ret; @@ -178,7 +126,7 @@ qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) #endif -static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_rw_vector(AioAiocb *aiocb) { size_t offset = 0; ssize_t len; @@ -201,7 +149,7 @@ static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) return len; } -static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) +static ssize_t handle_aiocb_rw_linear(AioAiocb *aiocb, char *buf) { ssize_t offset = 0; ssize_t len; @@ -232,7 +180,7 @@ static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) return offset; } -static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) +static ssize_t handle_aiocb_rw(AioAiocb *aiocb) { ssize_t nbytes; char *buf; @@ -302,278 +250,94 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *aio_thread(void *unused) +static void aio_routine(gpointer data, gpointer user_data) { - pid_t pid; - - pid = getpid(); - - while (1) { - struct qemu_paiocb *aiocb; - ssize_t ret = 0; - qemu_timeval tv; - struct timespec ts; - - qemu_gettimeofday(&tv); - ts.tv_sec = tv.tv_sec + 10; - ts.tv_nsec = 0; - - mutex_lock(&lock); - - while (QTAILQ_EMPTY(&request_list) && - !(ret == ETIMEDOUT)) { - ret = cond_timedwait(&cond, &lock, &ts); - } - - if (QTAILQ_EMPTY(&request_list)) - break; - - aiocb = QTAILQ_FIRST(&request_list); - QTAILQ_REMOVE(&request_list, aiocb, node); - aiocb->active = 1; - idle_threads--; - mutex_unlock(&lock); - - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { - case QEMU_AIO_READ: - case QEMU_AIO_WRITE: - ret = handle_aiocb_rw(aiocb); - break; - case QEMU_AIO_FLUSH: - ret = handle_aiocb_flush(aiocb); - break; - case QEMU_AIO_IOCTL: - ret = handle_aiocb_ioctl(aiocb); - break; - default: - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); - ret = -EINVAL; - break; - } - - mutex_lock(&lock); - aiocb->ret = ret; - idle_threads++; - mutex_unlock(&lock); + AioPool *s = user_data; + AioAiocb *aiocb = data; + ssize_t ret = 0; + char ch = 0; + AioState state; + ssize_t len; - if (kill(pid, aiocb->ev_signo)) die("kill failed"); + g_mutex_lock(s->lock); + if (aiocb->state != CANCELLED) { + aiocb->state = ACTIVE; } - - idle_threads--; - cur_threads--; - mutex_unlock(&lock); - - return NULL; -} - -static void spawn_thread(void) -{ - sigset_t set, oldset; - - cur_threads++; - idle_threads++; - - /* block all signals */ - if (sigfillset(&set)) die("sigfillset"); - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); - - thread_create(&thread_id, &attr, aio_thread, NULL); - - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); -} - -static void qemu_paio_submit(struct qemu_paiocb *aiocb) -{ - aiocb->ret = -EINPROGRESS; - aiocb->active = 0; - mutex_lock(&lock); - if (idle_threads == 0 && cur_threads < max_threads) - spawn_thread(); - QTAILQ_INSERT_TAIL(&request_list, aiocb, node); - mutex_unlock(&lock); - cond_signal(&cond); -} - -static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) -{ - ssize_t ret; - - mutex_lock(&lock); - ret = aiocb->ret; - mutex_unlock(&lock); - - return ret; -} - -static int qemu_paio_error(struct qemu_paiocb *aiocb) -{ - ssize_t ret = qemu_paio_return(aiocb); - - if (ret < 0) - ret = -ret; - else - ret = 0; - - return ret; -} - -static int posix_aio_process_queue(void *opaque) -{ - PosixAioState *s = opaque; - struct qemu_paiocb *acb, **pacb; - int ret; - int result = 0; - int async_context_id = get_async_context_id(); - - for(;;) { - pacb = &s->first_aio; - for(;;) { - acb = *pacb; - if (!acb) - return result; - - /* we're only interested in requests in the right context */ - if (acb->async_context_id != async_context_id) { - pacb = &acb->next; - continue; - } - - ret = qemu_paio_error(acb); - if (ret == ECANCELED) { - /* remove the request */ - *pacb = acb->next; - qemu_aio_release(acb); - result = 1; - } else if (ret != EINPROGRESS) { - /* end of aio */ - if (ret == 0) { - ret = qemu_paio_return(acb); - if (ret == acb->aio_nbytes) - ret = 0; - else - ret = -EINVAL; - } else { - ret = -ret; - } - /* remove the request */ - *pacb = acb->next; - /* call the callback */ - acb->common.cb(acb->common.opaque, ret); - qemu_aio_release(acb); - result = 1; - break; - } else { - pacb = &acb->next; - } - } + state = aiocb->state; + g_mutex_unlock(s->lock); + + if (state == CANCELLED) { + return; } - return result; -} - -static void posix_aio_read(void *opaque) -{ - PosixAioState *s = opaque; - ssize_t len; - - /* read all bytes from signal pipe */ - for (;;) { - char bytes[16]; - - len = read(s->rfd, bytes, sizeof(bytes)); - if (len == -1 && errno == EINTR) - continue; /* try again */ - if (len == sizeof(bytes)) - continue; /* more to read */ + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { + case QEMU_AIO_READ: + case QEMU_AIO_WRITE: + ret = handle_aiocb_rw(aiocb); + break; + case QEMU_AIO_FLUSH: + ret = handle_aiocb_flush(aiocb); + break; + case QEMU_AIO_IOCTL: + ret = handle_aiocb_ioctl(aiocb); + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; break; } - posix_aio_process_queue(s); -} - -static int posix_aio_flush(void *opaque) -{ - PosixAioState *s = opaque; - return !!s->first_aio; -} - -static PosixAioState *posix_aio_state; + aiocb->ret = ret; + g_mutex_lock(s->lock); + aiocb->state = COMPLETED; + g_mutex_unlock(s->lock); -static void aio_signal_handler(int signum) -{ - if (posix_aio_state) { - char byte = 0; - ssize_t ret; - - ret = write(posix_aio_state->wfd, &byte, sizeof(byte)); - if (ret < 0 && errno != EAGAIN) - die("write()"); - } + do { + len = write(s->wfd, &ch, sizeof(ch)); + } while (len == -1 && errno == EINTR); - qemu_service_io(); + return; } -static void paio_remove(struct qemu_paiocb *acb) + +static void qemu_paio_submit(AioAiocb *aiocb) { - struct qemu_paiocb **pacb; - - /* remove the callback from the queue */ - pacb = &posix_aio_state->first_aio; - for(;;) { - if (*pacb == NULL) { - fprintf(stderr, "paio_remove: aio request not found!\n"); - break; - } else if (*pacb == acb) { - *pacb = acb->next; - qemu_aio_release(acb); - break; - } - pacb = &(*pacb)->next; - } + AioPool *s = &aio_pool; + aiocb->state = INACTIVE; + aiocb->async_context_id = get_async_context_id(); + s->requests = g_list_append(s->requests, aiocb); + g_thread_pool_push(s->pool, aiocb, NULL); } -static void paio_cancel(BlockDriverAIOCB *blockacb) +static void qemu_paio_cancel(BlockDriverAIOCB *acb) { - struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; - int active = 0; - - mutex_lock(&lock); - if (!acb->active) { - QTAILQ_REMOVE(&request_list, acb, node); - acb->ret = -ECANCELED; - } else if (acb->ret == -EINPROGRESS) { - active = 1; - } - mutex_unlock(&lock); + AioAiocb *aiocb = container_of(acb, AioAiocb, common); + AioPool *s = &aio_pool; - if (active) { - /* fail safe: if the aio could not be canceled, we wait for - it */ - while (qemu_paio_error(acb) == EINPROGRESS) - ; + g_mutex_lock(s->lock); + if (aiocb->state == INACTIVE) { + aiocb->state = CANCELLED; } - - paio_remove(acb); + g_mutex_unlock(s->lock); + } static AIOPool raw_aio_pool = { - .aiocb_size = sizeof(struct qemu_paiocb), - .cancel = paio_cancel, + .aiocb_size = sizeof(AioAiocb), + .cancel = qemu_paio_cancel, }; BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type) { - struct qemu_paiocb *acb; + AioAiocb *acb; acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque); if (!acb) return NULL; acb->aio_type = type; acb->aio_fildes = fd; - acb->ev_signo = SIGUSR2; - acb->async_context_id = get_async_context_id(); if (qiov) { acb->aio_iov = qiov->iov; @@ -582,9 +346,6 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, acb->aio_nbytes = nb_sectors * 512; acb->aio_offset = sector_num * 512; - acb->next = posix_aio_state->first_aio; - posix_aio_state->first_aio = acb; - trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); qemu_paio_submit(acb); return &acb->common; @@ -594,68 +355,114 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, unsigned long int req, void *buf, BlockDriverCompletionFunc *cb, void *opaque) { - struct qemu_paiocb *acb; + AioAiocb *acb; acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque); if (!acb) return NULL; acb->aio_type = QEMU_AIO_IOCTL; acb->aio_fildes = fd; - acb->ev_signo = SIGUSR2; - acb->async_context_id = get_async_context_id(); acb->aio_offset = 0; acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; - acb->next = posix_aio_state->first_aio; - posix_aio_state->first_aio = acb; - qemu_paio_submit(acb); return &acb->common; } -int paio_init(void) +static int paio_process_queue(void *opaque) { - struct sigaction act; - PosixAioState *s; - int fds[2]; - int ret; + AioPool *s = opaque; + GList *i, *next_i; + GList *completed_requests = NULL; + int async_context_id = get_async_context_id(); + int did_work = 0; - if (posix_aio_state) - return 0; + /* Search the list to build a list of completed requests, we do + * this as it's own pass so that we minimize the time we're holding + * the shared lock. + */ + g_mutex_lock(s->lock); + for (i = s->requests; i != NULL; i = next_i) { + AioAiocb *aiocb = i->data; + next_i = g_list_next(i); + + /* don't complete a request that isn't part of this async context */ + if (aiocb->async_context_id != async_context_id) { + continue; + } - s = qemu_malloc(sizeof(PosixAioState)); + if (aiocb->state == CANCELLED || aiocb->state == COMPLETED) { + s->requests = g_list_remove_link(s->requests, i); + completed_requests = g_list_concat(completed_requests, i); + } + } + g_mutex_unlock(s->lock); + + /* Dispatch any completed requests */ + for (i = completed_requests; i != NULL; i = g_list_next(i)) { + AioAiocb *aiocb = i->data; + if (aiocb->state == COMPLETED) { + if (aiocb->ret == aiocb->aio_nbytes) { + aiocb->ret = 0; + } + aiocb->common.cb(aiocb->common.opaque, aiocb->ret); + did_work = 1; + } + qemu_aio_release(aiocb); + } - sigfillset(&act.sa_mask); - act.sa_flags = 0; /* do not restart syscalls to interrupt select() */ - act.sa_handler = aio_signal_handler; - sigaction(SIGUSR2, &act, NULL); + g_list_free(completed_requests); + + return did_work; +} - s->first_aio = NULL; - if (qemu_pipe(fds) == -1) { - fprintf(stderr, "failed to create pipe\n"); - return -1; +static int paio_io_flush(void *opaque) +{ + AioPool *s = opaque; + if (s->requests == NULL) { + return 0; } + return 1; +} - s->rfd = fds[0]; - s->wfd = fds[1]; +static void paio_complete(void *opaque) +{ + AioPool *s = opaque; + char buffer[1024]; + ssize_t len; - fcntl(s->rfd, F_SETFL, O_NONBLOCK); - fcntl(s->wfd, F_SETFL, O_NONBLOCK); + /* Drain event queue */ + do { + len = read(s->rfd, buffer, sizeof(buffer)); + } while (len == -1 && errno == EINTR); + + if (len == -1 && errno == EAGAIN) { + return; + } + + paio_process_queue(s); +} - qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, - posix_aio_process_queue, s); +int paio_init(void) +{ + AioPool *s = &aio_pool; + int fds[2]; - ret = pthread_attr_init(&attr); - if (ret) - die2(ret, "pthread_attr_init"); + if (pipe(fds) == -1) { + return -errno; + } - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (ret) - die2(ret, "pthread_attr_setdetachstate"); + s->pool = g_thread_pool_new(aio_routine, s, 64, FALSE, NULL); + s->rfd = fds[0]; + s->wfd = fds[1]; + s->requests = NULL; + s->lock = g_mutex_new(); - QTAILQ_INIT(&request_list); + fcntl(s->wfd, F_SETFL, O_NONBLOCK); + fcntl(s->rfd, F_SETFL, O_NONBLOCK); + qemu_aio_set_fd_handler(s->rfd, paio_complete, NULL, + paio_io_flush, paio_process_queue, s); - posix_aio_state = s; return 0; } diff --git a/qemu-tool.c b/qemu-tool.c index 392e1c9..ca9c711 100644 --- a/qemu-tool.c +++ b/qemu-tool.c @@ -111,3 +111,11 @@ int qemu_set_fd_handler2(int fd, { return 0; } + +int qemu_set_fd_handler(int fd, + IOHandler *fd_read, + IOHandler *fd_write, + void *opaque) +{ + return 0; +} -- 1.7.0.4