Patchwork [V4,2/3] qemu: Generic task offloading framework: threadlets

Submitter Gautham R Shenoy
Date June 16, 2010, 11:56 a.m.
Message ID <20100616115656.10988.96529.stgit@localhost.localdomain>
Permalink /patch/55871/
State New

Comments

Gautham R Shenoy - June 16, 2010, 11:56 a.m.
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets. The core idea has been borrowed from the threading framework used
by paio.

The reason for creating this generic infrastructure is so that other
subsystems, such as virtio-9p, can make use of it for offloading tasks that
could block.

The patch creates a global queue onto which subsystems can queue their tasks
to be executed asynchronously.

The patch also provides APIs that allow a subsystem to create a private queue,
as well as APIs that let a subsystem wait until all of the earlier queued
tasks have been executed.

[ego@in.ibm.com: Facelift of the code, cancel_threadlet,
flush_threadlet_queue and other minor helpers.]

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 Makefile.objs |    3 +
 async-work.c  |  186 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 async-work.h  |   69 +++++++++++++++++++++
 3 files changed, 257 insertions(+), 1 deletions(-)
 create mode 100644 async-work.c
 create mode 100644 async-work.h
Paolo Bonzini - June 16, 2010, 12:34 p.m.
> +block-obj-y += qemu-thread.o
> +block-obj-y += async-work.o

These should be (at least for now) block-obj-$(CONFIG_POSIX).

> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +					&(queue->lock), 10*100000);
> +        }

Using qemu_cond_timedwait is a hack for not properly broadcasting the 
condvar in flush_threadlet_queue.

> +        if (QTAILQ_EMPTY(&(queue->request_list)))
> +            goto check_exit;

What's the reason for the goto?  {...} works just as well.

> +/**
> + * flush_threadlet_queue: Wait till completion of all the submitted tasks
> + * @queue: Queue containing the tasks we're waiting on.
> + */
> +void flush_threadlet_queue(ThreadletQueue *queue)
> +{
> +    qemu_mutex_lock(&queue->lock);
> +    queue->exit = 1;
> +
> +    qemu_barrier_init(&queue->barr, queue->cur_threads + 1);
> +    qemu_mutex_unlock(&queue->lock);
> +
> +    qemu_barrier_wait(&queue->barr);

Can be implemented just as well with queue->cond and a loop waiting for 
queue->cur_threads == 0.  This would remove the need to implement 
barriers in qemu-threads (especially for Win32).  Anyway whoever will 
contribute Win32 qemu-threads can do it, since it's not hard.

> +int cancel_threadlet_common(ThreadletWork *work)
> +{
> +    return cancel_threadlet(&globalqueue, work);
> +}

I would prefer *_threadlet to be the globalqueue function (and 
flush_threadlets) and queue_*_threadlet to be the special-queue 
function. I should have spoken earlier probably, but please consider 
this if there will be a v5.

> + * Generalization based on posix-aio emulation code.

No need to specify these as long as the original authors are attributed 
properly.

> +static inline void threadlet_queue_init(ThreadletQueue *queue,
> +				    int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->exit = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    QTAILQ_INIT(&(queue->threadlet_work_pool));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}

No need to make this inline.

> +extern void threadlet_submit(ThreadletQueue *queue,
> +				  ThreadletWork *work);
> +
> +extern void threadlet_submit_common(ThreadletWork *work);
> +
> +extern int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work);
> +extern int cancel_threadlet_common(ThreadletWork *work);
> +
> +
> +extern void flush_threadlet_queue(ThreadletQueue *queue);
> +extern void flush_common_threadlet_queue(void);

Please make the position of the verb consistent (e.g. "submit_threadlet").

Paolo
Jamie Lokier - June 16, 2010, 2:22 p.m.
Paolo Bonzini wrote:
> These should be (at least for now) block-obj-$(CONFIG_POSIX).
> 
> >+        while (QTAILQ_EMPTY(&(queue->request_list))&&
> >+               (ret != ETIMEDOUT)) {
> >+            ret = qemu_cond_timedwait(&(queue->cond),
> >+					&(queue->lock), 10*100000);
> >+        }
> 
> Using qemu_cond_timedwait is a hack for not properly broadcasting the 
> condvar in flush_threadlet_queue.

Are you sure?  It looks like it also expires idle threads after a
fixed amount of idle time.

