Patchwork [3/3] Add helper functions to enable virtio-9p make use of the threadlets

login
register
mail settings
Submitter Arun Bharadwaj
Date Oct. 21, 2010, 12:10 p.m.
Message ID <20101021121055.25166.80527.stgit@localhost6.localdomain6>
Download mbox | patch
Permalink /patch/68600/
State New
Headers show

Comments

Arun Bharadwaj - Oct. 21, 2010, 12:10 p.m.
From: Gautham R Shenoy <ego@in.ibm.com>

infrastructure for offloading blocking tasks such as making posix calls on
to the helper threads and handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 hw/virtio-9p.c     |  168 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c |   48 ++++++---------
 qemu-threadlets.c  |   21 +++++++
 qemu-threadlets.h  |    1 
 vl.c               |    3 +
 5 files changed, 211 insertions(+), 30 deletions(-)
Arun Bharadwaj - Oct. 21, 2010, 1:26 p.m.
* Arun R Bharadwaj <arun@linux.vnet.ibm.com> [2010-10-21 17:40:55]:

> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> infrastructure for offloading blocking tasks such as making posix calls on
> to the helper threads and handle the post_posix_operations() from the
> context of the iothread. This frees the vcpu thread to process any other guest
> operations while the processing of v9fs_io is in progress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
  (Please note this correction)
  Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com> 

> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
>  hw/virtio-9p.c     |  168 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  posix-aio-compat.c |   48 ++++++---------
>  qemu-threadlets.c  |   21 +++++++
>  qemu-threadlets.h  |    1 
>  vl.c               |    3 +
>  5 files changed, 211 insertions(+), 30 deletions(-)
> 
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index a871685..f9a7b7d 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-debug.h"
>  #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
> 
>  int debug_9p_pdu;
> 
> @@ -33,6 +34,149 @@ enum {
>      Oappend = 0x80,
>  };
> 
> +struct v9fs_post_op {
> +    QTAILQ_ENTRY(v9fs_post_op) node;
> +    void (*func)(void *arg);
> +    void *arg;
> +};
> +
> +static struct {
> +    int rfd;
> +    int wfd;
> +    QemuMutex lock;
> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void die(const char *what)
> +{
> +    die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS   5
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback to the iothread to be called into whenever
> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more job on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> +    int count = 0;
> +    struct v9fs_post_op *post_op;
> +    int ret;
> +    char byte;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> +    } while (ret >= 0 && errno != EAGAIN);
> +
> +    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> +        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> +            break;
> +        }
> +        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> +        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> +        qemu_mutex_unlock(&v9fs_async_struct.lock);
> +        post_op->func(post_op->arg);
> +        qemu_free(post_op);
> +        qemu_mutex_lock(&v9fs_async_struct.lock);
> +    }
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> +    char byte = 0;
> +    ssize_t ret;
> +    int tries = 0;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        assert(tries != 100);
> +       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> +        tries++;
> +    } while (ret < 0 && errno == EAGAIN);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    if (ret < 0 && errno != EAGAIN) {
> +        die("write() in v9fs");
> +    }
> +
> +    if (kill(getpid(), SIGUSR2)) {
> +        die("kill failed");
> +    }
> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + *        the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. It's purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> +    struct v9fs_post_op *post_op;
> +
> +    post_op = qemu_mallocz(sizeof(*post_op));
> +    post_op->func = func;
> +    post_op->arg = arg;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload posix_operation on to the asynchronous
> + * thread pool. It sets up the associations with the post_function that needs to
> + * be invoked by from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work ,
> +                                void (*posix_fn)(ThreadletWork *work),
> +                                void (**post_fn_ptr)(void *arg),
> +                                void (*post_fn)(void *arg))
> +{
> +    *post_fn_ptr = post_fn;
> +    work->func = posix_fn;
> +    submit_threadletwork(work);
> +}
> +
>  static int omode_to_uflags(int8_t mode)
>  {
>      int ret = 0;
> @@ -3639,7 +3783,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>      int i, len;
>      struct stat stat;
>      FsTypeEntry *fse;
> -
> +    int fds[2];
> 
>      s = (V9fsState *)virtio_common_init("virtio-9p",
>                                      VIRTIO_ID_9P,
> @@ -3722,5 +3866,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>                          s->tag_len;
>      s->vdev.get_config = virtio_9p_get_config;
> 
> +    if (qemu_pipe(fds) == -1) {
> +        fprintf(stderr, "failed to create fd's for virtio-9p\n");
> +        exit(1);
> +    }
> +
> +    v9fs_async_struct.rfd = fds[0];
> +    v9fs_async_struct.wfd = fds[1];
> +
> +    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> +    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
> +
> +    fcntl(fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> +    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> +    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> +    qemu_mutex_init(&(v9fs_async_struct.lock));
> +    /* Create async queue. */
> +
> +    (void)v9fs_do_async_posix;
> +    (void)v9fs_async_helper_done;
> +
>      return &s->vdev;
>  }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 2e47736..cb4308a 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
> 
> +static PosixAioState *posix_aio_state;
> +
>  static void aio_thread(ThreadletWork *work)
>  {
>      pid_t pid;
> @@ -288,6 +290,16 @@ static void aio_thread(ThreadletWork *work)
> 
>      aiocb->ret = ret;
> 
> +    if (posix_aio_state) {
> +        char byte = 0;
> +        ssize_t ret;
> +
> +        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> +        if (ret < 0 && errno != EAGAIN) {
> +            die("write()");
> +        }
> +    }
> +
>      if (kill(pid, aiocb->ev_signo)) {
>          die("kill failed");
>      }
> @@ -402,22 +414,6 @@ static int posix_aio_flush(void *opaque)
>      return !!s->first_aio;
>  }
> 
> -static PosixAioState *posix_aio_state;
> -
> -static void aio_signal_handler(int signum)
> -{
> -    if (posix_aio_state) {
> -        char byte = 0;
> -        ssize_t ret;
> -
> -        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> -        if (ret < 0 && errno != EAGAIN)
> -            die("write()");
> -    }
> -
> -    qemu_service_io();
> -}
> -
>  static void paio_remove(struct qemu_paiocb *acb)
>  {
>      struct qemu_paiocb **pacb;
> @@ -442,13 +438,13 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>      int active = 0;
> 
> -    if (!acb->active) {
> -        if (!deque_threadletwork(&acb->work)) {
> -            acb->ret = -ECANCELED;
> -         } else {
> -            active = 1;
> -         }
> -    } else if (acb->ret == -EINPROGRESS) {
> +    if (!deque_threadletwork(&acb->work)) {
> +        acb->ret = -ECANCELED;
> +    } else {
> +        active = 1;
> +    }
> +
> +    if (acb->ret == -EINPROGRESS) {
>          active = 1;
>      }
> 
> @@ -522,7 +518,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
> 
>  int paio_init(void)
>  {
> -    struct sigaction act;
>      PosixAioState *s;
>      int fds[2];
> 
> @@ -531,11 +526,6 @@ int paio_init(void)
> 
>      s = qemu_malloc(sizeof(PosixAioState));
> 
> -    sigfillset(&act.sa_mask);
> -    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> -    act.sa_handler = aio_signal_handler;
> -    sigaction(SIGUSR2, &act, NULL);
> -
>      s->first_aio = NULL;
>      if (qemu_pipe(fds) == -1) {
>          fprintf(stderr, "failed to create pipe\n");
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> index ac3b97b..2da6f1b 100644
> --- a/qemu-threadlets.c
> +++ b/qemu-threadlets.c
> @@ -15,12 +15,28 @@
> 
>  #include "qemu-threadlets.h"
>  #include "osdep.h"
> +#include <signal.h>
> 
>  #define MAX_GLOBAL_THREADS  64
>  #define MIN_GLOBAL_THREADS  64
>  static ThreadletQueue globalqueue;
>  static int globalqueue_init;
> 
> +static void threadlet_io_completion_signal_handler(int signum)
> +{
> +    qemu_service_io();
> +}
> +
> +static void threadlet_register_signal_handler(void)
> +{
> +    struct sigaction act;
> +
> +    sigfillset(&act.sa_mask);
> +    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> +    act.sa_handler = threadlet_io_completion_signal_handler;
> +    sigaction(SIGUSR2, &act, NULL);
> +}
> +
>  static void *threadlet_worker(void *data)
>  {
>      ThreadletQueue *queue = data;
> @@ -165,3 +181,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
>      qemu_mutex_init(&(queue->lock));
>      qemu_cond_init(&(queue->cond));
>  }
> +
> +void threadlet_init(void)
> +{
> +   threadlet_register_signal_handler();
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> index 6d9585b..5fd218a 100644
> --- a/qemu-threadlets.h
> +++ b/qemu-threadlets.h
> @@ -45,4 +45,5 @@ extern int deque_threadletwork_on_queue(ThreadletQueue *queue,
>  extern int deque_threadletwork(ThreadletWork *work);
>  extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
>                                   int min_threads);
> +extern void threadlet_init(void);
>  #endif
> diff --git a/vl.c b/vl.c
> index df414ef..7b9a425 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -148,6 +148,7 @@ int main(int argc, char **argv)
>  #include "qemu-config.h"
>  #include "qemu-objects.h"
>  #include "qemu-options.h"
> +#include "qemu-threadlets.h"
>  #ifdef CONFIG_VIRTFS
>  #include "fsdev/qemu-fsdev.h"
>  #endif
> @@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
>              exit(1);
>      }
> 
> +    threadlet_init();
> +
>      /* init generic devices */
>      if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
>          exit(1);
> 
>
jvrao - Oct. 22, 2010, 5:50 p.m.
On 10/21/2010 5:10 AM, Arun R Bharadwaj wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> infrastructure for offloading blocking tasks such as making posix calls on
> to the helper threads and handle the post_posix_operations() from the
> context of the iothread. This frees the vcpu thread to process any other guest
> operations while the processing of v9fs_io is in progress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
>  hw/virtio-9p.c     |  168 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  posix-aio-compat.c |   48 ++++++---------
>  qemu-threadlets.c  |   21 +++++++
>  qemu-threadlets.h  |    1 
>  vl.c               |    3 +
>  5 files changed, 211 insertions(+), 30 deletions(-)
> 
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index a871685..f9a7b7d 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-debug.h"
>  #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
> 
>  int debug_9p_pdu;
> 
> @@ -33,6 +34,149 @@ enum {
>      Oappend = 0x80,
>  };
> 
> +struct v9fs_post_op {
> +    QTAILQ_ENTRY(v9fs_post_op) node;
> +    void (*func)(void *arg);
> +    void *arg;
> +};
> +
> +static struct {
> +    int rfd;
> +    int wfd;
> +    QemuMutex lock;
> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void die(const char *what)
> +{
> +    die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS   5
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback to the iothread to be called into whenever
> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more job on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> +    int count = 0;
> +    struct v9fs_post_op *post_op;
> +    int ret;
> +    char byte;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> +    } while (ret >= 0 && errno != EAGAIN);
> +
> +    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> +        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> +            break;
> +        }
> +        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> +        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> +        qemu_mutex_unlock(&v9fs_async_struct.lock);
> +        post_op->func(post_op->arg);
> +        qemu_free(post_op);
> +        qemu_mutex_lock(&v9fs_async_struct.lock);
> +    }
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> +    char byte = 0;
> +    ssize_t ret;
> +    int tries = 0;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        assert(tries != 100);
> +       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> +        tries++;
> +    } while (ret < 0 && errno == EAGAIN);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    if (ret < 0 && errno != EAGAIN) {
> +        die("write() in v9fs");
> +    }
> +
> +    if (kill(getpid(), SIGUSR2)) {
> +        die("kill failed");
> +    }
> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + *        the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. It's purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> +    struct v9fs_post_op *post_op;
> +
> +    post_op = qemu_mallocz(sizeof(*post_op));
> +    post_op->func = func;
> +    post_op->arg = arg;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload posix_operation on to the asynchronous
> + * thread pool. It sets up the associations with the post_function that needs to
> + * be invoked by from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work ,
> +                                void (*posix_fn)(ThreadletWork *work),
> +                                void (**post_fn_ptr)(void *arg),
> +                                void (*post_fn)(void *arg))
> +{
> +    *post_fn_ptr = post_fn;
> +    work->func = posix_fn;
> +    submit_threadletwork(work);
> +}
> +
>  static int omode_to_uflags(int8_t mode)
>  {
>      int ret = 0;
> @@ -3639,7 +3783,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>      int i, len;
>      struct stat stat;
>      FsTypeEntry *fse;
> -
> +    int fds[2];
> 
>      s = (V9fsState *)virtio_common_init("virtio-9p",
>                                      VIRTIO_ID_9P,
> @@ -3722,5 +3866,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>                          s->tag_len;
>      s->vdev.get_config = virtio_9p_get_config;
> 
> +    if (qemu_pipe(fds) == -1) {
> +        fprintf(stderr, "failed to create fd's for virtio-9p\n");
> +        exit(1);
> +    }
> +
> +    v9fs_async_struct.rfd = fds[0];
> +    v9fs_async_struct.wfd = fds[1];
> +
> +    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> +    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);

I still see these debug messages. Please take care of them.

- JV

> +
> +    fcntl(fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> +    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> +    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> +    qemu_mutex_init(&(v9fs_async_struct.lock));
> +    /* Create async queue. */
> +
> +    (void)v9fs_do_async_posix;
> +    (void)v9fs_async_helper_done;
> +
>      return &s->vdev;
>  }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 2e47736..cb4308a 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
> 
> +static PosixAioState *posix_aio_state;
> +
>  static void aio_thread(ThreadletWork *work)
>  {
>      pid_t pid;
> @@ -288,6 +290,16 @@ static void aio_thread(ThreadletWork *work)
> 
>      aiocb->ret = ret;
> 
> +    if (posix_aio_state) {
> +        char byte = 0;
> +        ssize_t ret;
> +
> +        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> +        if (ret < 0 && errno != EAGAIN) {
> +            die("write()");
> +        }
> +    }
> +
>      if (kill(pid, aiocb->ev_signo)) {
>          die("kill failed");
>      }
> @@ -402,22 +414,6 @@ static int posix_aio_flush(void *opaque)
>      return !!s->first_aio;
>  }
> 
> -static PosixAioState *posix_aio_state;
> -
> -static void aio_signal_handler(int signum)
> -{
> -    if (posix_aio_state) {
> -        char byte = 0;
> -        ssize_t ret;
> -
> -        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> -        if (ret < 0 && errno != EAGAIN)
> -            die("write()");
> -    }
> -
> -    qemu_service_io();
> -}
> -
>  static void paio_remove(struct qemu_paiocb *acb)
>  {
>      struct qemu_paiocb **pacb;
> @@ -442,13 +438,13 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>      int active = 0;
> 
> -    if (!acb->active) {
> -        if (!deque_threadletwork(&acb->work)) {
> -            acb->ret = -ECANCELED;
> -         } else {
> -            active = 1;
> -         }
> -    } else if (acb->ret == -EINPROGRESS) {
> +    if (!deque_threadletwork(&acb->work)) {
> +        acb->ret = -ECANCELED;
> +    } else {
> +        active = 1;
> +    }
> +
> +    if (acb->ret == -EINPROGRESS) {
>          active = 1;
>      }
> 
> @@ -522,7 +518,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
> 
>  int paio_init(void)
>  {
> -    struct sigaction act;
>      PosixAioState *s;
>      int fds[2];
> 
> @@ -531,11 +526,6 @@ int paio_init(void)
> 
>      s = qemu_malloc(sizeof(PosixAioState));
> 
> -    sigfillset(&act.sa_mask);
> -    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> -    act.sa_handler = aio_signal_handler;
> -    sigaction(SIGUSR2, &act, NULL);
> -
>      s->first_aio = NULL;
>      if (qemu_pipe(fds) == -1) {
>          fprintf(stderr, "failed to create pipe\n");
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> index ac3b97b..2da6f1b 100644
> --- a/qemu-threadlets.c
> +++ b/qemu-threadlets.c
> @@ -15,12 +15,28 @@
> 
>  #include "qemu-threadlets.h"
>  #include "osdep.h"
> +#include <signal.h>
> 
>  #define MAX_GLOBAL_THREADS  64
>  #define MIN_GLOBAL_THREADS  64
>  static ThreadletQueue globalqueue;
>  static int globalqueue_init;
> 
> +static void threadlet_io_completion_signal_handler(int signum)
> +{
> +    qemu_service_io();
> +}
> +
> +static void threadlet_register_signal_handler(void)
> +{
> +    struct sigaction act;
> +
> +    sigfillset(&act.sa_mask);
> +    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> +    act.sa_handler = threadlet_io_completion_signal_handler;
> +    sigaction(SIGUSR2, &act, NULL);
> +}
> +
>  static void *threadlet_worker(void *data)
>  {
>      ThreadletQueue *queue = data;
> @@ -165,3 +181,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
>      qemu_mutex_init(&(queue->lock));
>      qemu_cond_init(&(queue->cond));
>  }
> +
> +void threadlet_init(void)
> +{
> +   threadlet_register_signal_handler();
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> index 6d9585b..5fd218a 100644
> --- a/qemu-threadlets.h
> +++ b/qemu-threadlets.h
> @@ -45,4 +45,5 @@ extern int deque_threadletwork_on_queue(ThreadletQueue *queue,
>  extern int deque_threadletwork(ThreadletWork *work);
>  extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
>                                   int min_threads);
> +extern void threadlet_init(void);
>  #endif
> diff --git a/vl.c b/vl.c
> index df414ef..7b9a425 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -148,6 +148,7 @@ int main(int argc, char **argv)
>  #include "qemu-config.h"
>  #include "qemu-objects.h"
>  #include "qemu-options.h"
> +#include "qemu-threadlets.h"
>  #ifdef CONFIG_VIRTFS
>  #include "fsdev/qemu-fsdev.h"
>  #endif
> @@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
>              exit(1);
>      }
> 
> +    threadlet_init();
> +
>      /* init generic devices */
>      if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
>          exit(1);
> 
>
Stefan Hajnoczi - Oct. 23, 2010, 11:59 a.m.
My review comments from the last version still stand:

http://permalink.gmane.org/gmane.comp.emulators.qemu/82796

Stefan

Patch

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index a871685..f9a7b7d 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@ 
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-debug.h"
 #include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
 
 int debug_9p_pdu;
 
@@ -33,6 +34,149 @@  enum {
     Oappend = 0x80,
 };
 
+struct v9fs_post_op {
+    QTAILQ_ENTRY(v9fs_post_op) node;
+    void (*func)(void *arg);
+    void *arg;
+};
+
+static struct {
+    int rfd;
+    int wfd;
+    QemuMutex lock;
+    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS   5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback to the iothread to be called into whenever
+ * the v9fs_async_struct.wfd is written into. This thread goes through the list
+ * of v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more job on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count = 0;
+    struct v9fs_post_op *post_op;
+    int ret;
+    char byte;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+    } while (ret >= 0 && errno != EAGAIN);
+
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+            break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        qemu_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        qemu_mutex_lock(&v9fs_async_struct.lock);
+    }
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+    char byte = 0;
+    ssize_t ret;
+    int tries = 0;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        assert(tries != 100);
+       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+        tries++;
+    } while (ret < 0 && errno == EAGAIN);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    if (ret < 0 && errno != EAGAIN) {
+        die("write() in v9fs");
+    }
+
+    if (kill(getpid(), SIGUSR2)) {
+        die("kill failed");
+    }
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ *        the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. It's purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+    struct v9fs_post_op *post_op;
+
+    post_op = qemu_mallocz(sizeof(*post_op));
+    post_op->func = func;
+    post_op->arg = arg;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @vs: V9fsOPState variable for the OP operation.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked by from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work ,
+                                void (*posix_fn)(ThreadletWork *work),
+                                void (**post_fn_ptr)(void *arg),
+                                void (*post_fn)(void *arg))
+{
+    *post_fn_ptr = post_fn;
+    work->func = posix_fn;
+    submit_threadletwork(work);
+}
+
 static int omode_to_uflags(int8_t mode)
 {
     int ret = 0;
@@ -3639,7 +3783,7 @@  VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     int i, len;
     struct stat stat;
     FsTypeEntry *fse;
-
+    int fds[2];
 
     s = (V9fsState *)virtio_common_init("virtio-9p",
                                     VIRTIO_ID_9P,
@@ -3722,5 +3866,27 @@  VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
                         s->tag_len;
     s->vdev.get_config = virtio_9p_get_config;
 
+    if (qemu_pipe(fds) == -1) {
+        fprintf(stderr, "failed to create fd's for virtio-9p\n");
+        exit(1);
+    }
+
+    v9fs_async_struct.rfd = fds[0];
+    v9fs_async_struct.wfd = fds[1];
+
+    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+    fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+    qemu_mutex_init(&(v9fs_async_struct.lock));
+    /* Create async queue. */
+
+    (void)v9fs_do_async_posix;
+    (void)v9fs_async_helper_done;
+
     return &s->vdev;
 }
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 2e47736..cb4308a 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -260,6 +260,8 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
+static PosixAioState *posix_aio_state;
+
 static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
@@ -288,6 +290,16 @@  static void aio_thread(ThreadletWork *work)
 
     aiocb->ret = ret;
 
+    if (posix_aio_state) {
+        char byte = 0;
+        ssize_t ret;
+
+        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+        if (ret < 0 && errno != EAGAIN) {
+            die("write()");
+        }
+    }
+
     if (kill(pid, aiocb->ev_signo)) {
         die("kill failed");
     }
@@ -402,22 +414,6 @@  static int posix_aio_flush(void *opaque)
     return !!s->first_aio;
 }
 
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
-
-    qemu_service_io();
-}
-
 static void paio_remove(struct qemu_paiocb *acb)
 {
     struct qemu_paiocb **pacb;
@@ -442,13 +438,13 @@  static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    if (!acb->active) {
-        if (!deque_threadletwork(&acb->work)) {
-            acb->ret = -ECANCELED;
-         } else {
-            active = 1;
-         }
-    } else if (acb->ret == -EINPROGRESS) {
+    if (!deque_threadletwork(&acb->work)) {
+        acb->ret = -ECANCELED;
+    } else {
+        active = 1;
+    }
+
+    if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
 
@@ -522,7 +518,6 @@  BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 
 int paio_init(void)
 {
-    struct sigaction act;
     PosixAioState *s;
     int fds[2];
 
@@ -531,11 +526,6 @@  int paio_init(void)
 
     s = qemu_malloc(sizeof(PosixAioState));
 
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
-
     s->first_aio = NULL;
     if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index ac3b97b..2da6f1b 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -15,12 +15,28 @@ 
 
 #include "qemu-threadlets.h"
 #include "osdep.h"
+#include <signal.h>
 
 #define MAX_GLOBAL_THREADS  64
 #define MIN_GLOBAL_THREADS  64
 static ThreadletQueue globalqueue;
 static int globalqueue_init;
 
+static void threadlet_io_completion_signal_handler(int signum)
+{
+    qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+    struct sigaction act;
+
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    act.sa_handler = threadlet_io_completion_signal_handler;
+    sigaction(SIGUSR2, &act, NULL);
+}
+
 static void *threadlet_worker(void *data)
 {
     ThreadletQueue *queue = data;
@@ -165,3 +181,8 @@  void threadlet_queue_init(ThreadletQueue *queue,
     qemu_mutex_init(&(queue->lock));
     qemu_cond_init(&(queue->cond));
 }
+
+void threadlet_init(void)
+{
+   threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 6d9585b..5fd218a 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@  extern int deque_threadletwork_on_queue(ThreadletQueue *queue,
 extern int deque_threadletwork(ThreadletWork *work);
 extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
                                  int min_threads);
+extern void threadlet_init(void);
 #endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@  int main(int argc, char **argv)
 #include "qemu-config.h"
 #include "qemu-objects.h"
 #include "qemu-options.h"
+#include "qemu-threadlets.h"
 #ifdef CONFIG_VIRTFS
 #include "fsdev/qemu-fsdev.h"
 #endif
@@ -2922,6 +2923,8 @@  int main(int argc, char **argv, char **envp)
             exit(1);
     }
 
+    threadlet_init();
+
     /* init generic devices */
     if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
         exit(1);