diff mbox

[V3,2/3] qemu: Generic asynchronous threading framework to offload tasks

Message ID 20100603085623.25546.14585.stgit@localhost.localdomain
State New
Headers show

Commit Message

Gautham R Shenoy June 3, 2010, 8:56 a.m. UTC
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure. It's
extracted out of the threading framework that is being used by paio.

The reason for extracting this generic infrastructure out of
posix-aio-compat.c is so that other subsystems, such as virtio-9p, can make use
of it for offloading tasks that could block.

[ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
async_cancel_work]

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 Makefile.objs |    3 +
 async-work.c  |  136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
 3 files changed, 223 insertions(+), 1 deletions(-)
 create mode 100644 async-work.c
 create mode 100644 async-work.h

Comments

Corentin Chary June 3, 2010, 11:41 a.m. UTC | #1
On Thu, Jun 3, 2010 at 10:56 AM, Gautham R Shenoy <ego@in.ibm.com> wrote:
> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure. It's
> extracted out of the threading framework that is being used by paio.
>
> The reason for extracting out this generic infrastructure of the
> posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
> of it for offloading tasks that could block.
>
> [ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
> async_cancel_work]
>
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> ---
>  Makefile.objs |    3 +
>  async-work.c  |  136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
>  3 files changed, 223 insertions(+), 1 deletions(-)
>  create mode 100644 async-work.c
>  create mode 100644 async-work.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index ecdd53e..fd5ea4d 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o
> +block-obj-y += async-work.o
>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -108,7 +110,6 @@ common-obj-y += iov.o
>  common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
>  common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
>  common-obj-$(CONFIG_COCOA) += cocoa.o
> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
>  common-obj-y += notify.o event_notifier.o
>  common-obj-y += qemu-timer.o
>
> diff --git a/async-work.c b/async-work.c
> new file mode 100644
> index 0000000..0675732
> --- /dev/null
> +++ b/async-work.c
> @@ -0,0 +1,136 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +#include <stdio.h>
> +#include <errno.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <signal.h>
> +#include "async-work.h"
> +#include "osdep.h"
> +
> +static void async_abort(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void *async_worker_thread(void *data)
> +{
> +    struct async_queue *queue = data;
> +
> +    while (1) {
> +        struct work_item *work;
> +        int ret = 0;
> +        qemu_mutex_lock(&(queue->lock));
> +
> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +                                        &(queue->lock), 10*100000);
> +        }
> +
> +        if (QTAILQ_EMPTY(&(queue->request_list)))
> +            goto check_exit;
> +
> +        work = QTAILQ_FIRST(&(queue->request_list));
> +        QTAILQ_REMOVE(&(queue->request_list), work, node);
> +        queue->idle_threads--;
> +        qemu_mutex_unlock(&(queue->lock));
> +
> +        /* execute the work function */
> +        work->func(work);

Here the queue is empty, but there is a job running. In the VNC server
I need to be able to "join" jobs (before resize, disconnection, etc.).
What do you think about adding something like qemu_async_join(queue,
work) (if work is NULL, join all jobs)?

> +        async_work_release(queue, work);
> +
> +        qemu_mutex_lock(&(queue->lock));
> +        queue->idle_threads++;
> +
> +check_exit:
> +        if ((queue->idle_threads > 0) &&
> +            (queue->cur_threads > queue->min_threads)) {
> +            /* we retain minimum number of threads */
> +            break;
> +        }
> +        qemu_mutex_unlock(&(queue->lock));
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return NULL;
> +}
> +
> +static void spawn_async_thread(struct async_queue *queue)
> +{
> +    QemuThreadAttr attr;
> +    QemuThread thread;
> +    sigset_t set, oldset;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_attr_init(&attr);
> +
> +    /* create a detached thread so that we don't need to wait on it */
> +    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> +
> +    /* block all signals */
> +    if (sigfillset(&set)) {
> +        async_abort(errno, "sigfillset");
> +    }
> +
> +    if (sigprocmask(SIG_SETMASK, &set, &oldset)) {
> +        async_abort(errno, "sigprocmask");
> +    }
> +
> +    qemu_thread_create_attr(&thread, &attr, async_worker_thread, queue);
> +
> +    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) {
> +        async_abort(errno, "sigprocmask restore");
> +    }

