diff mbox

[3/3] Add helper functions to enable virtio-9p make use of the threadlets

Message ID 20101108104707.6769.69626.stgit@localhost6.localdomain6
State New
Headers show

Commit Message

Arun Bharadwaj Nov. 8, 2010, 10:47 a.m. UTC
From: Gautham R Shenoy <ego@in.ibm.com>

infrastructure for offloading blocking tasks such as making posix calls on
to the helper threads and handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 hw/virtio-9p.c     |  165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c |   33 +++-------
 qemu-threadlets.c  |   21 +++++++
 qemu-threadlets.h  |    1 
 vl.c               |    3 +
 5 files changed, 200 insertions(+), 23 deletions(-)

Comments

jvrao Nov. 9, 2010, 6:44 a.m. UTC | #1
On 11/8/2010 2:47 AM, Arun R Bharadwaj wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> infrastructure for offloading blocking tasks such as making posix calls on
> to the helper threads and handle the post_posix_operations() from the
> context of the iothread. This frees the vcpu thread to process any other guest
> operations while the processing of v9fs_io is in progress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
>  hw/virtio-9p.c     |  165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  posix-aio-compat.c |   33 +++-------
>  qemu-threadlets.c  |   21 +++++++
>  qemu-threadlets.h  |    1 
>  vl.c               |    3 +
>  5 files changed, 200 insertions(+), 23 deletions(-)
> 
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index 7c59988..61b65fd 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-debug.h"
>  #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
> 
>  int debug_9p_pdu;
> 
> @@ -33,6 +34,146 @@ enum {
>      Oappend = 0x80,
>  };
> 
> +struct v9fs_post_op {
> +    QTAILQ_ENTRY(v9fs_post_op) node;
> +    void (*func)(void *arg);
> +    void *arg;
> +};
> +
> +static struct {
> +    int rfd;
> +    int wfd;
> +    QemuMutex lock;
> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void die(const char *what)
> +{
> +    die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS   5
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback to the iothread to be called into whenever
> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more job on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> +    int count = 0;
> +    struct v9fs_post_op *post_op;
> +    int ret;
> +    char byte;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> +    } while (ret >= 0 && errno != EAGAIN);
> +
> +    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> +        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> +           break;
> +        }
> +        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> +        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> +        qemu_mutex_unlock(&v9fs_async_struct.lock);
> +        post_op->func(post_op->arg);
> +        qemu_free(post_op);
> +        qemu_mutex_lock(&v9fs_async_struct.lock);
> +    }
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> +    char byte = 0;
> +    ssize_t ret;
> +    int tries = 0;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        assert(tries != 100);
> +       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> +        tries++;
> +    } while (ret < 0 && errno == EAGAIN);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    if (ret < 0 && errno != EAGAIN)
> +        die("write() in v9fs");
> +
> +    if (kill(getpid(), SIGUSR2)) die("kill failed");
> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + *        the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. It's purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> +    struct v9fs_post_op *post_op;
> +
> +    post_op = qemu_mallocz(sizeof(*post_op));
> +    post_op->func = func;
> +    post_op->arg = arg;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload posix_operation on to the asynchronous
> + * thread pool. It sets up the associations with the post_function that needs to
> + * be invoked by from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work ,
> +                                void (*posix_fn)(ThreadletWork *work),
> +                                void (**post_fn_ptr)(void *arg),
> +                                void (*post_fn)(void *arg))
> +{
> +    *post_fn_ptr = post_fn;
> +    work->func = posix_fn;
> +    submit_work(work);
> +}
> +
>  static int omode_to_uflags(int8_t mode)
>  {
>      int ret = 0;
> @@ -3657,7 +3798,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>      int i, len;
>      struct stat stat;
>      FsTypeEntry *fse;
> -
> +    int fds[2];
> 
>      s = (V9fsState *)virtio_common_init("virtio-9p",
>                                      VIRTIO_ID_9P,
> @@ -3740,5 +3881,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>                          s->tag_len;
>      s->vdev.get_config = virtio_9p_get_config;
> 
> +    if (qemu_pipe(fds) == -1) {
> +        fprintf(stderr, "failed to create fd's for virtio-9p\n");
> +        exit(1);
> +    }
> +
> +    v9fs_async_struct.rfd = fds[0];
> +    v9fs_async_struct.wfd = fds[1];
> +
> +    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> +    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);

Looks like debug escaped again. :)