-- Jamie
Anthony Liguori - June 16, 2010, 2:27 p.m.
On 06/16/2010 09:22 AM, Jamie Lokier wrote:
> Paolo Bonzini wrote:
>    
>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>
>>      
>>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>>> +               (ret != ETIMEDOUT)) {
>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>> +					&(queue->lock), 10*100000);
>>> +        }
>>>        
>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>> condvar in flush_threadlet_queue.
>>      
> Are you sure?  It looks like it also expires idle threads after a
> fixed amount of idle time.
>    

Yup, that's the intention of the code.

We signal instead of broadcast because broadcasting causes all threads 
to wake up which hurts performance.  AFAICT, there is no correctness 
issue in using signal vs. broadcast.

Regards,

Anthony Liguori

> -- Jamie
>
Paolo Bonzini - June 16, 2010, 2:29 p.m.
On 06/16/2010 04:22 PM, Jamie Lokier wrote:
> Paolo Bonzini wrote:
>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>
>>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>>> +               (ret != ETIMEDOUT)) {
>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>> +					&(queue->lock), 10*100000);
>>> +        }
>>
>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>> condvar in flush_threadlet_queue.
>
> Are you sure?  It looks like it also expires idle threads after a
> fixed amount of idle time.

Unnecessary idle threads are immediately expired as soon as the 
threadlet exits if necessary, since here

+        queue->idle_threads++;
+
+check_exit:
+        if (queue->exit || ((queue->idle_threads > 0) &&
+            (queue->cur_threads > queue->min_threads))) {
+            /* We exit the queue or we retain minimum number of threads */
+            break;
+        }

queue->idle_threads > 0 will always be true (so maybe that should be 
changed into an assertion: "this thread is idle, so there must be idle 
threads").

The min/max_threads parameters of the queue are currently immutable, so
it can never happen that a thread has to be expired while it's waiting.
It may well become true in the future, in which case the condvar will
have to be broadcast when min_threads changes.

I may well be wrong of course. :)

Paolo
Anthony Liguori - June 16, 2010, 2:38 p.m.
On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
> On 06/16/2010 04:22 PM, Jamie Lokier wrote:
>> Paolo Bonzini wrote:
>>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>>
>>>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>> +               (ret != ETIMEDOUT)) {
>>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>>> + &(queue->lock), 10*100000);
>>>> +        }
>>>
>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>> condvar in flush_threadlet_queue.
>>
>> Are you sure?  It looks like it also expires idle threads after a
>> fixed amount of idle time.
>
> Unnecessary idle threads are immediately expired as soon as the 
> threadlet exits if necessary, since here