Using PTHREAD_CREATE_DETACHED and the signal stuff here doesn't look
really portable. Can't we abstract that into qemu-thread (then we
just need to port qemu-thread to Windows)?

> +}
> +
> +void qemu_async_submit(struct async_queue *queue, struct work_item *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> +        spawn_async_thread(queue);
> +    }
> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +    qemu_cond_signal(&(queue->cond));
> +}
> +
> +int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
> +{
> +    struct work_item *ret_work;
> +    int found = 0;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> +            found = 1;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    if (found) {
> +        async_work_release(queue, work);
> +        return 0;
> +    }
> +
> +    return 1;
> +}
> +
> diff --git a/async-work.h b/async-work.h
> new file mode 100644
> index 0000000..8389f56
> --- /dev/null
> +++ b/async-work.h
> @@ -0,0 +1,85 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +struct async_queue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, work_item) request_list;
> +    QTAILQ_HEAD(, work_item) work_item_pool;
> +};
> +
> +struct work_item
> +{
> +    QTAILQ_ENTRY(work_item) node;
> +    void (*func)(struct work_item *work);
> +    void *private;
> +};
> +
> +static inline void async_queue_init(struct async_queue *queue,
> +                                   int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    QTAILQ_INIT(&(queue->work_item_pool));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}
> +
> +static inline struct work_item *async_work_init(struct async_queue *queue,
> +                                  void (*func)(struct work_item *),
> +                                  void *data)
> +{
> +    struct work_item *work;
> +    qemu_mutex_lock(&(queue->lock));
> +    if (QTAILQ_EMPTY(&(queue->work_item_pool))) {
> +        work = qemu_mallocz(sizeof(*work));
> +    } else {
> +        work = QTAILQ_FIRST(&(queue->work_item_pool));
> +        QTAILQ_REMOVE(&(queue->work_item_pool), work, node);
> +    }
> +
> +    work->func  = func;
> +    work->private  = data;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return work;
> +}
> +
> +static inline void async_work_release(struct async_queue *queue,
> +                                        struct work_item *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_INSERT_TAIL(&(queue->work_item_pool), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +}
> +
> +extern void qemu_async_submit(struct async_queue *queue,
> +                                 struct work_item *work);
> +
> +extern int qemu_async_cancel_work(struct async_queue *queue,
> +                    struct work_item *work);
> +#endif
>
>
Paolo Bonzini June 3, 2010, 12:37 p.m. UTC | #2
On 06/03/2010 01:41 PM, Corentin Chary wrote:
>>  +    if (sigprocmask(SIG_SETMASK,&set,&oldset)) {
>>  +        async_abort(errno, "sigprocmask");
>>  +    }
>>  +
>>  +    qemu_thread_create_attr(&thread,&attr, async_worker_thread, queue);
>>  +
>>  +    if (sigprocmask(SIG_SETMASK,&oldset, NULL)) {
>>  +        async_abort(errno, "sigprocmask restore");
>>  +    }

I wonder if qemu_thread_create shouldn't block all signals by default. 
Then the cpu and iothreads can unblock whatever they want.

I'll send a patch shortly.

In any case, please use pthread_sigmask instead of sigprocmask.

Paolo
Anthony Liguori June 4, 2010, 1:16 p.m. UTC | #3
On 06/03/2010 03:56 AM, Gautham R Shenoy wrote:
> From: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure. It's
> extracted out of the threading framework that is being used by paio.
>
> The reason for extracting out this generic infrastructure of the
> posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
> of it for offloading tasks that could block.
>
> [ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
> async_cancel_work]
>
> Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy<ego@in.ibm.com>
> ---
>   Makefile.objs |    3 +
>   async-work.c  |  136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
>   3 files changed, 223 insertions(+), 1 deletions(-)
>   create mode 100644 async-work.c
>   create mode 100644 async-work.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index ecdd53e..fd5ea4d 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
>   block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>   block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o
> +block-obj-y += async-work.o
>   block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>   block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -108,7 +110,6 @@ common-obj-y += iov.o
>   common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
>   common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
>   common-obj-$(CONFIG_COCOA) += cocoa.o
> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
>   common-obj-y += notify.o event_notifier.o
>   common-obj-y += qemu-timer.o
>
> diff --git a/async-work.c b/async-work.c
> new file mode 100644
> index 0000000..0675732
> --- /dev/null
> +++ b/async-work.c
> @@ -0,0 +1,136 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
>    

Please preserve the original copyright of the copied code.

> + */
> +#include<stdio.h>
> +#include<errno.h>
> +#include<string.h>
> +#include<stdlib.h>
> +#include<signal.h>
>    

qemu-common.h should have all of these.  Generally, you should avoid 
including system headers because qemu headers take care of portability.

> +#include "async-work.h"
> +#include "osdep.h"
> +
> +static void async_abort(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void *async_worker_thread(void *data)
> +{
> +    struct async_queue *queue = data;
> +
> +    while (1) {
> +        struct work_item *work;
> +        int ret = 0;
> +        qemu_mutex_lock(&(queue->lock));
> +
> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +					&(queue->lock), 10*100000);
> +        }
> +
> +        if (QTAILQ_EMPTY(&(queue->request_list)))
> +            goto check_exit;
> +
> +        work = QTAILQ_FIRST(&(queue->request_list));
> +        QTAILQ_REMOVE(&(queue->request_list), work, node);
> +        queue->idle_threads--;
> +        qemu_mutex_unlock(&(queue->lock));
> +
> +        /* execute the work function */
> +        work->func(work);
> +        async_work_release(queue, work);
> +
> +        qemu_mutex_lock(&(queue->lock));
> +        queue->idle_threads++;
> +
> +check_exit:
> +        if ((queue->idle_threads>  0)&&
> +            (queue->cur_threads>  queue->min_threads)) {
> +            /* we retain minimum number of threads */
> +            break;
> +        }
> +        qemu_mutex_unlock(&(queue->lock));
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return NULL;
> +}
> +
> +static void spawn_async_thread(struct async_queue *queue)
> +{
> +    QemuThreadAttr attr;
> +    QemuThread thread;
> +    sigset_t set, oldset;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_attr_init(&attr);
> +
> +    /* create a detached thread so that we don't need to wait on it */
> +    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> +
> +    /* block all signals */
> +    if (sigfillset(&set)) {
> +        async_abort(errno, "sigfillset");
> +    }
> +
> +    if (sigprocmask(SIG_SETMASK,&set,&oldset)) {
> +        async_abort(errno, "sigprocmask");
> +    }
> +
> +    qemu_thread_create_attr(&thread,&attr, async_worker_thread, queue);
> +
> +    if (sigprocmask(SIG_SETMASK,&oldset, NULL)) {
> +        async_abort(errno, "sigprocmask restore");
> +    }
> +}
> +
> +void qemu_async_submit(struct async_queue *queue, struct work_item *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    if (queue->idle_threads == 0&&  queue->cur_threads<  queue->max_threads) {
> +        spawn_async_thread(queue);
> +    }
> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +    qemu_cond_signal(&(queue->cond));
> +}
> +
> +int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
> +{
> +    struct work_item *ret_work;
> +    int found = 0;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> +            found = 1;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    if (found) {
> +        async_work_release(queue, work);
> +        return 0;
> +    }
> +
> +    return 1;
> +}
> +
> diff --git a/async-work.h b/async-work.h
> new file mode 100644
> index 0000000..8389f56
> --- /dev/null
> +++ b/async-work.h
> @@ -0,0 +1,85 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +struct async_queue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, work_item) request_list;
> +    QTAILQ_HEAD(, work_item) work_item_pool;
> +};
> +
> +struct work_item
> +{
> +    QTAILQ_ENTRY(work_item) node;
> +    void (*func)(struct work_item *work);
> +    void *private;
> +};
>    