- JV

> +
> +    fcntl(fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> +    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> +    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> +    qemu_mutex_init(&(v9fs_async_struct.lock));
> +    /* Create async queue. */
> +
> +    (void)v9fs_do_async_posix;
> +    (void)v9fs_async_helper_done;
> +
>      return &s->vdev;
>  }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 9e02d18..8019be7 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
> 
> +static PosixAioState *posix_aio_state;
> +
>  static void aio_thread(ThreadletWork *work)
>  {
>      pid_t pid;
> @@ -290,6 +292,15 @@ static void aio_thread(ThreadletWork *work)
>      aiocb->ret = ret;
>      qemu_mutex_unlock(&aiocb_mutex);
> 
> +    if (posix_aio_state) {
> +        char byte = 0;
> +        ssize_t ret;
> +
> +        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> +        if (ret < 0 && errno != EAGAIN)
> +            die("write()");
> +    }
> +
>      if (kill(pid, aiocb->ev_signo)) {
>           die("kill failed");
>      }
> @@ -408,22 +419,6 @@ static int posix_aio_flush(void *opaque)
>      return !!s->first_aio;
>  }
> 
> -static PosixAioState *posix_aio_state;
> -
> -static void aio_signal_handler(int signum)
> -{
> -    if (posix_aio_state) {
> -        char byte = 0;
> -        ssize_t ret;
> -
> -        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> -        if (ret < 0 && errno != EAGAIN)
> -            die("write()");
> -    }
> -
> -    qemu_service_io();
> -}
> -
>  static void paio_remove(struct qemu_paiocb *acb)
>  {
>      struct qemu_paiocb **pacb;
> @@ -527,7 +522,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
> 
>  int paio_init(void)
>  {
> -    struct sigaction act;
>      PosixAioState *s;
>      int fds[2];
> 
> @@ -538,11 +532,6 @@ int paio_init(void)
> 
>      s = qemu_malloc(sizeof(PosixAioState));
> 
> -    sigfillset(&act.sa_mask);
> -    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> -    act.sa_handler = aio_signal_handler;
> -    sigaction(SIGUSR2, &act, NULL);
> -
>      s->first_aio = NULL;
>      if (qemu_pipe(fds) == -1) {
>          fprintf(stderr, "failed to create pipe\n");
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> index a79062c..38b758a 100644
> --- a/qemu-threadlets.c
> +++ b/qemu-threadlets.c
> @@ -15,12 +15,28 @@
> 
>  #include "qemu-threadlets.h"
>  #include "osdep.h"
> +#include <signal.h>
> 
>  #define MAX_GLOBAL_THREADS  64
>  #define MIN_GLOBAL_THREADS   8
>  static ThreadletQueue globalqueue;
>  static int globalqueue_init;
> 
> +static void threadlet_io_completion_signal_handler(int signum)
> +{
> +    qemu_service_io();
> +}
> +
> +static void threadlet_register_signal_handler(void)
> +{
> +    struct sigaction act;
> +
> +    sigfillset(&act.sa_mask);
> +    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> +    act.sa_handler = threadlet_io_completion_signal_handler;
> +    sigaction(SIGUSR2, &act, NULL);
> +}
> +
>  static void *threadlet_worker(void *data)
>  {
>      ThreadletQueue *queue = data;
> @@ -165,3 +181,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
>      qemu_mutex_init(&queue->lock);
>      qemu_cond_init(&queue->cond);
>  }
> +
> +void threadlet_init(void)
> +{
> +   threadlet_register_signal_handler();
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> index cdb5faa..5b125cd 100644
> --- a/qemu-threadlets.h
> +++ b/qemu-threadlets.h
> @@ -45,4 +45,5 @@ extern int dequeue_work_on_queue(ThreadletQueue *queue,
>  extern int dequeue_work(ThreadletWork *work);
>  extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
>                                   int min_threads);
> +extern void threadlet_init(void);
>  #endif
> diff --git a/vl.c b/vl.c
> index df414ef..7b9a425 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -148,6 +148,7 @@ int main(int argc, char **argv)
>  #include "qemu-config.h"
>  #include "qemu-objects.h"
>  #include "qemu-options.h"
> +#include "qemu-threadlets.h"
>  #ifdef CONFIG_VIRTFS
>  #include "fsdev/qemu-fsdev.h"
>  #endif
> @@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
>              exit(1);
>      }
> 
> +    threadlet_init();
> +
>      /* init generic devices */
>      if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
>          exit(1);
> 
>
Stefan Hajnoczi Nov. 9, 2010, 9:06 a.m. UTC | #2
On Mon, Nov 8, 2010 at 10:47 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index 7c59988..61b65fd 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-debug.h"
>  #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
>
>  int debug_9p_pdu;
>
> @@ -33,6 +34,146 @@ enum {
>     Oappend = 0x80,
>  };
>
> +struct v9fs_post_op {
> +    QTAILQ_ENTRY(v9fs_post_op) node;
> +    void (*func)(void *arg);
> +    void *arg;
> +};

Structs are usually typedefed:
typedef struct V9fsPostOp {
    ...
} V9fsPostOp;

QEMU usually doesn't use struct explicitly unless a library interface
requires it.

> +
> +static struct {
> +    int rfd;
> +    int wfd;
> +    QemuMutex lock;
> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void die(const char *what)
> +{
> +    die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS   5
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback to the iothread to be called into whenever
> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more job on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> +    int count = 0;
> +    struct v9fs_post_op *post_op;
> +    int ret;
> +    char byte;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> +    } while (ret >= 0 && errno != EAGAIN);

} while (ret > 1 || (ret == -1 && errno == EINTR)); ?

> +
> +    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> +        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> +           break;
> +        }
> +        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> +        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> +        qemu_mutex_unlock(&v9fs_async_struct.lock);
> +        post_op->func(post_op->arg);
> +        qemu_free(post_op);
> +        qemu_mutex_lock(&v9fs_async_struct.lock);
> +    }
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> +    char byte = 0;
> +    ssize_t ret;
> +    int tries = 0;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    do {
> +        assert(tries != 100);
> +       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> +        tries++;
> +    } while (ret < 0 && errno == EAGAIN);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    if (ret < 0 && errno != EAGAIN)
> +        die("write() in v9fs");

{ } coding style

> +
> +    if (kill(getpid(), SIGUSR2)) die("kill failed");

{ } coding style

> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + *        the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. It's purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> +    struct v9fs_post_op *post_op;
> +
> +    post_op = qemu_mallocz(sizeof(*post_op));
> +    post_op->func = func;
> +    post_op->arg = arg;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);
> +    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> +    v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload posix_operation on to the asynchronous
> + * thread pool. It sets up the associations with the post_function that needs to
> + * be invoked by from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work ,
> +                                void (*posix_fn)(ThreadletWork *work),
> +                                void (**post_fn_ptr)(void *arg),
> +                                void (*post_fn)(void *arg))
> +{
> +    *post_fn_ptr = post_fn;
> +    work->func = posix_fn;
> +    submit_work(work);
> +}
> +
>  static int omode_to_uflags(int8_t mode)
>  {
>     int ret = 0;
> @@ -3657,7 +3798,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>     int i, len;
>     struct stat stat;
>     FsTypeEntry *fse;
> -
> +    int fds[2];
>
>     s = (V9fsState *)virtio_common_init("virtio-9p",
>                                     VIRTIO_ID_9P,
> @@ -3740,5 +3881,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>                         s->tag_len;
>     s->vdev.get_config = virtio_9p_get_config;
>
> +    if (qemu_pipe(fds) == -1) {
> +        fprintf(stderr, "failed to create fd's for virtio-9p\n");
> +        exit(1);
> +    }
> +
> +    v9fs_async_struct.rfd = fds[0];
> +    v9fs_async_struct.wfd = fds[1];
> +
> +    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> +    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
> +
> +    fcntl(fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> +    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> +    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> +    qemu_mutex_init(&(v9fs_async_struct.lock));
> +    /* Create async queue. */

Please delete this comment, it seems outdated.

> +
> +    (void)v9fs_do_async_posix;
> +    (void)v9fs_async_helper_done;
> +
>     return &s->vdev;
>  }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 9e02d18..8019be7 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>     return nbytes;
>  }
>
> +static PosixAioState *posix_aio_state;
> +
>  static void aio_thread(ThreadletWork *work)
>  {
>     pid_t pid;
> @@ -290,6 +292,15 @@ static void aio_thread(ThreadletWork *work)
>     aiocb->ret = ret;
>     qemu_mutex_unlock(&aiocb_mutex);
>
> +    if (posix_aio_state) {

This test is not needed because posix_aio_state must be non-NULL here.

> +        char byte = 0;
> +        ssize_t ret;
> +
> +        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> +        if (ret < 0 && errno != EAGAIN)

Please always use if () { ... } curly braces.  QEMU coding style.

Stefan
jvrao Nov. 10, 2010, 1:41 a.m. UTC | #3
On 11/9/2010 1:06 AM, Stefan Hajnoczi wrote:
> On Mon, Nov 8, 2010 at 10:47 AM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
>> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
>> index 7c59988..61b65fd 100644
>> --- a/hw/virtio-9p.c
>> +++ b/hw/virtio-9p.c
>> @@ -18,6 +18,7 @@
>>  #include "fsdev/qemu-fsdev.h"
>>  #include "virtio-9p-debug.h"
>>  #include "virtio-9p-xattr.h"
>> +#include "qemu-threadlets.h"
>>
>>  int debug_9p_pdu;
>>
>> @@ -33,6 +34,146 @@ enum {
>>     Oappend = 0x80,
>>  };
>>
>> +struct v9fs_post_op {
>> +    QTAILQ_ENTRY(v9fs_post_op) node;
>> +    void (*func)(void *arg);
>> +    void *arg;
>> +};
> 
> Structs are usually typedefed:
> typedef struct V9fsPostOp {
>     ...
> } V9fsPostOp;
> 
> QEMU usually doesn't use struct explicitly unless a library interface
> requires it.
> 
>> +
>> +static struct {
>> +    int rfd;
>> +    int wfd;
>> +    QemuMutex lock;
>> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
>> +} v9fs_async_struct;
>> +
>> +static void die2(int err, const char *what)
>> +{
>> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
>> +    abort();
>> +}
>> +
>> +static void die(const char *what)
>> +{
>> +    die2(errno, what);
>> +}
>> +
>> +#define ASYNC_MAX_PROCESS   5
>> +
>> +/**
>> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
>> + * @arg: Not used.
>> + *
>> + * This function serves as a callback to the iothread to be called into whenever
>> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
>> + * of v9fs_post_posix_operations() and executes them. In the process, it might
>> + * queue more job on the asynchronous thread pool.
>> + */
>> +static void v9fs_process_post_ops(void *arg)
>> +{
>> +    int count = 0;
>> +    struct v9fs_post_op *post_op;
>> +    int ret;
>> +    char byte;
>> +
>> +    qemu_mutex_lock(&v9fs_async_struct.lock);
>> +    do {
>> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
>> +    } while (ret >= 0 && errno != EAGAIN);
> 
> } while (ret > 1 || (ret == -1 && errno == EINTR)); ?

Not sure why we need to retry if ret > 1; all we need is

} while (ret == -1 && errno == EINTR);

> 
>> +
>> +    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
>> +        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
>> +           break;
>> +        }
>> +        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
>> +        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
>> +
>> +        qemu_mutex_unlock(&v9fs_async_struct.lock);
>> +        post_op->func(post_op->arg);
>> +        qemu_free(post_op);
>> +        qemu_mutex_lock(&v9fs_async_struct.lock);
>> +    }
>> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
>> +}
>> +
>> +/**
>> + * v9fs_async_signal: Inform the io-thread of completion of async job.
>> + *
>> + * This function is used to inform the iothread that a particular
>> + * async-operation pertaining to v9fs has been completed and that the io thread
>> + * can handle the v9fs_post_posix_operation.
>> + *
>> + * This is based on the aio_signal_handler
>> + */
>> +static inline void v9fs_async_signal(void)
>> +{
>> +    char byte = 0;
>> +    ssize_t ret;
>> +    int tries = 0;
>> +
>> +    qemu_mutex_lock(&v9fs_async_struct.lock);
>> +    do {
>> +        assert(tries != 100);
>> +       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
>> +        tries++;
>> +    } while (ret < 0 && errno == EAGAIN);
>> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
>> +
>> +    if (ret < 0 && errno != EAGAIN)
>> +        die("write() in v9fs");
> 
> { } coding style
> 
>> +
>> +    if (kill(getpid(), SIGUSR2)) die("kill failed");
> 
> { } coding style
> 
>> +}
>> +
>> +/**
>> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
>> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
>> + *        the io-thread
>> + * @arg: Argument to func.
>> + *
>> + * This function is called from the context of one of the asynchronous threads
>> + * in the thread pool. This is called when the asynchronous thread has finished
>> + * executing a v9fs_posix_operation. It's purpose is to initiate the process of
>> + * informing the io-thread that the v9fs_posix_operation has completed.
>> + */
>> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
>> +{
>> +    struct v9fs_post_op *post_op;
>> +
>> +    post_op = qemu_mallocz(sizeof(*post_op));
>> +    post_op->func = func;
>> +    post_op->arg = arg;
>> +
>> +    qemu_mutex_lock(&v9fs_async_struct.lock);
>> +    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
>> +    qemu_mutex_unlock(&v9fs_async_struct.lock);
>> +
>> +    v9fs_async_signal();
>> +}
>> +
>> +/**
>> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
>> + * @vs: V9fsOPState variable for the OP operation.
>> + * @posix_fn: The posix function which has to be offloaded onto async thread.
>> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
>> + * the posix_fn
>> + * @post_fn: The post processing function corresponding to the posix_fn.
>> + *
>> + * This function is a helper to offload posix_operation on to the asynchronous
>> + * thread pool. It sets up the associations with the post_function that needs to
>> + * be invoked by from the context of the iothread once the posix_fn has been
>> + * executed.
>> + */
>> +static void v9fs_do_async_posix(ThreadletWork *work ,
>> +                                void (*posix_fn)(ThreadletWork *work),
>> +                                void (**post_fn_ptr)(void *arg),
>> +                                void (*post_fn)(void *arg))
>> +{
>> +    *post_fn_ptr = post_fn;
>> +    work->func = posix_fn;
>> +    submit_work(work);
>> +}
>> +
>>  static int omode_to_uflags(int8_t mode)
>>  {
>>     int ret = 0;
>> @@ -3657,7 +3798,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>>     int i, len;
>>     struct stat stat;
>>     FsTypeEntry *fse;
>> -
>> +    int fds[2];
>>
>>     s = (V9fsState *)virtio_common_init("virtio-9p",
>>                                     VIRTIO_ID_9P,
>> @@ -3740,5 +3881,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
>>                         s->tag_len;
>>     s->vdev.get_config = virtio_9p_get_config;
>>
>> +    if (qemu_pipe(fds) == -1) {
>> +        fprintf(stderr, "failed to create fd's for virtio-9p\n");
>> +        exit(1);
>> +    }
>> +
>> +    v9fs_async_struct.rfd = fds[0];
>> +    v9fs_async_struct.wfd = fds[1];
>> +
>> +    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
>> +    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
>> +
>> +    fcntl(fds[0], F_SETFL, O_NONBLOCK);
>> +    fcntl(fds[1], F_SETFL, O_NONBLOCK);
>> +
>> +    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
>> +    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
>> +    qemu_mutex_init(&(v9fs_async_struct.lock));
>> +    /* Create async queue. */
> 
> Please delete this comment, it seems outdated.
> 
>> +
>> +    (void)v9fs_do_async_posix;
>> +    (void)v9fs_async_helper_done;
>> +
>>     return &s->vdev;
>>  }
>> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> index 9e02d18..8019be7 100644
>> --- a/posix-aio-compat.c
>> +++ b/posix-aio-compat.c
>> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>>     return nbytes;
>>  }
>>
>> +static PosixAioState */;
>> +
>>  static void aio_thread(ThreadletWork *work)
>>  {
>>     pid_t pid;
>> @@ -290,6 +292,15 @@ static void aio_thread(ThreadletWork *work)
>>     aiocb->ret = ret;
>>     qemu_mutex_unlock(&aiocb_mutex);
>>
>> +    if (posix_aio_state) {
> 
> This test is not needed because posix_aio_state must be non-NULL here.

I see similar check in the old code also..

- JV

> 
>> +        char byte = 0;
>> +        ssize_t ret;
>> +
>> +        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
>> +        if (ret < 0 && errno != EAGAIN)
> 
> Please always use if () { ... } curly braces.  QEMU coding style.
> 
> Stefan
>
Stefan Hajnoczi Nov. 10, 2010, 9:37 a.m. UTC | #4
On Wed, Nov 10, 2010 at 1:41 AM, Venkateswararao Jujjuri (JV)
<jvrao@linux.vnet.ibm.com> wrote:
> On 11/9/2010 1:06 AM, Stefan Hajnoczi wrote:
>> On Mon, Nov 8, 2010 at 10:47 AM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>>> +
>>> +static struct {
>>> +    int rfd;
>>> +    int wfd;
>>> +    QemuMutex lock;
>>> +    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
>>> +} v9fs_async_struct;
>>> +
>>> +static void die2(int err, const char *what)
>>> +{
>>> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
>>> +    abort();
>>> +}
>>> +
>>> +static void die(const char *what)
>>> +{
>>> +    die2(errno, what);
>>> +}
>>> +
>>> +#define ASYNC_MAX_PROCESS   5
>>> +
>>> +/**
>>> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
>>> + * @arg: Not used.
>>> + *
>>> + * This function serves as a callback to the iothread to be called into whenever
>>> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
>>> + * of v9fs_post_posix_operations() and executes them. In the process, it might
>>> + * queue more job on the asynchronous thread pool.
>>> + */
>>> +static void v9fs_process_post_ops(void *arg)
>>> +{
>>> +    int count = 0;
>>> +    struct v9fs_post_op *post_op;
>>> +    int ret;
>>> +    char byte;
>>> +
>>> +    qemu_mutex_lock(&v9fs_async_struct.lock);
>>> +    do {
>>> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
>>> +    } while (ret >= 0 && errno != EAGAIN);
>>
>> } while (ret > 1 || (ret == -1 && errno == EINTR)); ?
>
> Not sure why we need to retry if ret > 1; all we need is
>
> } while (ret == -1 && errno == EINTR);

We've been notified.  Now we want to drain the pipe fully in case we
were notified multiple times and bytes have been buffered up before we
got around to processing pending post_ops.

>>> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>>> index 9e02d18..8019be7 100644
>>> --- a/posix-aio-compat.c
>>> +++ b/posix-aio-compat.c
>>> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>>>     return nbytes;
>>>  }
>>>
>>> +static PosixAioState */;
>>> +
>>>  static void aio_thread(ThreadletWork *work)
>>>  {
>>>     pid_t pid;
>>> @@ -290,6 +292,15 @@ static void aio_thread(ThreadletWork *work)
>>>     aiocb->ret = ret;
>>>     qemu_mutex_unlock(&aiocb_mutex);
>>>
>>> +    if (posix_aio_state) {
>>
>> This test is not needed because posix_aio_state must be non-NULL here.
>
> I see similar check in the old code also..

The old code was in a signal handler, which means it could be called
at an inconvenient moment - before posix_aio_state has been
initialized.  But now we're guaranteed that posix_aio_state is
initialized because we'd never get to aio_thread() otherwise.

Stefan
diff mbox

Patch

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index 7c59988..61b65fd 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@ 
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-debug.h"
 #include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
 
 int debug_9p_pdu;
 
@@ -33,6 +34,146 @@  enum {
     Oappend = 0x80,
 };
 
+struct v9fs_post_op {
+    QTAILQ_ENTRY(v9fs_post_op) node;
+    void (*func)(void *arg);
+    void *arg;
+};
+
+static struct {
+    int rfd;
+    int wfd;
+    QemuMutex lock;
+    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS   5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback to the iothread to be called into whenever
+ * the v9fs_async_struct.wfd is written into. This thread goes through the list
+ * of v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more job on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count = 0;
+    struct v9fs_post_op *post_op;
+    int ret;
+    char byte;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+    } while (ret >= 0 && errno != EAGAIN);
+
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+           break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        qemu_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        qemu_mutex_lock(&v9fs_async_struct.lock);
+    }
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+    char byte = 0;
+    ssize_t ret;
+    int tries = 0;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        assert(tries != 100);
+       ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+        tries++;
+    } while (ret < 0 && errno == EAGAIN);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    if (ret < 0 && errno != EAGAIN)
+        die("write() in v9fs");
+
+    if (kill(getpid(), SIGUSR2)) die("kill failed");
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ *        the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. It's purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+    struct v9fs_post_op *post_op;
+
+    post_op = qemu_mallocz(sizeof(*post_op));
+    post_op->func = func;
+    post_op->arg = arg;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @vs: V9fsOPState variable for the OP operation.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked by from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work ,
+                                void (*posix_fn)(ThreadletWork *work),
+                                void (**post_fn_ptr)(void *arg),
+                                void (*post_fn)(void *arg))
+{
+    *post_fn_ptr = post_fn;
+    work->func = posix_fn;
+    submit_work(work);
+}
+
 static int omode_to_uflags(int8_t mode)
 {
     int ret = 0;
@@ -3657,7 +3798,7 @@  VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     int i, len;
     struct stat stat;
     FsTypeEntry *fse;
-
+    int fds[2];
 
     s = (V9fsState *)virtio_common_init("virtio-9p",
                                     VIRTIO_ID_9P,
@@ -3740,5 +3881,27 @@  VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
                         s->tag_len;
     s->vdev.get_config = virtio_9p_get_config;
 
+    if (qemu_pipe(fds) == -1) {
+        fprintf(stderr, "failed to create fd's for virtio-9p\n");
+        exit(1);
+    }
+
+    v9fs_async_struct.rfd = fds[0];
+    v9fs_async_struct.wfd = fds[1];
+
+    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+    fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+    qemu_mutex_init(&(v9fs_async_struct.lock));
+    /* Create async queue. */
+
+    (void)v9fs_do_async_posix;
+    (void)v9fs_async_helper_done;
+
     return &s->vdev;
 }
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 9e02d18..8019be7 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -260,6 +260,8 @@  static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
+static PosixAioState *posix_aio_state;
+
 static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
@@ -290,6 +292,15 @@  static void aio_thread(ThreadletWork *work)
     aiocb->ret = ret;
     qemu_mutex_unlock(&aiocb_mutex);
 
+    if (posix_aio_state) {
+        char byte = 0;
+        ssize_t ret;
+
+        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+        if (ret < 0 && errno != EAGAIN)
+            die("write()");
+    }
+
     if (kill(pid, aiocb->ev_signo)) {
          die("kill failed");
     }
@@ -408,22 +419,6 @@  static int posix_aio_flush(void *opaque)
     return !!s->first_aio;
 }
 
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
-
-    qemu_service_io();
-}
-
 static void paio_remove(struct qemu_paiocb *acb)
 {
     struct qemu_paiocb **pacb;
@@ -527,7 +522,6 @@  BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 
 int paio_init(void)
 {
-    struct sigaction act;
     PosixAioState *s;
     int fds[2];
 
@@ -538,11 +532,6 @@  int paio_init(void)
 
     s = qemu_malloc(sizeof(PosixAioState));
 
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
-
     s->first_aio = NULL;
     if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index a79062c..38b758a 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -15,12 +15,28 @@ 
 
 #include "qemu-threadlets.h"
 #include "osdep.h"
+#include <signal.h>
 
 #define MAX_GLOBAL_THREADS  64
 #define MIN_GLOBAL_THREADS   8
 static ThreadletQueue globalqueue;
 static int globalqueue_init;
 
+static void threadlet_io_completion_signal_handler(int signum)
+{
+    qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+    struct sigaction act;
+
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    act.sa_handler = threadlet_io_completion_signal_handler;
+    sigaction(SIGUSR2, &act, NULL);
+}
+
 static void *threadlet_worker(void *data)
 {
     ThreadletQueue *queue = data;
@@ -165,3 +181,8 @@  void threadlet_queue_init(ThreadletQueue *queue,
     qemu_mutex_init(&queue->lock);
     qemu_cond_init(&queue->cond);
 }
+
+void threadlet_init(void)
+{
+   threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index cdb5faa..5b125cd 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@  extern int dequeue_work_on_queue(ThreadletQueue *queue,
 extern int dequeue_work(ThreadletWork *work);
 extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
                                  int min_threads);
+extern void threadlet_init(void);
 #endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@  int main(int argc, char **argv)
 #include "qemu-config.h"
 #include "qemu-objects.h"
 #include "qemu-options.h"
+#include "qemu-threadlets.h"
 #ifdef CONFIG_VIRTFS
 #include "fsdev/qemu-fsdev.h"
 #endif
@@ -2922,6 +2923,8 @@  int main(int argc, char **argv, char **envp)
             exit(1);
     }
 
+    threadlet_init();
+
     /* init generic devices */
     if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
         exit(1);