If a threadlet is waiting to consume more work, unless we do a 
pthread_cancel (I dislike cancellation) it will keep waiting until it 
gets more work (which would mean it's not actually idle)...

> +        queue->idle_threads++;
> +
> +check_exit:
> +        if (queue->exit || ((queue->idle_threads > 0) &&
> +            (queue->cur_threads > queue->min_threads))) {
> +            /* We exit the queue or we retain minimum number of 
> threads */
> +            break;
> +        }
>
> queue->idle_threads > 0 will always be true (so maybe that should be 
> changed into an assertion: "this thread is idle, so there must be idle 
> threads").

queue->exit could be true though, so it's necessary to at least check 
that condition.  I agree though that via the normal fall-through path, 
queue->idle_threads can never be zero.

Regards,

Anthony Liguori

> The min/max_threads parameters of the queue are currently immutable, 
> so it can never happen that a thread has to be expired while it's 
> waiting.  It may well become true in the future, in which case the 
> condvar will have to be broadcast when min_threads changes.
>
> I may well be wrong of course. :)

I think the code has been modified such that you are correct.

> Paolo
Paolo Bonzini - June 16, 2010, 2:52 p.m.
On 06/16/2010 04:38 PM, Anthony Liguori wrote:
> On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
>> On 06/16/2010 04:22 PM, Jamie Lokier wrote:
>>> Paolo Bonzini wrote:
>>>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>>>
>>>>> + while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>>> + (ret != ETIMEDOUT)) {
>>>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>>>> + &(queue->lock), 10*100000);
>>>>> + }
>>>>
>>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>>> condvar in flush_threadlet_queue.
>>>
>>> Are you sure? It looks like it also expires idle threads after a
>>> fixed amount of idle time.
>>
>> Unnecessary idle threads are immediately expired as soon as the
>> threadlet exits if necessary, since here
>
> If a threadlet is waiting to consume more work, unless we do a
> pthread_cancel (I dislike cancellation) it will keep waiting until it
> gets more work (which would mean it's not actually idle)...

Agreed---no cancellation, please.

BTW it's obviously okay to signal the condition when a threadlet is 
submitted.  But when something affects all of a queue's workers 
(flush_threadlet_queue) you want a broadcast, and using expiration as a 
substitute is fishy.

>> + queue->idle_threads++;
>> +
>> +check_exit:
>> + if (queue->exit || ((queue->idle_threads > 0) &&
>> + (queue->cur_threads > queue->min_threads))) {
>> + /* We exit the queue or we retain minimum number of threads */
>> + break;
>> + }
>>
>> queue->idle_threads > 0 will always be true (so maybe that should be
>> changed into an assertion: "this thread is idle, so there must be idle
>> threads").
>
> queue->exit could be true though so it's necessary to at least check
> that condition.

Yes, of course.  The correct test should be:

   if (queue->exit || queue->cur_threads > queue->min_threads)

But queue->idle_threads will be > 0 even if coming via the goto (which 
should be eliminated).

Or maybe not.  After flushing you still want min_threads threads to run.
The correct thing then would be:

     do {
         ...
         assert (queue->idle_threads > 0);
         if (queue->exit) {
             /* Threads waiting on the barrier cannot do work.  */
             queue->idle_threads--;
             qemu_mutex_unlock(&(queue->lock));
             qemu_barrier_wait(&queue->barr);
             qemu_mutex_lock(&(queue->lock));
             queue->idle_threads++;
         }
     } while (queue->cur_threads <= queue->min_threads);

     queue->idle_threads--;
     queue->cur_threads--;
     qemu_mutex_unlock(&queue->lock);
     return NULL;

So, if min_threads were changed, broadcasting the condition would be 
enough to exit unwanted threads one at a time, as soon as it grabs the lock.

Paolo
Jamie Lokier - June 16, 2010, 2:58 p.m.
Anthony Liguori wrote:
> On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
> >On 06/16/2010 04:22 PM, Jamie Lokier wrote:
> >>Paolo Bonzini wrote:
> >>>These should be (at least for now) block-obj-$(CONFIG_POSIX).
> >>>
> >>>>+        while (QTAILQ_EMPTY(&(queue->request_list))&&
> >>>>+               (ret != ETIMEDOUT)) {
> >>>>+            ret = qemu_cond_timedwait(&(queue->cond),
> >>>>+ &(queue->lock), 10*100000);
> >>>>+        }
> >>>
> >>>Using qemu_cond_timedwait is a hack for not properly broadcasting the
> >>>condvar in flush_threadlet_queue.
> >>
> >>Are you sure?  It looks like it also expires idle threads after a
> >>fixed amount of idle time.
> >
> >Unnecessary idle threads are immediately expired as soon as the 
> >threadlet exits if necessary, since here
> 
> If a threadlet is waiting to consume more work, unless we do a 
> pthread_cancel (I dislike cancellation) it will keep waiting until it 
> gets more work (which would mean it's not actually idle)...

There's some mild abuse of the mutex/condvar going on.

As (queue->exit || queue->idle_threads > queue->min_threads) is a
condition for breaking out of the loop, that condition ought to be
checked in the mutex->cond_wait region, but it isn't.

It doesn't matter here because the queue is empty when queue->exit,
and the idle > min_threads condition can't become true.

> >The min/max_threads parameters of the queue are currently immutable, 
> >so it can never happen that a thread has to be expired while it's 
> >waiting.  It may well become true in the future, in which case the 
> >condvar will have to be broadcast when min_threads changes.

Broadcasting when min_threads decreases wouldn't be enough, because
min_threads isn't checked inside the mutex->cond_wait region.

-- Jamie
Jamie Lokier - June 16, 2010, 3:07 p.m.
Jamie Lokier wrote:
> Anthony Liguori wrote:
> > On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
> > >On 06/16/2010 04:22 PM, Jamie Lokier wrote:
> > >>Paolo Bonzini wrote:
> > >>>These should be (at least for now) block-obj-$(CONFIG_POSIX).
> > >>>
> > >>>>+        while (QTAILQ_EMPTY(&(queue->request_list))&&
> > >>>>+               (ret != ETIMEDOUT)) {
> > >>>>+            ret = qemu_cond_timedwait(&(queue->cond),
> > >>>>+ &(queue->lock), 10*100000);
> > >>>>+        }
> > >>>
> > >>>Using qemu_cond_timedwait is a hack for not properly broadcasting the
> > >>>condvar in flush_threadlet_queue.
> > >>
> > >>Are you sure?  It looks like it also expires idle threads after a
> > >>fixed amount of idle time.
> > >
> > >Unnecessary idle threads are immediately expired as soon as the 
> > >threadlet exits if necessary, since here
> > 
> > If a threadlet is waiting to consume more work, unless we do a 
> > pthread_cancel (I dislike cancellation) it will keep waiting until it 
> > gets more work (which would mean it's not actually idle)...
> 
> There's some mild abuse of the mutex/condvar going on.
> 
> As (queue->exit || queue->idle_threads > queue->min_threads) is a
> condition for breaking out of the loop, that condition ought to be
> checked in the mutex->cond_wait region, but it isn't.
> 
> It doesn't matter here because the queue is empty when queue->exit,
> and the idle > min_threads condition can't become true.

Sorry, thinko.  It does matter when queue->exit, precisely because the
queue is empty :-)

Even cond_broadcast after queue->exit is set isn't enough to remove
the need for the timed wait hack.

Putting the whole condition inside the mutex->cond_wait region, not
just the empty-queue test, will remove the need for the timed wait.
Broadcast is still needed, or alternatively a cond_signal from each
exiting thread will allow them to wake and close without a thundering herd.

-- Jamie
Anthony Liguori - June 16, 2010, 3:20 p.m.
On 06/16/2010 09:52 AM, Paolo Bonzini wrote:
> On 06/16/2010 04:38 PM, Anthony Liguori wrote:
>> On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
>>> On 06/16/2010 04:22 PM, Jamie Lokier wrote:
>>>> Paolo Bonzini wrote:
>>>>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>>>>
>>>>>> + while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>>>> + (ret != ETIMEDOUT)) {
>>>>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>>>>> + &(queue->lock), 10*100000);
>>>>>> + }
>>>>>
>>>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>>>> condvar in flush_threadlet_queue.
>>>>
>>>> Are you sure? It looks like it also expires idle threads after a
>>>> fixed amount of idle time.
>>>
>>> Unnecessary idle threads are immediately expired as soon as the
>>> threadlet exits if necessary, since here
>>
>> If a threadlet is waiting to consume more work, unless we do a
>> pthread_cancel (I dislike cancellation) it will keep waiting until it
>> gets more work (which would mean it's not actually idle)...
>
> Agreed---no cancellation, please.
>
> BTW it's obviously okay with signaling the condition when a threadlet 
> is submitted.  But when something affects all queue's workers 
> (flush_threadlet_queue) you want a broadcast and using expiration as a 
> substitute is fishy.

IMHO, there shouldn't be a need for flush_threadlet_queue.  It doesn't 
appear to be used in the aio conversion, and if virtio-9p needs it, I 
suspect something is wrong.

Regards,

Anthony Liguori

>>> + queue->idle_threads++;
>>> +
>>> +check_exit:
>>> + if (queue->exit || ((queue->idle_threads > 0) &&
>>> + (queue->cur_threads > queue->min_threads))) {
>>> + /* We exit the queue or we retain minimum number of threads */
>>> + break;
>>> + }
>>>
>>> queue->idle_threads > 0 will always be true (so maybe that should be
>>> changed into an assertion: "this thread is idle, so there must be idle
>>> threads").
>>
>> queue->exit could be true though so it's necessary to at least check
>> that condition.
>
> Yes, of course.  The correct test should be:
>
>   if (queue->exit || queue->cur_threads > queue->min_threads)
>
> But queue->idle_threads will be > 0 even if coming via the goto (which 
> should be eliminated).
>
> Or maybe no.  After flushing you still want min_threads threads to 
> run.  The correct thing then would be:
>
>     do {
>         ...
>         assert (queue->idle_threads > 0);
>         if (queue->exit) {
>             /* Threads waiting on the barrier cannot do work.  */
>             queue->idle_threads--;
>             qemu_mutex_unlock(&(queue->lock));
>             qemu_barrier_wait(&queue->barr);
>             qemu_mutex_lock(&(queue->lock));
>             queue->idle_threads++;
>         }
>     } while (queue->cur_threads <= queue->min_threads);
>
>     queue->idle_threads--;
>     queue->cur_threads--;
>     qemu_mutex_unlock(&queue->lock);
>     return NULL;
>
> So, if min_threads were changed, broadcasting the condition would be 
> enough to exit unwanted threads one at a time, as soon as it grabs the 
> lock.
>
> Paolo
Corentin Chary - June 16, 2010, 3:47 p.m.
On Wed, Jun 16, 2010 at 5:20 PM, Anthony Liguori
<aliguori@linux.vnet.ibm.com> wrote:
> On 06/16/2010 09:52 AM, Paolo Bonzini wrote:
>>
>> On 06/16/2010 04:38 PM, Anthony Liguori wrote:
>>>
>>> On 06/16/2010 09:29 AM, Paolo Bonzini wrote:
>>>>
>>>> On 06/16/2010 04:22 PM, Jamie Lokier wrote:
>>>>>
>>>>> Paolo Bonzini wrote:
>>>>>>
>>>>>> These should be (at least for now) block-obj-$(CONFIG_POSIX).
>>>>>>
>>>>>>> + while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>>>>> + (ret != ETIMEDOUT)) {
>>>>>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>>>>>> + &(queue->lock), 10*100000);
>>>>>>> + }
>>>>>>
>>>>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>>>>> condvar in flush_threadlet_queue.
>>>>>
>>>>> Are you sure? It looks like it also expires idle threads after a
>>>>> fixed amount of idle time.
>>>>
>>>> Unnecessary idle threads are immediately expired as soon as the
>>>> threadlet exits if necessary, since here
>>>
>>> If a threadlet is waiting to consume more work, unless we do a
>>> pthread_cancel (I dislike cancellation) it will keep waiting until it
>>> gets more work (which would mean it's not actually idle)...
>>
>> Agreed---no cancellation, please.
>>
>> BTW it's obviously okay with signaling the condition when a threadlet is
>> submitted.  But when something affects all queue's workers
>> (flush_threadlet_queue) you want a broadcast and using expiration as a
>> substitute is fishy.
>
> IMHO, there shouldn't be a need for flush_threadlet_queue.  It doesn't look
> used in the aio conversion and if virtio-9p needs it, I suspect something is
> wrong.
>

I would need something like flush_threadlet_queue for the vnc server.
I need it in vnc_disconnect(), vnc_dpy_resize() and vnc_dpy_cpy() to
wait for (and/or abort) current encoding jobs.
Anthony Liguori - June 16, 2010, 3:52 p.m.
On 06/16/2010 10:47 AM, Corentin Chary wrote:
>
> I would need something like flush_threadlet_queue for the vnc server.
> I need it in
> vnc_disconnect(), vnc_dpy_resize() and vnc_dpy_cpy() so wait (and/or
> abort) current
> encoding jobs.
>    

I'm not sure threadlets are the right thing for the VNC server.  The VNC 
server wants one dedicated thread.  Threadlets are a thread pool.  You 
could potentially use one thread per client but I doubt it would be 
worth it.

At any rate, flushing the full queue is overkill.  You want to wait for 
your specific thread to terminate and you want to block execution until 
that happens.  IOW, you want to join the thread.

Regards,

Anthony Liguori
Corentin Chary - June 16, 2010, 4:06 p.m.
On Wed, Jun 16, 2010 at 5:52 PM, Anthony Liguori
<aliguori@linux.vnet.ibm.com> wrote:
> On 06/16/2010 10:47 AM, Corentin Chary wrote:
>>
>> I would need something like flush_threadlet_queue for the vnc server.
>> I need it in
>> vnc_disconnect(), vnc_dpy_resize() and vnc_dpy_cpy() so wait (and/or
>> abort) current
>> encoding jobs.
>>
>
> I'm not sure threadlets are the right thing for the VNC server.  The VNC
> server wants one dedicated thread.  Threadlets are a thread pool.  You could
> potentially use one thread per client but I doubt it would be worth it.
>
> At any rate, flushing the full queue is overkill.  You want to wait for your
> specific thread to terminate and you want to block execution until that
> happens.  IOW, you want to join the thread.
>

Oh right, I should have read the changelog more carefully, it's a
global queue now ...

Thanks,
Paolo Bonzini - June 16, 2010, 4:45 p.m.
On 06/16/2010 05:07 PM, Jamie Lokier wrote:
> Putting the whole condition inside the mutex->cond_wait region, not
> just empty queue test, will remove the need for timed wait.

Yes, the condition must include all the cases when you broadcast.

> Broadcast
> is still needed, or alternatively a cond_signal from each exiting
> thread will allow them to wake and close without a thundering herd.

If the pthreads cause a thundering herd, that's the pthreads problem.  A
properly implemented condvar will just requeue the threads to the mutex.
NPTL does; the Win32 condvar most likely won't. :)

Anyway if Corentin doesn't need the flush_threadlets_queue all this can 
be dropped.

Paolo
Gautham R Shenoy - June 17, 2010, 8:53 a.m.
On Wed, Jun 16, 2010 at 02:34:16PM +0200, Paolo Bonzini wrote:
>> +block-obj-y += qemu-thread.o
>> +block-obj-y += async-work.o
>
> These should be (at least for now) block-obj-$(CONFIG_POSIX).

Right. Will fix that.
>
>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>> +               (ret != ETIMEDOUT)) {
>> +            ret = qemu_cond_timedwait(&(queue->cond),
>> +					&(queue->lock), 10*100000);
>> +        }
>
> Using qemu_cond_timedwait is a hack for not properly broadcasting the 
> condvar in flush_threadlet_queue.

I think Anthony answered this one.
>
>> +        if (QTAILQ_EMPTY(&(queue->request_list)))
>> +            goto check_exit;
>
> What's the reason for the goto?  {...} works just as well.

Yes {...} works.

Besides, this two-step condition checking is broken and can
cause the threads to exit even in the presence of unprocessed
queued ThreadletWork items.
Will fix this in the v5 (hopefully there will be one :-))