Structs are not named in accordance to CODING_STYLE.

> +static inline void async_queue_init(struct async_queue *queue,
> +				    int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    QTAILQ_INIT(&(queue->work_item_pool));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}
>    

I'd prefer there be a single queue that everything used versus multiple
queues.  Otherwise, we'll end up having per-device queues and my concern
is that we'll end up with thousands and thousands of threads with no
central place to tune the maximum thread number.

> +static inline struct work_item *async_work_init(struct async_queue *queue,
> +				   void (*func)(struct work_item *),
> +				   void *data)
>    

I'd suggest actually using a Notifier as the worker or at least 
something that looks exactly like it.  There's no need to pass a void * 
because more often than not, a caller just wants to pass a state 
structure anyway and they can embed the Notifier within the structure.  IOW:

async_work_submit(queue, &s->worker);

Then in the callback:

DeviceState *s = container_of(worker, DeviceState, worker);

I don't think the name makes the most sense either.  I think something like:

threadlet_submit()

Would work best.  It would be good for there to be a big comment warning 
that the routine does not run with the qemu_mutex and therefore cannot 
make use of any qemu functions without very special consideration.


There shouldn't need to be an explicit init vs. submit function either.

Regards,

Anthony Liguori
Corentin Chary June 5, 2010, 7:03 a.m. UTC | #4
On Fri, Jun 4, 2010 at 3:16 PM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 06/03/2010 03:56 AM, Gautham R Shenoy wrote:
>>
>> From: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>>
>> This patch creates a generic asynchronous-task-offloading infrastructure.
>> It's
>> extracted out of the threading framework that is being used by paio.
>>
>> The reason for extracting out this generic infrastructure of the
>> posix-aio-compat.c is so that other subsystems, such as virtio-9p could
>> make use
>> of it for offloading tasks that could block.
>>
>> [ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
>> async_cancel_work]
>>
>> Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>> Signed-off-by: Gautham R Shenoy<ego@in.ibm.com>
>> ---
>>  Makefile.objs |    3 +
>>  async-work.c  |  136
>> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
>>  3 files changed, 223 insertions(+), 1 deletions(-)
>>  create mode 100644 async-work.c
>>  create mode 100644 async-work.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index ecdd53e..fd5ea4d 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>>
>>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> +block-obj-y += qemu-thread.o
>> +block-obj-y += async-work.o
>>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>>
>> @@ -108,7 +110,6 @@ common-obj-y += iov.o
>>  common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
>>  common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
>>  common-obj-$(CONFIG_COCOA) += cocoa.o
>> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
>>  common-obj-y += notify.o event_notifier.o
>>  common-obj-y += qemu-timer.o
>>
>> diff --git a/async-work.c b/async-work.c
>> new file mode 100644
>> index 0000000..0675732
>> --- /dev/null
>> +++ b/async-work.c
>> @@ -0,0 +1,136 @@
>> +/*
>> + * Async work support
>> + *
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>>
>
> Please preserve the original copyright of the copied code.
>
>> + */
>> +#include<stdio.h>
>> +#include<errno.h>
>> +#include<string.h>
>> +#include<stdlib.h>
>> +#include<signal.h>
>>
>
> qemu-common.h should have all of these.  Generally, you should avoid
> including system headers because qemu headers take care of portability.
>
>> +#include "async-work.h"
>> +#include "osdep.h"
>> +
>> +static void async_abort(int err, const char *what)
>> +{
>> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
>> +    abort();
>> +}
>> +
>> +static void *async_worker_thread(void *data)
>> +{
>> +    struct async_queue *queue = data;
>> +
>> +    while (1) {
>> +        struct work_item *work;
>> +        int ret = 0;
>> +        qemu_mutex_lock(&(queue->lock));
>> +
>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>> +               (ret != ETIMEDOUT)) {
>> +            ret = qemu_cond_timedwait(&(queue->cond),
>> +                                       &(queue->lock), 10*100000);
>> +        }
>> +
>> +        if (QTAILQ_EMPTY(&(queue->request_list)))
>> +            goto check_exit;
>> +
>> +        work = QTAILQ_FIRST(&(queue->request_list));
>> +        QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +        queue->idle_threads--;
>> +        qemu_mutex_unlock(&(queue->lock));
>> +
>> +        /* execute the work function */
>> +        work->func(work);
>> +        async_work_release(queue, work);
>> +
>> +        qemu_mutex_lock(&(queue->lock));
>> +        queue->idle_threads++;
>> +
>> +check_exit:
>> +        if ((queue->idle_threads>  0)&&
>> +            (queue->cur_threads>  queue->min_threads)) {
>> +            /* we retain minimum number of threads */
>> +            break;
>> +        }
>> +        qemu_mutex_unlock(&(queue->lock));
>> +    }
>> +
>> +    queue->idle_threads--;
>> +    queue->cur_threads--;
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    return NULL;
>> +}
>> +
>> +static void spawn_async_thread(struct async_queue *queue)
>> +{
>> +    QemuThreadAttr attr;
>> +    QemuThread thread;
>> +    sigset_t set, oldset;
>> +
>> +    queue->cur_threads++;
>> +    queue->idle_threads++;
>> +
>> +    qemu_thread_attr_init(&attr);
>> +
>> +    /* create a detached thread so that we don't need to wait on it */
>> +    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
>> +
>> +    /* block all signals */
>> +    if (sigfillset(&set)) {
>> +        async_abort(errno, "sigfillset");
>> +    }
>> +
>> +    if (sigprocmask(SIG_SETMASK,&set,&oldset)) {
>> +        async_abort(errno, "sigprocmask");
>> +    }
>> +
>> +    qemu_thread_create_attr(&thread,&attr, async_worker_thread, queue);
>> +
>> +    if (sigprocmask(SIG_SETMASK,&oldset, NULL)) {
>> +        async_abort(errno, "sigprocmask restore");
>> +    }
>> +}
>> +
>> +void qemu_async_submit(struct async_queue *queue, struct work_item *work)
>> +{
>> +    qemu_mutex_lock(&(queue->lock));
>> +    if (queue->idle_threads == 0&&  queue->cur_threads<
>>  queue->max_threads) {
>> +        spawn_async_thread(queue);
>> +    }
>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> +    qemu_mutex_unlock(&(queue->lock));
>> +    qemu_cond_signal(&(queue->cond));
>> +}
>> +
>> +int qemu_async_cancel_work(struct async_queue *queue, struct work_item
>> *work)
>> +{
>> +    struct work_item *ret_work;
>> +    int found = 0;
>> +
>> +    qemu_mutex_lock(&(queue->lock));
>> +    QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
>> +        if (ret_work == work) {
>> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
>> +            found = 1;
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    if (found) {
>> +        async_work_release(queue, work);
>> +        return 0;
>> +    }
>> +
>> +    return 1;
>> +}
>> +
>> diff --git a/async-work.h b/async-work.h
>> new file mode 100644
>> index 0000000..8389f56
>> --- /dev/null
>> +++ b/async-work.h
>> @@ -0,0 +1,85 @@
>> +/*
>> + * Async work support
>> + *
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + *
>> + */
>> +#ifndef QEMU_ASYNC_WORK_H
>> +#define QEMU_ASYNC_WORK_H
>> +
>> +#include "qemu-queue.h"
>> +#include "qemu-common.h"
>> +#include "qemu-thread.h"
>> +
>> +struct async_queue
>> +{
>> +    QemuMutex lock;
>> +    QemuCond cond;
>> +    int max_threads;
>> +    int min_threads;
>> +    int cur_threads;
>> +    int idle_threads;
>> +    QTAILQ_HEAD(, work_item) request_list;
>> +    QTAILQ_HEAD(, work_item) work_item_pool;
>> +};
>> +
>> +struct work_item
>> +{
>> +    QTAILQ_ENTRY(work_item) node;
>> +    void (*func)(struct work_item *work);
>> +    void *private;
>> +};
>>
>
> Structs are not named in accordance to CODING_STYLE.
>
>> +static inline void async_queue_init(struct async_queue *queue,
>> +                                   int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    QTAILQ_INIT(&(queue->work_item_pool));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>>
>
> I'd prefer there be a single queue that everything used verses multiple
> queues.  Otherwise, we'll end up having per device queues and my concern is
> that we'll end up with thousands and thousands of threads with no central
> place to tune the maximum thread number.

If there is a single queue, we'll need something to control how jobs are
processed. For example,
in the VNC server, they must be processed in order (in fact, in order
per VNC client, but I don't see how we could do that).

>> +static inline struct work_item *async_work_init(struct async_queue
>> *queue,
>> +                                  void (*func)(struct work_item *),
>> +                                  void *data)
>>
>
> I'd suggest actually using a Notifier as the worker or at least something
> that looks exactly like it.  There's no need to pass a void * because more
> often than not, a caller just wants to pass a state structure anyway and
> they can embed the Notifier within the structure.  IOW:
>
> async_work_submit(queue, &s->worker);
>
> Then in the callback:
>
> DeviceState *s = container_of(worker, DeviceState, worker);
>
> I don't think the name makes the most sense either.  I think something like:
>
> threadlet_submit()
>
> Would work best.  It would be good for there to be a big comment warning
> that the routine does not run with the qemu_mutex and therefore cannot make
> use of any qemu functions without very special consideration.
>
>
> There shouldn't need to be an explicit init vs. submit function either.
>
> Regards,
>
> Anthony Liguori
>
Gautham R Shenoy June 10, 2010, 11:37 a.m. UTC | #5
On Fri, Jun 04, 2010 at 08:16:19AM -0500, Anthony Liguori wrote:
>> --- /dev/null
>> +++ b/async-work.c
>> @@ -0,0 +1,136 @@
>> +/*
>> + * Async work support
>> + *
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>>    
>
> Please preserve the original copyright of the copied code.

Will update the comment containing the Copyright.
>
>> +
>> +struct work_item
>> +{
>> +    QTAILQ_ENTRY(work_item) node;
>> +    void (*func)(struct work_item *work);
>> +    void *private;
>> +};
>>    
>
> Structs are not named in accordance to CODING_STYLE.

Will fix this.

>
>> +static inline void async_queue_init(struct async_queue *queue,
>> +				    int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    QTAILQ_INIT(&(queue->work_item_pool));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>>    
>
> I'd prefer there be a single queue that everything used verses multiple 
> queues.  Otherwise, we'll end up having per device queues and my concern is 
> that we'll end up with thousands and thousands of threads with no central 
> place to tune the maximum thread number.

Aah! So, the original idea was to have a single queue, but since we were
making it generic, we thought that the subsystems might like the
flexibility of having their own queue.

I suppose we are not looking to differentiate between the worker threads
belonging to different subsystems in terms of their relative
importance/priorities, right ?

>
>> +static inline struct work_item *async_work_init(struct async_queue *queue,
>> +				   void (*func)(struct work_item *),
>> +				   void *data)
>>    
>
> I'd suggest actually using a Notifier as the worker or at least something 
> that looks exactly like it.  There's no need to pass a void * because more 
> often than not, a caller just wants to pass a state structure anyway and 
> they can embed the Notifier within the structure.  IOW:
>
> async_work_submit(queue, &s->worker);
>
> Then in the callback:
>
> DeviceState *s = container_of(worker, DeviceState, worker);
>
> I don't think the name makes the most sense either.  I think something like:
>
> threadlet_submit()

Makes sense. Will implement this.
>
> Would work best.  It would be good for there to be a big comment warning 
> that the routine does not run with the qemu_mutex and therefore cannot make 
> use of any qemu functions without very special consideration.
>
>
> There shouldn't need to be an explicit init vs. submit function either.

Ok, will address these comments.
>
> Regards,
>
> Anthony Liguori
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index ecdd53e..fd5ea4d 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@  qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += qemu-thread.o
+block-obj-y += async-work.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -108,7 +110,6 @@  common-obj-y += iov.o
 common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
 common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
 common-obj-$(CONFIG_COCOA) += cocoa.o
-common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/async-work.c b/async-work.c
new file mode 100644
index 0000000..0675732
--- /dev/null
+++ b/async-work.c
@@ -0,0 +1,136 @@ 
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include "async-work.h"
+#include "osdep.h"
+
+/*
+ * Report a fatal error on stderr and terminate the process.
+ *
+ * err is an errno-style code that is rendered with strerror(); what names
+ * the operation that failed.  This function does not return.
+ */
+static void async_abort(int err, const char *what)
+{
+    const char *reason = strerror(err);
+
+    fprintf(stderr, "%s failed: %s\n", what, reason);
+    abort();
+}
+
+/*
+ * Worker thread body.
+ *
+ * Repeatedly takes the first item off queue->request_list, runs its callback
+ * without holding queue->lock, and then hands the item back to the queue's
+ * pool with async_work_release().
+ *
+ * A worker retires only when its wait for work timed out, the request list
+ * is still empty, and the queue currently has more than min_threads workers.
+ * It never retires while requests are pending: qemu_async_submit() spawns a
+ * worker only when it is called, so leaving with a non-empty list could
+ * strand the queued items until the next submission.
+ *
+ * data is the struct async_queue this worker serves.  Always returns NULL.
+ */
+static void *async_worker_thread(void *data)
+{
+    struct async_queue *queue = data;
+
+    while (1) {
+        struct work_item *work;
+        int ret = 0;
+
+        qemu_mutex_lock(&(queue->lock));
+
+        /* Sleep until there is work or the wait times out. */
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            /* NOTE(review): confirm the unit qemu_cond_timedwait() expects
+             * for this timeout value. */
+            ret = qemu_cond_timedwait(&(queue->cond),
+                                      &(queue->lock), 10*100000);
+        }
+
+        if (QTAILQ_EMPTY(&(queue->request_list))) {
+            /*
+             * Idle timeout.  This thread is itself counted in idle_threads
+             * here, so only the minimum-thread limit needs checking.
+             */
+            if (queue->cur_threads > queue->min_threads) {
+                /* leave the loop with queue->lock still held */
+                break;
+            }
+            /* we retain minimum number of threads: go back to waiting */
+            qemu_mutex_unlock(&(queue->lock));
+            continue;
+        }
+
+        work = QTAILQ_FIRST(&(queue->request_list));
+        QTAILQ_REMOVE(&(queue->request_list), work, node);
+        queue->idle_threads--;
+        qemu_mutex_unlock(&(queue->lock));
+
+        /* execute the work function; async_work_release() takes the lock
+         * itself, so it must be called with the lock dropped */
+        work->func(work);
+        async_work_release(queue, work);
+
+        qemu_mutex_lock(&(queue->lock));
+        queue->idle_threads++;
+        qemu_mutex_unlock(&(queue->lock));
+    }
+
+    /* Reached only via the break above, i.e. with queue->lock held. */
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&(queue->lock));
+
+    return NULL;
+}
+
+/*
+ * Start one detached worker thread for queue.
+ *
+ * cur_threads and idle_threads are bumped here without taking queue->lock;
+ * the visible caller, qemu_async_submit(), already holds it.
+ *
+ * All signals are blocked in the calling thread around thread creation and
+ * the previous mask is restored afterwards.  pthread_sigmask() is used
+ * rather than sigprocmask() because the latter is unspecified in a
+ * multi-threaded process; note that pthread_sigmask() returns the error
+ * number directly instead of setting errno.
+ *
+ * Any failure while manipulating the signal mask aborts the process.
+ */
+static void spawn_async_thread(struct async_queue *queue)
+{
+    QemuThreadAttr attr;
+    QemuThread thread;
+    sigset_t set, oldset;
+    int err;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_attr_init(&attr);
+
+    /* create a detached thread so that we don't need to wait on it */
+    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+    /* block all signals */
+    if (sigfillset(&set)) {
+        async_abort(errno, "sigfillset");
+    }
+
+    err = pthread_sigmask(SIG_SETMASK, &set, &oldset);
+    if (err) {
+        async_abort(err, "pthread_sigmask");
+    }
+
+    qemu_thread_create_attr(&thread, &attr, async_worker_thread, queue);
+
+    /* put the caller's original signal mask back */
+    err = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+    if (err) {
+        async_abort(err, "pthread_sigmask restore");
+    }
+}
+
+/*
+ * Queue work on queue for execution by a worker thread.
+ *
+ * With the queue lock held, a new worker is spawned first when no worker is
+ * idle and the queue is still below max_threads; the item is then appended
+ * to the request list.  The condition variable is signalled after the lock
+ * has been dropped.
+ */
+void qemu_async_submit(struct async_queue *queue, struct work_item *work)
+{
+    int need_thread;
+
+    qemu_mutex_lock(&queue->lock);
+
+    need_thread = !queue->idle_threads &&
+                  queue->max_threads > queue->cur_threads;
+    if (need_thread) {
+        spawn_async_thread(queue);
+    }
+
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+
+    qemu_cond_signal(&queue->cond);
+}
+
+/*
+ * Try to cancel a work item that has been submitted but not yet picked up.
+ *
+ * The request list is searched under the queue lock.  If work is found it is
+ * unlinked and, after the lock is dropped, returned to the queue's pool via
+ * async_work_release().
+ *
+ * Returns 0 when the item was found and removed, 1 when it was not on the
+ * request list.
+ */
+int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
+{
+    struct work_item *cur;
+    int cancelled = 0;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(cur, &queue->request_list, node) {
+        if (cur != work) {
+            continue;
+        }
+        QTAILQ_REMOVE(&queue->request_list, cur, node);
+        cancelled = 1;
+        break;
+    }
+    qemu_mutex_unlock(&queue->lock);
+
+    if (!cancelled) {
+        return 1;
+    }
+
+    /* async_work_release() takes the lock itself, hence called unlocked */
+    async_work_release(queue, work);
+    return 0;
+}
+
+
diff --git a/async-work.h b/async-work.h
new file mode 100644
index 0000000..8389f56
--- /dev/null
+++ b/async-work.h
@@ -0,0 +1,85 @@ 
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+/*
+ * A queue of work items together with the bookkeeping for the worker
+ * threads that serve it.  Set up with async_queue_init().
+ */
+struct async_queue
+{
+    /* held around accesses to the counters and lists below */
+    QemuMutex lock;
+    /* signalled by qemu_async_submit(); workers wait on it for requests */
+    QemuCond cond;
+    /* qemu_async_submit() spawns no new worker once cur_threads reaches this */
+    int max_threads;
+    /* workers do not exit while cur_threads is at or below this */
+    int min_threads;
+    /* incremented when a worker is spawned, decremented when one exits */
+    int cur_threads;
+    /* workers that are not currently running a work function */
+    int idle_threads;
+    /* submitted work items not yet taken by a worker */
+    QTAILQ_HEAD(, work_item) request_list;
+    /* released work items kept for reuse by async_work_init() */
+    QTAILQ_HEAD(, work_item) work_item_pool;
+};
+
+/*
+ * One unit of work for an async_queue.  Obtained from async_work_init().
+ */
+struct work_item
+{
+    /* links the item into a queue's request_list or work_item_pool */
+    QTAILQ_ENTRY(work_item) node;
+    /* callback run by a worker thread; receives the item itself */
+    void (*func)(struct work_item *work);
+    /* caller data stored by async_work_init(); not interpreted by the queue */
+    void *private;
+};
+
+/*
+ * Prepare queue for use: initialise its mutex and condition variable, empty
+ * both lists, reset the thread counters to zero and record the thread
+ * limits.  No worker thread is started here.
+ */
+static inline void async_queue_init(struct async_queue *queue,
+                                    int max_threads, int min_threads)
+{
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+
+    QTAILQ_INIT(&queue->request_list);
+    QTAILQ_INIT(&queue->work_item_pool);
+
+    queue->min_threads = min_threads;
+    queue->max_threads = max_threads;
+    queue->idle_threads = 0;
+    queue->cur_threads = 0;
+}
+
+/*
+ * Obtain a work item for queue and fill in its callback and private data.
+ *
+ * An item is taken from the queue's pool of released items when one is
+ * available; otherwise a zeroed one is allocated with qemu_mallocz().  The
+ * whole operation runs under the queue lock.
+ *
+ * Returns the prepared item; it is not submitted by this function.
+ */
+static inline struct work_item *async_work_init(struct async_queue *queue,
+                                   void (*func)(struct work_item *),
+                                   void *data)
+{
+    struct work_item *item;
+
+    qemu_mutex_lock(&queue->lock);
+
+    item = QTAILQ_FIRST(&queue->work_item_pool);
+    if (item) {
+        QTAILQ_REMOVE(&queue->work_item_pool, item, node);
+    } else {
+        item = qemu_mallocz(sizeof(*item));
+    }
+
+    item->private = data;
+    item->func = func;
+
+    qemu_mutex_unlock(&queue->lock);
+
+    return item;
+}
+
+/*
+ * Give work back to queue: the item is appended to the queue's pool of
+ * reusable items under the queue lock.  The item is not freed.
+ */
+static inline void async_work_release(struct async_queue *queue,
+                                      struct work_item *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_INSERT_TAIL(&queue->work_item_pool, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/*
+ * Append work to queue's request list and signal the queue's condition
+ * variable.  A new worker thread is spawned first when none is idle and the
+ * queue is below max_threads.
+ */
+extern void qemu_async_submit(struct async_queue *queue,
+				  struct work_item *work);
+
+/*
+ * Remove work from queue's request list if it is still there and return it
+ * to the queue's pool.  Returns 0 if the item was found and removed, 1 if it
+ * was not on the request list.
+ */
+extern int qemu_async_cancel_work(struct async_queue *queue,
+                    struct work_item *work);
+#endif