>
>> +/**
>> + * flush_threadlet_queue: Wait till completion of all the submitted tasks
>> + * @queue: Queue containing the tasks we're waiting on.
>> + */
>> +void flush_threadlet_queue(ThreadletQueue *queue)
>> +{
>> +    qemu_mutex_lock(&queue->lock);
>> +    queue->exit = 1;
>> +
>> +    qemu_barrier_init(&queue->barr, queue->cur_threads + 1);
>> +    qemu_mutex_unlock(&queue->lock);
>> +
>> +    qemu_barrier_wait(&queue->barr);
>
> Can be implemented just as well with queue->cond and a loop waiting for 
> queue->cur_threads == 0.  This would remove the need to implement barriers 
> in qemu-threads (especially for Win32).  Anyway whoever will contribute 
> Win32 qemu-threads can do it, since it's not hard.

That was the other option I had considered before going for barriers,
for no particular reason. Now, considering that barriers are not
welcome, I will implement this method.

>
>> +int cancel_threadlet_common(ThreadletWork *work)
>> +{
>> +    return cancel_threadlet(&globalqueue, work);
>> +}
>
> I would prefer *_threadlet to be the globalqueue function (and 
> flush_threadlets) and queue_*_threadlet to be the special-queue function. I 
> should have spoken earlier probably, but please consider this if there will 
> be a v5.

Sure, will do that.
>
>> + * Generalization based on posix-aio emulation code.
>
> No need to specify these as long as the original authors are attributed 
> properly.

Ok!
>
>> +static inline void threadlet_queue_init(ThreadletQueue *queue,
>> +				    int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->exit = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    QTAILQ_INIT(&(queue->threadlet_work_pool));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>
> No need to make this inline.

Will fix this.
>
>> +extern void threadlet_submit(ThreadletQueue *queue,
>> +				  ThreadletWork *work);
>> +
>> +extern void threadlet_submit_common(ThreadletWork *work);
>> +
>> +extern int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work);
>> +extern int cancel_threadlet_common(ThreadletWork *work);
>> +
>> +
>> +extern void flush_threadlet_queue(ThreadletQueue *queue);
>> +extern void flush_common_threadlet_queue(void);
>
> Please make the position of the verb consistent (e.g. "submit_threadlet").

Overlooked threadlet_submit() in the rename process. It has to be
submit_threadlet(). Will fix.

Thanks for the detailed review.
Regards
gautham.
>
> Paolo
Gautham R Shenoy - June 17, 2010, 9:12 a.m.
On Wed, Jun 16, 2010 at 10:20:36AM -0500, Anthony Liguori wrote:
> On 06/16/2010 09:52 AM, Paolo Bonzini wrote:
>> BTW it's obviously okay with signaling the condition when a threadlet is 
>> submitted.  But when something affects all queue's workers 
>> (flush_threadlet_queue) you want a broadcast and using expiration as a 
>> substitute is fishy.
>
> IMHO, there shouldn't be a need for flush_threadlet_queue.  It doesn't look 
> used in the aio conversion and if virtio-9p needs it, I suspect something 
> is wrong.

virtio-9p doesn't need it.

The API has been added for the vnc-server case, where a subsystem wants
to wait on the threads of its private queue to finish executing the
already queued tasks. It's the responsibility of the subsystem to make sure
that new tasks are not submitted during this interval.

I sought clarification regarding this earlier,
http://lists.gnu.org/archive/html/qemu-devel/2010-06/msg01382.html

But now I am beginning to doubt I understood the use-case correctly.
>
> Regards,
>
> Anthony Liguori
Gautham R Shenoy - June 17, 2010, 9:16 a.m.
On Wed, Jun 16, 2010 at 06:06:35PM +0200, Corentin Chary wrote:
> On Wed, Jun 16, 2010 at 5:52 PM, Anthony Liguori
> <aliguori@linux.vnet.ibm.com> wrote:
> > On 06/16/2010 10:47 AM, Corentin Chary wrote:
> >>
> >> I would need something like flush_threadlet_queue for the vnc server.
> >> I need it in
> >> vnc_disconnect(), vnc_dpy_resize() and vnc_dpy_cpy() so wait (and/or
> >> abort) current
> >> encoding jobs.
> >>
> >
> > I'm not sure threadlets are the right thing for the VNC server.  The VNC
> > server wants one dedicated thread.  Threadlets are a thread pool.  You could
> > potentially use one thread per client but I doubt it would be worth it.
> >
> > At any rate, flushing the full queue is overkill.  You want to wait for your
> > specific thread to terminate and you want to block execution until that
> > happens.  IOW, you want to join the thread.
> >
> 
> Oh right, I should have read the changelog more carefully, it's a
> global queue now ...

Well, the APIs that allow the subsystems to create their own private
queues are still retained. But having read what Anthony mentioned, I
doubt you would want to do that for a single helper thread :-)

> 
> Thanks,
> -- 
> Corentin Chary
> http://xf.iksaif.net
Paolo Bonzini - June 17, 2010, 10:09 a.m.
>>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>>> +               (ret != ETIMEDOUT)) {
>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>> +					&(queue->lock), 10*100000);
>>> +        }
>>
>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>> condvar in flush_threadlet_queue.
>
> I think Anthony answered this one.

I think he said that the code has been changed so I am right? :)

>>> +/**
>>> + * flush_threadlet_queue: Wait till completion of all the submitted tasks
>>> + * @queue: Queue containing the tasks we're waiting on.
>>> + */
>>> +void flush_threadlet_queue(ThreadletQueue *queue)
>>> +{
>>> +    qemu_mutex_lock(&queue->lock);
>>> +    queue->exit = 1;
>>> +
>>> +    qemu_barrier_init(&queue->barr, queue->cur_threads + 1);
>>> +    qemu_mutex_unlock(&queue->lock);
>>> +
>>> +    qemu_barrier_wait(&queue->barr);
>>
>> Can be implemented just as well with queue->cond and a loop waiting for
>> queue->cur_threads == 0.  This would remove the need to implement barriers
>> in qemu-threads (especially for Win32).  Anyway whoever will contribute
>> Win32 qemu-threads can do it, since it's not hard.
>
> That was the other option I had considered before going for barriers,
> for no particular reason. Now, considering that barriers are not
> welcome, I will implement this method.

I guess we decided flush isn't really useful at all.  Might as well 
leave it out of v5 and implement it later, so the barrier and 
complicated exit condition are now unnecessary.

Thanks,

Paolo
Anthony Liguori - June 17, 2010, 6:05 p.m.
On 06/17/2010 05:09 AM, Paolo Bonzini wrote:
>>>> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>> +               (ret != ETIMEDOUT)) {
>>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>>> + &(queue->lock), 10*100000);
>>>> +        }
>>>
>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>> condvar in flush_threadlet_queue.
>>
>> I think Anthony answered this one.
>
> I think he said that the code has been changed so I am right? :)

You're right about the condition we check in the exit path but the 
timedwait is needed to expire an idle thread.

Regards,

Anthony Liguori
Paolo Bonzini - June 18, 2010, 7:52 a.m.
On 06/17/2010 08:05 PM, Anthony Liguori wrote:
> On 06/17/2010 05:09 AM, Paolo Bonzini wrote:
>>>>> + while (QTAILQ_EMPTY(&(queue->request_list))&&
>>>>> + (ret != ETIMEDOUT)) {
>>>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>>>> + &(queue->lock), 10*100000);
>>>>> + }
>>>>
>>>> Using qemu_cond_timedwait is a hack for not properly broadcasting the
>>>> condvar in flush_threadlet_queue.
>>>
>>> I think Anthony answered this one.
>>
>> I think he said that the code has been changed so I am right? :)
>
> You're right about the condition we check in the exit path but the
> timedwait is needed to expire an idle thread.

In posix-aio-compat, yes.  In threadlets you'll expire excess idle 
threads after each threadlet has completed.  If you want to keep the 
threads beyond the first min_threads ready for 10 seconds, that's fine, 
but that's not what the v4 code does.

Paolo

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..019646f 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@  qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += qemu-thread.o
+block-obj-y += async-work.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -109,7 +111,6 @@  common-obj-y += iov.o
 common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
 common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
 common-obj-$(CONFIG_COCOA) += cocoa.o
-common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/async-work.c b/async-work.c
new file mode 100644
index 0000000..50e39ce
--- /dev/null
+++ b/async-work.c
@@ -0,0 +1,186 @@ 
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ * Generalization based on posix-aio emulation code.
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include "async-work.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS  64
+ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+        qemu_mutex_lock(&(queue->lock));
+
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            ret = qemu_cond_timedwait(&(queue->cond),
+					 &(queue->lock), 10*100000);
+        }
+
+        if (QTAILQ_EMPTY(&(queue->request_list)))
+            goto check_exit;
+
+        work = QTAILQ_FIRST(&(queue->request_list));
+        QTAILQ_REMOVE(&(queue->request_list), work, node);
+        queue->idle_threads--;
+        qemu_mutex_unlock(&(queue->lock));
+
+        /* execute the work function */
+        work->func(work);
+
+        qemu_mutex_lock(&(queue->lock));
+        queue->idle_threads++;
+
+check_exit:
+        if (queue->exit || ((queue->idle_threads > 0) &&
+            (queue->cur_threads > queue->min_threads))) {
+            /* We exit the queue or we retain minimum number of threads */
+            break;
+        }
+        qemu_mutex_unlock(&(queue->lock));
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    if (queue->exit) {
+        qemu_mutex_unlock(&(queue->lock));
+        qemu_barrier_wait(&queue->barr);
+    } else
+        qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * threadlet_submit: Submit a new task to be executed asynchronously.
+ * @queue: Queue to which the new task needs to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void threadlet_submit(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&(queue->lock));
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    }
+    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+    qemu_mutex_unlock(&(queue->lock));
+    qemu_cond_signal(&(queue->cond));
+}
+
+/**
+ * threadlet_submit_common: Submit to the global queue a new task to be
+ *                          executed asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void threadlet_submit_common(ThreadletWork *work)
+{
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    threadlet_submit(&globalqueue, work);
+}
+
+/**
+ * flush_threadlet_queue: Wait till completion of all the submitted tasks
+ * @queue: Queue containing the tasks we're waiting on.
+ */
+void flush_threadlet_queue(ThreadletQueue *queue)
+{
+    qemu_mutex_lock(&queue->lock);
+    queue->exit = 1;
+
+    qemu_barrier_init(&queue->barr, queue->cur_threads + 1);
+    qemu_mutex_unlock(&queue->lock);
+
+    qemu_barrier_wait(&queue->barr);
+}
+
+/**
+ * flush_common_threadlet_queue: Wait till completion of all the
+ *                               submitted tasks
+ * @queue: Queue containing the tasks we're waiting on.
+ */
+void flush_common_threadlet_queue(void)
+{
+    flush_threadlet_queue(&globalqueue);
+}
+
+/**
+ * cancel_threadlet: Cancel a queued task.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *ret_work;
+    int found = 0;
+
+    qemu_mutex_lock(&(queue->lock));
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
+            found = 1;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&(queue->lock));
+
+    if (found) {
+        return 0;
+    }
+
+    return 1;
+}
+
+/**
+ * cancel_threadlet_common: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int cancel_threadlet_common(ThreadletWork *work)
+{
+    return cancel_threadlet(&globalqueue, work);
+}
diff --git a/async-work.h b/async-work.h
new file mode 100644
index 0000000..36d19fa
--- /dev/null
+++ b/async-work.h
@@ -0,0 +1,69 @@ 
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ * Generalization based on posix-aio emulation code.
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    QemuBarrier barr;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    int exit;
+    QTAILQ_HEAD(, threadlet_work) request_list;
+    QTAILQ_HEAD(, threadlet_work) threadlet_work_pool;
+} ThreadletQueue;
+
+typedef struct threadlet_work
+{
+    QTAILQ_ENTRY(threadlet_work) node;
+    void (*func)(struct threadlet_work *work);
+} ThreadletWork;
+
+static inline void threadlet_queue_init(ThreadletQueue *queue,
+				    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->exit = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&(queue->request_list));
+    QTAILQ_INIT(&(queue->threadlet_work_pool));
+    qemu_mutex_init(&(queue->lock));
+    qemu_cond_init(&(queue->cond));
+}
+
+extern void threadlet_submit(ThreadletQueue *queue,
+				  ThreadletWork *work);
+
+extern void threadlet_submit_common(ThreadletWork *work);
+
+extern int cancel_threadlet(ThreadletQueue *queue, ThreadletWork *work);
+extern int cancel_threadlet_common(ThreadletWork *work);
+
+
+extern void flush_threadlet_queue(ThreadletQueue *queue);
+extern void flush_common_threadlet_queue(void);
+#endif