diff mbox

[RFC] aio: add aio_context_acquire() and aio_context_release()

Message ID 1377614385-20466-1-git-send-email-stefanha@redhat.com
State New
Headers show

Commit Message

Stefan Hajnoczi Aug. 27, 2013, 2:39 p.m. UTC
It can be useful to run an AioContext from a thread which normally does
not "own" the AioContext.  For example, request draining can be
implemented by acquiring the AioContext and looping aio_poll() until all
requests have been completed.

The following pattern should work:

  /* Event loop thread */
  while (running) {
      aio_context_acquire(ctx);
      aio_poll(ctx, true);
      aio_context_release(ctx);
  }

  /* Another thread */
  aio_context_acquire(ctx);
  bdrv_read(bs, 0x1000, buf, 1);
  aio_context_release(ctx);

This patch implements aio_context_acquire() and aio_context_release().
Note that existing aio_poll() callers do not need to worry about
acquiring and releasing - it is only needed when multiple threads will
call aio_poll() on the same AioContext.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
I previously sent patches that implement bdrv_drain_all() by stopping dataplane
threads.  AioContext acquire()/release() is a more general solution than
temporarily stopping dataplane threads.  This solution is less hacky and also
supported by other event loops like GMainContext.

No need to commit this patch yet, I still want to build things on top of it
before submitting a final version.

 async.c             | 27 +++++++++++++++++++++++++
 include/block/aio.h | 13 ++++++++++++
 tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+)

Comments

Paolo Bonzini Aug. 27, 2013, 3:12 p.m. UTC | #1
Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
> It can be useful to run an AioContext from a thread which normally does
> not "own" the AioContext.  For example, request draining can be
> implemented by acquiring the AioContext and looping aio_poll() until all
> requests have been completed.
> 
> The following pattern should work:
> 
>   /* Event loop thread */
>   while (running) {
>       aio_context_acquire(ctx);
>       aio_poll(ctx, true);
>       aio_context_release(ctx);
>   }
> 
>   /* Another thread */
>   aio_context_acquire(ctx);
>   bdrv_read(bs, 0x1000, buf, 1);
>   aio_context_release(ctx);
> 
> This patch implements aio_context_acquire() and aio_context_release().
> Note that existing aio_poll() callers do not need to worry about
> acquiring and releasing - it is only needed when multiple threads will
> call aio_poll() on the same AioContext.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>

Really, really nice!   The "kick owner thread" technique is a very
interesting way to avoid dropping the lock around aio_poll's ppoll
system call.

On top of this, I think it would be useful to make aio_context_acquire
support recursive acquisition by returning a bool if the current thread
is already the owner.  Recursive acquisition != recursive locking! :)
In fact, acquisition and releasing could be done directly by the
synchronous block I/O functions perhaps?

One comment: ctx->owner is really "ctx->owned", if you replace the
argument of qemu_thread_is_self(ctx->owner) with &ctx->owner_thread.  It
is probably a bit clearer that way.

Paolo


> ---
> I previously sent patches that implement bdrv_drain_all() by stopping dataplane
> threads.  AioContext acquire()/release() is a more general solution than
> temporarily stopping dataplane threads.  This solution is less hacky and also
> supported by other event loops like GMainContext.
> 
> No need to commit this patch yet, I still want to build things on top of it
> before submitting a final version.
> 
>  async.c             | 27 +++++++++++++++++++++++++
>  include/block/aio.h | 13 ++++++++++++
>  tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 98 insertions(+)
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5743bf1..9035e87 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
>  typedef struct AioContext {
>      GSource source;
>  
> +    QemuMutex acquire_lock;
> +    QemuCond acquire_cond;
> +    QemuThread owner_thread;
> +    QemuThread *owner;
> +
>      /* The list of registered AIO handlers */
>      QLIST_HEAD(, AioHandler) aio_handlers;
>  
> @@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
>   */
>  void aio_context_unref(AioContext *ctx);
>  
> +/* Take ownership of the AioContext.  If the AioContext will be shared between
> + * threads, a thread must have ownership when calling aio_poll().
> + */
> +void aio_context_acquire(AioContext *ctx);
> +
> +/* Relinquish ownership of the AioContext. */
> +void aio_context_release(AioContext *ctx);
> +
>  /**
>   * aio_bh_new: Allocate a new bottom half structure.
>   *
> diff --git a/async.c b/async.c
> index 9791d8e..9fec07c 100644
> --- a/async.c
> +++ b/async.c
> @@ -203,6 +203,8 @@ aio_ctx_finalize(GSource     *source)
>      thread_pool_free(ctx->thread_pool);
>      aio_set_event_notifier(ctx, &ctx->notifier, NULL);
>      event_notifier_cleanup(&ctx->notifier);
> +    qemu_cond_destroy(&ctx->acquire_cond);
> +    qemu_mutex_destroy(&ctx->acquire_lock);
>      qemu_mutex_destroy(&ctx->bh_lock);
>      g_array_free(ctx->pollfds, TRUE);
>  }
> @@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>      ctx->thread_pool = NULL;
>      qemu_mutex_init(&ctx->bh_lock);
> +    qemu_mutex_init(&ctx->acquire_lock);
> +    qemu_cond_init(&ctx->acquire_cond);
> +    ctx->owner = NULL;
>      event_notifier_init(&ctx->notifier, false);
>      aio_set_event_notifier(ctx, &ctx->notifier, 
>                             (EventNotifierHandler *)
> @@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
>  {
>      g_source_unref(&ctx->source);
>  }
> +
> +void aio_context_acquire(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    while (ctx->owner) {
> +        assert(!qemu_thread_is_self(ctx->owner));
> +        aio_notify(ctx); /* kick current owner */
> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
> +    }
> +    qemu_thread_get_self(&ctx->owner_thread);
> +    ctx->owner = &ctx->owner_thread;
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}
> +
> +void aio_context_release(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> +    ctx->owner = NULL;
> +    qemu_cond_signal(&ctx->acquire_cond);
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}
> diff --git a/tests/test-aio.c b/tests/test-aio.c
> index 1ab5637..324c099 100644
> --- a/tests/test-aio.c
> +++ b/tests/test-aio.c
> @@ -88,6 +88,63 @@ static void test_notify(void)
>      g_assert(!aio_poll(ctx, false));
>  }
>  
> +typedef struct {
> +    QemuMutex start_lock;
> +    bool thread_acquired;
> +} AcquireTestData;
> +
> +static void *test_acquire_thread(void *opaque)
> +{
> +    AcquireTestData *data = opaque;
> +
> +    /* Wait for other thread to let us start */
> +    qemu_mutex_lock(&data->start_lock);
> +    qemu_mutex_unlock(&data->start_lock);
> +
> +    aio_context_acquire(ctx);
> +    aio_context_release(ctx);
> +
> +    data->thread_acquired = true; /* success, we got here */
> +
> +    return NULL;
> +}
> +
> +static void dummy_notifier_read(EventNotifier *unused)
> +{
> +    g_assert(false); /* should never be invoked */
> +}
> +
> +static void test_acquire(void)
> +{
> +    QemuThread thread;
> +    EventNotifier notifier;
> +    AcquireTestData data;
> +
> +    /* Dummy event notifier ensures aio_poll() will block */
> +    event_notifier_init(&notifier, false);
> +    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
> +    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
> +
> +    qemu_mutex_init(&data.start_lock);
> +    qemu_mutex_lock(&data.start_lock);
> +    data.thread_acquired = false;
> +
> +    qemu_thread_create(&thread, test_acquire_thread,
> +                       &data, QEMU_THREAD_JOINABLE);
> +
> +    /* Block in aio_poll(), let other thread kick us and acquire context */
> +    aio_context_acquire(ctx);
> +    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
> +    g_assert(!aio_poll(ctx, true));
> +    aio_context_release(ctx);
> +
> +    qemu_thread_join(&thread);
> +    aio_set_event_notifier(ctx, &notifier, NULL);
> +    event_notifier_cleanup(&notifier);
> +
> +    g_assert(data.thread_acquired);
> +}
> +
>  static void test_bh_schedule(void)
>  {
>      BHTestData data = { .n = 0 };
> @@ -639,6 +696,7 @@ int main(int argc, char **argv)
>  
>      g_test_init(&argc, &argv, NULL);
>      g_test_add_func("/aio/notify",                  test_notify);
> +    g_test_add_func("/aio/acquire",                 test_acquire);
>      g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
>      g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
>      g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
>
Alex Bligh Aug. 27, 2013, 6:33 p.m. UTC | #2
--On 27 August 2013 16:39:45 +0200 Stefan Hajnoczi <stefanha@redhat.com> 
wrote:

> This patch implements aio_context_acquire() and aio_context_release().
> Note that existing aio_poll() callers do not need to worry about
> acquiring and releasing - it is only needed when multiple threads will
> call aio_poll() on the same AioContext.

I think this patch has another use.

For reasons I forget (init of the QemuTimerListGroup being one of them)
creation and deletion of AioContexts has to do be done with the BQL
held.

If a second thread is the ONLY user of the AioContext, this would allow
some synchronisation with the eventual deletion under the BQL on the
main thread.
Wayne Xia Aug. 28, 2013, 2:41 a.m. UTC | #3
于 2013-8-27 23:12, Paolo Bonzini 写道:
> Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
>> It can be useful to run an AioContext from a thread which normally does
>> not "own" the AioContext.  For example, request draining can be
>> implemented by acquiring the AioContext and looping aio_poll() until all
>> requests have been completed.
>>
>> The following pattern should work:
>>
>>    /* Event loop thread */
>>    while (running) {
>>        aio_context_acquire(ctx);
>>        aio_poll(ctx, true);
>>        aio_context_release(ctx);
>>    }
>>
>>    /* Another thread */
>>    aio_context_acquire(ctx);
>>    bdrv_read(bs, 0x1000, buf, 1);
>>    aio_context_release(ctx);
>>
>> This patch implements aio_context_acquire() and aio_context_release().
>> Note that existing aio_poll() callers do not need to worry about
>> acquiring and releasing - it is only needed when multiple threads will
>> call aio_poll() on the same AioContext.
>>
>> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
>
> Really, really nice!   The "kick owner thread" technique is a very
> interesting way to avoid dropping the lock around aio_poll's ppoll
> system call.
>
> On top of this, I think it would be useful to make aio_context_acquire
> support recursive acquisition by returning a bool if the current thread
> is already the owner.  Recursive acquisition != recursive locking! :)
> In fact, acquisition and releasing could be done directly by the
> synchronous block I/O functions perhaps?
>
> One comment: ctx->owner is really "ctx->owned", if you replace the
> argument of qemu_thread_is_self(ctx->owner) with &ctx->owner_thread.  It
> is probably a bit clearer that way.
>
    That will also avoid having two QemuThread members, which may be a
little strange.

> Paolo
>
>
>> ---
>> I previously sent patches that implement bdrv_drain_all() by stopping dataplane
>> threads.  AioContext acquire()/release() is a more general solution than
>> temporarily stopping dataplane threads.  This solution is less hacky and also
>> supported by other event loops like GMainContext.
>>
>> No need to commit this patch yet, I still want to build things on top of it
>> before submitting a final version.
>>
>>   async.c             | 27 +++++++++++++++++++++++++
>>   include/block/aio.h | 13 ++++++++++++
>>   tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   3 files changed, 98 insertions(+)
>>
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 5743bf1..9035e87 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
>>   typedef struct AioContext {
>>       GSource source;
>>
>> +    QemuMutex acquire_lock;
>> +    QemuCond acquire_cond;
>> +    QemuThread owner_thread;
>> +    QemuThread *owner;
>> +
>>       /* The list of registered AIO handlers */
>>       QLIST_HEAD(, AioHandler) aio_handlers;
>>
>> @@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
>>    */
>>   void aio_context_unref(AioContext *ctx);
>>
>> +/* Take ownership of the AioContext.  If the AioContext will be shared between
>> + * threads, a thread must have ownership when calling aio_poll().
>> + */
>> +void aio_context_acquire(AioContext *ctx);
>> +
>> +/* Relinquish ownership of the AioContext. */
>> +void aio_context_release(AioContext *ctx);
>> +
>>   /**
>>    * aio_bh_new: Allocate a new bottom half structure.
>>    *
>> diff --git a/async.c b/async.c
>> index 9791d8e..9fec07c 100644
>> --- a/async.c
>> +++ b/async.c
>> @@ -203,6 +203,8 @@ aio_ctx_finalize(GSource     *source)
>>       thread_pool_free(ctx->thread_pool);
>>       aio_set_event_notifier(ctx, &ctx->notifier, NULL);
>>       event_notifier_cleanup(&ctx->notifier);
>> +    qemu_cond_destroy(&ctx->acquire_cond);
>> +    qemu_mutex_destroy(&ctx->acquire_lock);
>>       qemu_mutex_destroy(&ctx->bh_lock);
>>       g_array_free(ctx->pollfds, TRUE);
>>   }
>> @@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
>>       ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>       ctx->thread_pool = NULL;
>>       qemu_mutex_init(&ctx->bh_lock);
>> +    qemu_mutex_init(&ctx->acquire_lock);
>> +    qemu_cond_init(&ctx->acquire_cond);
>> +    ctx->owner = NULL;
>>       event_notifier_init(&ctx->notifier, false);
>>       aio_set_event_notifier(ctx, &ctx->notifier,
>>                              (EventNotifierHandler *)
>> @@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
>>   {
>>       g_source_unref(&ctx->source);
>>   }
>> +
>> +void aio_context_acquire(AioContext *ctx)
>> +{
>> +    qemu_mutex_lock(&ctx->acquire_lock);
>> +    while (ctx->owner) {
>> +        assert(!qemu_thread_is_self(ctx->owner));
>> +        aio_notify(ctx); /* kick current owner */
>> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
>> +    }
>> +    qemu_thread_get_self(&ctx->owner_thread);
>> +    ctx->owner = &ctx->owner_thread;
>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>> +}
>> +
>> +void aio_context_release(AioContext *ctx)
>> +{
>> +    qemu_mutex_lock(&ctx->acquire_lock);
>> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
>> +    ctx->owner = NULL;
>> +    qemu_cond_signal(&ctx->acquire_cond);
>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>> +}
>> diff --git a/tests/test-aio.c b/tests/test-aio.c
>> index 1ab5637..324c099 100644
>> --- a/tests/test-aio.c
>> +++ b/tests/test-aio.c
>> @@ -88,6 +88,63 @@ static void test_notify(void)
>>       g_assert(!aio_poll(ctx, false));
>>   }
>>
>> +typedef struct {
>> +    QemuMutex start_lock;
>> +    bool thread_acquired;
>> +} AcquireTestData;
>> +
>> +static void *test_acquire_thread(void *opaque)
>> +{
>> +    AcquireTestData *data = opaque;
>> +
>> +    /* Wait for other thread to let us start */
>> +    qemu_mutex_lock(&data->start_lock);
>> +    qemu_mutex_unlock(&data->start_lock);
>> +
>> +    aio_context_acquire(ctx);
>> +    aio_context_release(ctx);
>> +
>> +    data->thread_acquired = true; /* success, we got here */
>> +
>> +    return NULL;
>> +}
>> +
>> +static void dummy_notifier_read(EventNotifier *unused)
>> +{
>> +    g_assert(false); /* should never be invoked */
>> +}
>> +
>> +static void test_acquire(void)
>> +{
>> +    QemuThread thread;
>> +    EventNotifier notifier;
>> +    AcquireTestData data;
>> +
>> +    /* Dummy event notifier ensures aio_poll() will block */
>> +    event_notifier_init(&notifier, false);
>> +    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
>> +    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
>> +
>> +    qemu_mutex_init(&data.start_lock);
>> +    qemu_mutex_lock(&data.start_lock);
>> +    data.thread_acquired = false;
>> +
>> +    qemu_thread_create(&thread, test_acquire_thread,
>> +                       &data, QEMU_THREAD_JOINABLE);
>> +
>> +    /* Block in aio_poll(), let other thread kick us and acquire context */
>> +    aio_context_acquire(ctx);
>> +    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
>> +    g_assert(!aio_poll(ctx, true));
>> +    aio_context_release(ctx);
>> +
>> +    qemu_thread_join(&thread);
>> +    aio_set_event_notifier(ctx, &notifier, NULL);
>> +    event_notifier_cleanup(&notifier);
>> +
>> +    g_assert(data.thread_acquired);
>> +}
>> +
>>   static void test_bh_schedule(void)
>>   {
>>       BHTestData data = { .n = 0 };
>> @@ -639,6 +696,7 @@ int main(int argc, char **argv)
>>
>>       g_test_init(&argc, &argv, NULL);
>>       g_test_add_func("/aio/notify",                  test_notify);
>> +    g_test_add_func("/aio/acquire",                 test_acquire);
>>       g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
>>       g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
>>       g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
>>
>
>
Wayne Xia Aug. 28, 2013, 3:25 a.m. UTC | #4
The APIs look nice to me, have some minor comments.

> It can be useful to run an AioContext from a thread which normally does
> not "own" the AioContext.  For example, request draining can be
> implemented by acquiring the AioContext and looping aio_poll() until all
> requests have been completed.
> 
> The following pattern should work:
> 
>    /* Event loop thread */
>    while (running) {
>        aio_context_acquire(ctx);
>        aio_poll(ctx, true);
>        aio_context_release(ctx);
>    }
> 
>    /* Another thread */
>    aio_context_acquire(ctx);
>    bdrv_read(bs, 0x1000, buf, 1);
>    aio_context_release(ctx);
> 
> This patch implements aio_context_acquire() and aio_context_release().
> Note that existing aio_poll() callers do not need to worry about
> acquiring and releasing - it is only needed when multiple threads will
> call aio_poll() on the same AioContext.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> I previously sent patches that implement bdrv_drain_all() by stopping dataplane
> threads.  AioContext acquire()/release() is a more general solution than
> temporarily stopping dataplane threads.  This solution is less hacky and also
> supported by other event loops like GMainContext.
> 
> No need to commit this patch yet, I still want to build things on top of it
> before submitting a final version.
> 
>   async.c             | 27 +++++++++++++++++++++++++
>   include/block/aio.h | 13 ++++++++++++
>   tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   3 files changed, 98 insertions(+)
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5743bf1..9035e87 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
>   typedef struct AioContext {
>       GSource source;
> 
> +    QemuMutex acquire_lock;
> +    QemuCond acquire_cond;
> +    QemuThread owner_thread;
> +    QemuThread *owner;
> +
>       /* The list of registered AIO handlers */
>       QLIST_HEAD(, AioHandler) aio_handlers;
> 
> @@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
>    */
>   void aio_context_unref(AioContext *ctx);
> 
> +/* Take ownership of the AioContext.  If the AioContext will be shared between
> + * threads, a thread must have ownership when calling aio_poll().
> + */
> +void aio_context_acquire(AioContext *ctx);
> +
> +/* Relinquish ownership of the AioContext. */
> +void aio_context_release(AioContext *ctx);
> +
>   /**
>    * aio_bh_new: Allocate a new bottom half structure.
>    *
> diff --git a/async.c b/async.c
> index 9791d8e..9fec07c 100644
> --- a/async.c
> +++ b/async.c
> @@ -203,6 +203,8 @@ aio_ctx_finalize(GSource     *source)
>       thread_pool_free(ctx->thread_pool);
>       aio_set_event_notifier(ctx, &ctx->notifier, NULL);
>       event_notifier_cleanup(&ctx->notifier);
> +    qemu_cond_destroy(&ctx->acquire_cond);
> +    qemu_mutex_destroy(&ctx->acquire_lock);
>       qemu_mutex_destroy(&ctx->bh_lock);
>       g_array_free(ctx->pollfds, TRUE);
>   }
> @@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
>       ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>       ctx->thread_pool = NULL;
>       qemu_mutex_init(&ctx->bh_lock);
> +    qemu_mutex_init(&ctx->acquire_lock);
> +    qemu_cond_init(&ctx->acquire_cond);
> +    ctx->owner = NULL;
>       event_notifier_init(&ctx->notifier, false);
>       aio_set_event_notifier(ctx, &ctx->notifier,
>                              (EventNotifierHandler *)
> @@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
>   {
>       g_source_unref(&ctx->source);
>   }
> +
> +void aio_context_acquire(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    while (ctx->owner) {
> +        assert(!qemu_thread_is_self(ctx->owner));
> +        aio_notify(ctx); /* kick current owner */
  Just a suggestion — a clearer comment might be:
  /* kick current owner, since the owner may be blocked in ppoll() */

> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
> +    }
> +    qemu_thread_get_self(&ctx->owner_thread);
> +    ctx->owner = &ctx->owner_thread;
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}
> +
> +void aio_context_release(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> +    ctx->owner = NULL;
> +    qemu_cond_signal(&ctx->acquire_cond);
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}
  If the main thread has called bdrv_aio_readv(cb *bdrv_cb), it is now
possible that bdrv_cb will be executed in another thread which has
called aio_context_acquire(). I think there are some ways to solve
this, but could a comment be left here for now as a tip?


> diff --git a/tests/test-aio.c b/tests/test-aio.c
> index 1ab5637..324c099 100644
> --- a/tests/test-aio.c
> +++ b/tests/test-aio.c
> @@ -88,6 +88,63 @@ static void test_notify(void)
>       g_assert(!aio_poll(ctx, false));
>   }
> 
> +typedef struct {
> +    QemuMutex start_lock;
> +    bool thread_acquired;
> +} AcquireTestData;
> +
> +static void *test_acquire_thread(void *opaque)
> +{
> +    AcquireTestData *data = opaque;
> +
> +    /* Wait for other thread to let us start */
> +    qemu_mutex_lock(&data->start_lock);
> +    qemu_mutex_unlock(&data->start_lock);
> +
> +    aio_context_acquire(ctx);
> +    aio_context_release(ctx);
> +
> +    data->thread_acquired = true; /* success, we got here */
> +
> +    return NULL;
> +}
> +
> +static void dummy_notifier_read(EventNotifier *unused)
> +{
> +    g_assert(false); /* should never be invoked */
> +}
> +
> +static void test_acquire(void)
> +{
> +    QemuThread thread;
> +    EventNotifier notifier;
> +    AcquireTestData data;
> +
> +    /* Dummy event notifier ensures aio_poll() will block */
> +    event_notifier_init(&notifier, false);
> +    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
> +    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
> +
> +    qemu_mutex_init(&data.start_lock);
> +    qemu_mutex_lock(&data.start_lock);
> +    data.thread_acquired = false;
> +
> +    qemu_thread_create(&thread, test_acquire_thread,
> +                       &data, QEMU_THREAD_JOINABLE);
> +
> +    /* Block in aio_poll(), let other thread kick us and acquire context */
> +    aio_context_acquire(ctx);
> +    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
> +    g_assert(!aio_poll(ctx, true));
> +    aio_context_release(ctx);
> +
> +    qemu_thread_join(&thread);
> +    aio_set_event_notifier(ctx, &notifier, NULL);
> +    event_notifier_cleanup(&notifier);
> +
> +    g_assert(data.thread_acquired);
> +}
> +
>   static void test_bh_schedule(void)
>   {
>       BHTestData data = { .n = 0 };
> @@ -639,6 +696,7 @@ int main(int argc, char **argv)
> 
>       g_test_init(&argc, &argv, NULL);
>       g_test_add_func("/aio/notify",                  test_notify);
> +    g_test_add_func("/aio/acquire",                 test_acquire);
>       g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
>       g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
>       g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
>
Stefan Hajnoczi Aug. 28, 2013, 8:49 a.m. UTC | #5
On Wed, Aug 28, 2013 at 11:25:33AM +0800, Wenchao Xia wrote:
> > +void aio_context_release(AioContext *ctx)
> > +{
> > +    qemu_mutex_lock(&ctx->acquire_lock);
> > +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> > +    ctx->owner = NULL;
> > +    qemu_cond_signal(&ctx->acquire_cond);
> > +    qemu_mutex_unlock(&ctx->acquire_lock);
> > +}
>   if main thread have call bdrv_aio_readv(cb *bdrv_cb), now it
> is possible bdrv_cb will be executed in another thread which
> aio_context_acquire() it. I think there are some ways to solve,
> but leave a comments here now to tip better?

Callbacks, BHs, and timers are executed in the thread that calls
aio_poll().  This is safe since other threads cannot run aio_poll() or
submit new block I/O requests at the same time.

In other words: code should only care which AioContext it runs under,
not which thread ID it runs under.  (I think we talked about this on IRC
a few weeks ago.)

Are there any situations you are worried about?

Stefan
Wayne Xia Aug. 29, 2013, 1:09 a.m. UTC | #6
于 2013-8-28 16:49, Stefan Hajnoczi 写道:
> On Wed, Aug 28, 2013 at 11:25:33AM +0800, Wenchao Xia wrote:
>>> +void aio_context_release(AioContext *ctx)
>>> +{
>>> +    qemu_mutex_lock(&ctx->acquire_lock);
>>> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
>>> +    ctx->owner = NULL;
>>> +    qemu_cond_signal(&ctx->acquire_cond);
>>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>>> +}
>>    if main thread have call bdrv_aio_readv(cb *bdrv_cb), now it
>> is possible bdrv_cb will be executed in another thread which
>> aio_context_acquire() it. I think there are some ways to solve,
>> but leave a comments here now to tip better?
>
> Callbacks, BHs, and timers are executed in the thread that calls
> aio_poll().  This is safe since other threads cannot run aio_poll() or
> submit new block I/O requests at the same time.
>
> In other words: code should only care which AioContext it runs under,
> not which thread ID it runs under.  (I think we talked about this on IRC
> a few weeks ago.)
>
> Are there any situations you are worried about?
>
> Stefan
>
   Yes, we have discussed it before and I think it may be safe for block
driver callers. Still, here I mean to add some in-code comments to show
how to use it safely.

for example:

static int glob_test = 0;

int aio_cb(void *opaque)
{
     glob_test++;
}

Thread A:
bdrv_aio_read(bs, aio_cb...);
.....
glob_test++;


Normally glob_test has no race condition, since both increments are
supposed to run in one thread, but this needs to be considered when
aio_context_acquire() is involved. How about:
/* Note that callback can run in different thread which acquired the
AioContext and do a poll() call. */
Stefan Hajnoczi Aug. 29, 2013, 7:43 a.m. UTC | #7
On Thu, Aug 29, 2013 at 09:09:45AM +0800, Wenchao Xia wrote:
> 于 2013-8-28 16:49, Stefan Hajnoczi 写道:
> >On Wed, Aug 28, 2013 at 11:25:33AM +0800, Wenchao Xia wrote:
> >>>+void aio_context_release(AioContext *ctx)
> >>>+{
> >>>+    qemu_mutex_lock(&ctx->acquire_lock);
> >>>+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> >>>+    ctx->owner = NULL;
> >>>+    qemu_cond_signal(&ctx->acquire_cond);
> >>>+    qemu_mutex_unlock(&ctx->acquire_lock);
> >>>+}
> >>   if main thread have call bdrv_aio_readv(cb *bdrv_cb), now it
> >>is possible bdrv_cb will be executed in another thread which
> >>aio_context_acquire() it. I think there are some ways to solve,
> >>but leave a comments here now to tip better?
> >
> >Callbacks, BHs, and timers are executed in the thread that calls
> >aio_poll().  This is safe since other threads cannot run aio_poll() or
> >submit new block I/O requests at the same time.
> >
> >In other words: code should only care which AioContext it runs under,
> >not which thread ID it runs under.  (I think we talked about this on IRC
> >a few weeks ago.)
> >
> >Are there any situations you are worried about?
> >
> >Stefan
> >
>   Yes, we have discussed it before and think it may be safe for block
> driver caller. Still, here I mean to add some in-code comment to tip how
> to use it safely.
> 
> for example:
> 
> static int glob_test = 0;
> 
> int aio_cb(void *opaque)
> {
>     glob_test++;
> }
> 
> Thread A:
> bdrv_aio_read(bs, aio_cb...);
> .....
> glob_test++;
> 
> 
> Normally glob_test have no race condition since they supposed
> to work in one thread, but it need to be considered when
> aio_context_acquire() is involved. How about:
> /* Note that callback can run in different thread which acquired the
> AioContext and do a poll() call. */

I will add a comment to aio_context_acquire() to explain that callbacks,
timers, and BHs may run in another thread.

Normally this is not a problem since the callbacks access BDS or
AioContext, which are both protected by acquire/release.  But people
should be aware of this.

Stefan
Paolo Bonzini Aug. 29, 2013, 8:26 a.m. UTC | #8
Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
> +void aio_context_acquire(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    while (ctx->owner) {
> +        assert(!qemu_thread_is_self(ctx->owner));
> +        aio_notify(ctx); /* kick current owner */
> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
> +    }
> +    qemu_thread_get_self(&ctx->owner_thread);
> +    ctx->owner = &ctx->owner_thread;
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}
> +
> +void aio_context_release(AioContext *ctx)
> +{
> +    qemu_mutex_lock(&ctx->acquire_lock);
> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> +    ctx->owner = NULL;
> +    qemu_cond_signal(&ctx->acquire_cond);
> +    qemu_mutex_unlock(&ctx->acquire_lock);
> +}

Thinking more about it, there is a risk of busy waiting here if one
thread releases the AioContext and tries to acquire it again (as in the
common case of one thread doing acquire/poll/release in a loop).  It
would only work if mutexes guarantee some level of fairness.

If you implement recursive acquisition, however, you can make aio_poll
acquire the context up until just before it invokes ppoll, and then
again after it comes back from the ppoll.  The two acquire/release pair
will be no-ops if called during "synchronous" I/O such as

  /* Another thread */
  aio_context_acquire(ctx);
  bdrv_read(bs, 0x1000, buf, 1);
  aio_context_release(ctx);

Yet they will do the right thing when called from the event loop thread.

(where the bdrv_read can actually be something more complicated such as
a live snapshot or, in general, anything involving bdrv_drain_all).

Paolo
Wayne Xia Aug. 30, 2013, 4:02 a.m. UTC | #9
于 2013-8-27 22:39, Stefan Hajnoczi 写道:
> It can be useful to run an AioContext from a thread which normally does
> not "own" the AioContext.  For example, request draining can be
> implemented by acquiring the AioContext and looping aio_poll() until all
> requests have been completed.
> 
> The following pattern should work:
> 
>    /* Event loop thread */
>    while (running) {
>        aio_context_acquire(ctx);
>        aio_poll(ctx, true);
>        aio_context_release(ctx);
>    }
> 
>    /* Another thread */
>    aio_context_acquire(ctx);
>    bdrv_read(bs, 0x1000, buf, 1);
>    aio_context_release(ctx);
> 
> This patch implements aio_context_acquire() and aio_context_release().
> Note that existing aio_poll() callers do not need to worry about
> acquiring and releasing - it is only needed when multiple threads will
> call aio_poll() on the same AioContext.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---

  After a chat with Stefan on IRC: to enable block layer usage by
multiple threads, there are two potential programming models. Thanks to
Stefan for the analysis; I think it would be helpful to share the info
as follows — please correct me if my understanding is not right:

The models:
1) context per request model:
  aio_context_acquire(ctx);
  bds_acquire(bds);
  bdrv_aio_read(AioContext *ctx, BlockDriverStates *bds, ...);
  bds_release(bds);
  aio_context_release(ctx);

2) context per BDS model:
  BlockDriverState *bds = bdrv_new(AioContext *ctx); /* or another API
bind AioContext */
  aio_context_acquire(ctx);
  bdrv_aio_read(BlockDriverStates *bds, ...);
  aio_context_release(ctx);

The difference:
context per request model in 1):
    AioContext c0     AioContext c1
        /|\              /|\
         |----------------|
                 |
                 |
         -------------------
        /|\                |
         |                 |
        BDS b0        BDS b1..
        /|\
         |
     -------------
    /|\         /|\
     |           |
  request      request
from thread   from thread
  t0 who        t1 who
acquired c0   acquired c1


context per BDS model in 2):
AioContext c0    AioContext c1
    /|\               |
     |                |
     |                |
    BDS b0        BDS b1..
    /|\
     |
     |
  request
from thread
  t0 who
acquired c0

(t1's request can't
submitted at this time)

  If a BDS is considered as a front end used to submit tasks, and an
AioContext is considered as a back end used to process tasks, then
whether a BDS is bound to an AioContext determines whether requests
for one BDS can be submitted in the front end and processed in the
back end in parallel.
  Generally speaking, 1) will gain more parallel capability: it enables
multi-threaded usage at the BDS level, but requires more code inside the
block layer for synchronization and serial/parallel request conversion,
so we are heading to 2), where requests are submitted per BDS. The
interesting thing is that it can still enable multi-threaded usage at
the AioContext level:

AioContext c0    AioContext c1
    /|\              /|\
     |                |
     |                |
    BDS b0         BDS b1
    /|\              /|\
     |                |
     |                |
  request          request
from thread     from thread
  t0 who           t1 who
acquired c0     acquired c1

  So later the dataplane thread will become another user who creates an
AioContext to do jobs in parallel.
Stefan Hajnoczi Aug. 30, 2013, 9:22 a.m. UTC | #10
On Thu, Aug 29, 2013 at 10:26:31AM +0200, Paolo Bonzini wrote:
> Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
> > +void aio_context_acquire(AioContext *ctx)
> > +{
> > +    qemu_mutex_lock(&ctx->acquire_lock);
> > +    while (ctx->owner) {
> > +        assert(!qemu_thread_is_self(ctx->owner));
> > +        aio_notify(ctx); /* kick current owner */
> > +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
> > +    }
> > +    qemu_thread_get_self(&ctx->owner_thread);
> > +    ctx->owner = &ctx->owner_thread;
> > +    qemu_mutex_unlock(&ctx->acquire_lock);
> > +}
> > +
> > +void aio_context_release(AioContext *ctx)
> > +{
> > +    qemu_mutex_lock(&ctx->acquire_lock);
> > +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> > +    ctx->owner = NULL;
> > +    qemu_cond_signal(&ctx->acquire_cond);
> > +    qemu_mutex_unlock(&ctx->acquire_lock);
> > +}
> 
> Thinking more about it, there is a risk of busy waiting here if one
> thread releases the AioContext and tries to acquire it again (as in the
> common case of one thread doing acquire/poll/release in a loop).  It
> would only work if mutexes guarantee some level of fairness.

You are right.  I wrote a test that showed there is no fairness.  For
some reason I thought the condvar would provide fairness.

> If you implement recursive acquisition, however, you can make aio_poll
> acquire the context up until just before it invokes ppoll, and then
> again after it comes back from the ppoll.  The two acquire/release pair
> will be no-ops if called during "synchronous" I/O such as
> 
>   /* Another thread */
>   aio_context_acquire(ctx);
>   bdrv_read(bs, 0x1000, buf, 1);
>   aio_context_release(ctx);
> 
> Yet they will do the right thing when called from the event loop thread.
> 
> (where the bdrv_read can actually be something more complicated such as
> a live snapshot or, in general, anything involving bdrv_drain_all).

This doesn't guarantee fairness either, right?  If ppoll(2) returns
immediately then the thread might still be scheduled and have enough of
its time slice left to acquire the AioContext again.

With your approach another thread can squeeze in when ppoll(2) is
returning, so newer fd activity can be processed *before* older
activity.  Not sure out-of-order callbacks are a problem
but it can happen since we don't have fairness.

But at least this way other threads can acquire the AioContext while
ppoll(2) is blocked without racing against each other for the acquire
lock.

Stefan
Paolo Bonzini Aug. 30, 2013, 1:24 p.m. UTC | #11
Il 30/08/2013 11:22, Stefan Hajnoczi ha scritto:
> On Thu, Aug 29, 2013 at 10:26:31AM +0200, Paolo Bonzini wrote:
>> Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
>>> +void aio_context_acquire(AioContext *ctx)
>>> +{
>>> +    qemu_mutex_lock(&ctx->acquire_lock);
>>> +    while (ctx->owner) {
>>> +        assert(!qemu_thread_is_self(ctx->owner));
>>> +        aio_notify(ctx); /* kick current owner */
>>> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
>>> +    }
>>> +    qemu_thread_get_self(&ctx->owner_thread);
>>> +    ctx->owner = &ctx->owner_thread;
>>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>>> +}
>>> +
>>> +void aio_context_release(AioContext *ctx)
>>> +{
>>> +    qemu_mutex_lock(&ctx->acquire_lock);
>>> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
>>> +    ctx->owner = NULL;
>>> +    qemu_cond_signal(&ctx->acquire_cond);
>>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>>> +}
>>
>> Thinking more about it, there is a risk of busy waiting here if one
>> thread releases the AioContext and tries to acquire it again (as in the
>> common case of one thread doing acquire/poll/release in a loop).  It
>> would only work if mutexes guarantee some level of fairness.
> 
> You are right.  I wrote a test that showed there is no fairness.  For
> some reason I thought the condvar would provide fairness.
> 
>> If you implement recursive acquisition, however, you can make aio_poll
>> acquire the context up until just before it invokes ppoll, and then
>> again after it comes back from the ppoll.  The two acquire/release pair
>> will be no-ops if called during "synchronous" I/O such as
>>
>>   /* Another thread */
>>   aio_context_acquire(ctx);
>>   bdrv_read(bs, 0x1000, buf, 1);
>>   aio_context_release(ctx);
>>
>> Yet they will do the right thing when called from the event loop thread.
>>
>> (where the bdrv_read can actually be something more complicated such as
>> a live snapshot or, in general, anything involving bdrv_drain_all).
> 
> This doesn't guarantee fairness either, right?

Yes, but the non-zero timeout of ppoll would in practice guarantee it.
The problem happens only when the release and acquire are very close in
time, which shouldn't happen if the ppoll is done with the context released.

> With your approach another thread can squeeze in when ppoll(2) is
> returning so newer fd activity can be processed before we processed
> *before* older activity.  Not sure out-of-order callbacks are a problem
> but it can happen since we don't have fairness.

I think this should not happen.  The other thread would rerun ppoll(2).
 Since poll/ppoll are level-triggered, you could have some flags
processed twice.  But this is not a problem, we had the same bug with
iothread and qemu_aio_wait and we should have fixed all occurrences.

> But at least this way other threads can acquire the AioContext while
> ppoll(2) is blocked without racing against each other for the acquire
> lock.

Yes, that's also true.

Paolo
Stefan Hajnoczi Aug. 30, 2013, 2:25 p.m. UTC | #12
On Fri, Aug 30, 2013 at 3:24 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 30/08/2013 11:22, Stefan Hajnoczi ha scritto:
>> On Thu, Aug 29, 2013 at 10:26:31AM +0200, Paolo Bonzini wrote:
>>> Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
>>>> +void aio_context_acquire(AioContext *ctx)
>>>> +{
>>>> +    qemu_mutex_lock(&ctx->acquire_lock);
>>>> +    while (ctx->owner) {
>>>> +        assert(!qemu_thread_is_self(ctx->owner));
>>>> +        aio_notify(ctx); /* kick current owner */
>>>> +        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
>>>> +    }
>>>> +    qemu_thread_get_self(&ctx->owner_thread);
>>>> +    ctx->owner = &ctx->owner_thread;
>>>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>>>> +}
>>>> +
>>>> +void aio_context_release(AioContext *ctx)
>>>> +{
>>>> +    qemu_mutex_lock(&ctx->acquire_lock);
>>>> +    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
>>>> +    ctx->owner = NULL;
>>>> +    qemu_cond_signal(&ctx->acquire_cond);
>>>> +    qemu_mutex_unlock(&ctx->acquire_lock);
>>>> +}
>>>
>>> Thinking more about it, there is a risk of busy waiting here if one
>>> thread releases the AioContext and tries to acquire it again (as in the
>>> common case of one thread doing acquire/poll/release in a loop).  It
>>> would only work if mutexes guarantee some level of fairness.
>>
>> You are right.  I wrote a test that showed there is no fairness.  For
>> some reason I thought the condvar would provide fairness.
>>
>>> If you implement recursive acquisition, however, you can make aio_poll
>>> acquire the context up until just before it invokes ppoll, and then
>>> again after it comes back from the ppoll.  The two acquire/release pair
>>> will be no-ops if called during "synchronous" I/O such as
>>>
>>>   /* Another thread */
>>>   aio_context_acquire(ctx);
>>>   bdrv_read(bs, 0x1000, buf, 1);
>>>   aio_context_release(ctx);
>>>
>>> Yet they will do the right thing when called from the event loop thread.
>>>
>>> (where the bdrv_read can actually be something more complicated such as
>>> a live snapshot or, in general, anything involving bdrv_drain_all).
>>
>> This doesn't guarantee fairness either, right?
>
> Yes, but the non-zero timeout of ppoll would in practice guarantee it.
> The problem happens only when the release and acquire are very close in
> time, which shouldn't happen if the ppoll is done released.
>
>> With your approach another thread can squeeze in when ppoll(2) is
>> returning so newer fd activity can be processed before we processed
>> *before* older activity.  Not sure out-of-order callbacks are a problem
>> but it can happen since we don't have fairness.
>
> I think this should not happen.  The other thread would rerun ppoll(2).
>  Since poll/ppoll are level-triggered, you could have some flags
> processed twice.  But this is not a problem, we had the same bug with
> iothread and qemu_aio_wait and we should have fixed all occurrences.

I forgot they are level-triggered.  Releasing around the blocking
operation (ppoll) is similar to how iothread/vcpu thread work so it
seems like a good idea to follow that pattern here too.

I'll implement this in the next revision.

Stefan
Michael Roth Sept. 10, 2013, 7:42 p.m. UTC | #13
Quoting Stefan Hajnoczi (2013-08-29 02:43:02)
> On Thu, Aug 29, 2013 at 09:09:45AM +0800, Wenchao Xia wrote:
> > 于 2013-8-28 16:49, Stefan Hajnoczi 写道:
> > >On Wed, Aug 28, 2013 at 11:25:33AM +0800, Wenchao Xia wrote:
> > >>>+void aio_context_release(AioContext *ctx)
> > >>>+{
> > >>>+    qemu_mutex_lock(&ctx->acquire_lock);
> > >>>+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> > >>>+    ctx->owner = NULL;
> > >>>+    qemu_cond_signal(&ctx->acquire_cond);
> > >>>+    qemu_mutex_unlock(&ctx->acquire_lock);
> > >>>+}
> > >>   if main thread have call bdrv_aio_readv(cb *bdrv_cb), now it
> > >>is possible bdrv_cb will be executed in another thread which
> > >>aio_context_acquire() it. I think there are some ways to solve,
> > >>but leave a comments here now to tip better?
> > >
> > >Callbacks, BHs, and timers are executed in the thread that calls
> > >aio_poll().  This is safe since other threads cannot run aio_poll() or
> > >submit new block I/O requests at the same time.
> > >
> > >In other words: code should only care which AioContext it runs under,
> > >not which thread ID it runs under.  (I think we talked about this on IRC
> > >a few weeks ago.)
> > >
> > >Are there any situations you are worried about?
> > >
> > >Stefan
> > >
> >   Yes, we have discussed it before and think it may be safe for block
> > driver caller. Still, here I mean to add some in-code comment to tip how
> > to use it safely.
> > 
> > for example:
> > 
> > static int glob_test = 0;
> > 
> > int aio_cb(void *opaque)
> > {
> >     glob_test++;
> > }
> > 
> > Thread A:
> > bdrv_aio_read(bs, aio_cb...);
> > .....
> > glob_test++;
> > 
> > 
> > Normally glob_test have no race condition since they supposed
> > to work in one thread, but it need to be considered when
> > aio_context_acquire() is involved. How about:
> > /* Note that callback can run in different thread which acquired the
> > AioContext and do a poll() call. */
> 
> I will add a comment to aio_context_acquire() to explain that callbacks,
> timers, and BHs may run in another thread.
> 
> Normally this is not a problem since the callbacks access BDS or
> AioContext, which are both protected by acquire/release.  But people
> should be aware of this.

Do you imagine we'll ever have a case where one main loop thread attempts
to register events (aio_set_fd_handler etc.) with another thread? Currently
virtio-blk (iothread) registers events with the dataplane's AioContext, but
doesn't start up the dataplane thread until afterward, so there's no
contention there. But looking at a scenario where the dataplane threads
are created/started independently of virtio-blk (as in the QContext RFC for
example), that may not be the case (there we start up the main loop thread
during machine init time, then attach events to it via dataplane start
routine).

So, in that case, I take it we'd do something like this, in, say,
virtio_blk_data_plane_start?

    aio_context_acquire(ctx)
    aio_set_fd_handler(ctx, ...)
    aio_context_release(ctx)

Or maybe push the acquire/release down into aio_set_fd_handler, and allow
for support for recursive acquisition?

As far as locking constraints (for situations like virtio-net dataplane, or
threaded virtio-blk, where we may have lots of shared state between threads,
potentially protected by a 'block'/'network' lock or something along that
line), I assume we need to ensure that any locks that may be acquired after
acquisition of an AioContext (via, say, event callbacks), must be released
prior to calling aio_context_acquire() from another thread to avoid an
ABBA deadlock/'lock'-order reversal?

I ask because I'm looking at how to handle this same scenario for
qemu_set_fd_handler, set_fd_handler(ctx, ...). My approach is a little
different:

https://github.com/mdroth/qemu/commit/9a749a2a1ae93ac1b7d6a1216edaf0ca96ff1edb#L1R110

but i think the requirements end up being similar for how
users need to structure their code and handle locking, just wanted to
double-check though because it seems like a potential pain to have to
figure out what locks you need to drop prior to do event registration.
(one alternative to this requirement is making the event updates
asynchronous, and pushing the logic to deal with stable
callbacks/events into the event callbacks and their opaque data).

For anything outside of event registration, I assume a 'light-weight'
AioContext loop could be converted to driving a more general GMainContext
loop via the corresponding g_main_context_acquire(ctx)/release(ctx)?

> 
> Stefan
Stefan Hajnoczi Sept. 12, 2013, 8:11 a.m. UTC | #14
On Tue, Sep 10, 2013 at 02:42:10PM -0500, Michael Roth wrote:
> Quoting Stefan Hajnoczi (2013-08-29 02:43:02)
> > On Thu, Aug 29, 2013 at 09:09:45AM +0800, Wenchao Xia wrote:
> > > 于 2013-8-28 16:49, Stefan Hajnoczi 写道:
> > > >On Wed, Aug 28, 2013 at 11:25:33AM +0800, Wenchao Xia wrote:
> > > >>>+void aio_context_release(AioContext *ctx)
> > > >>>+{
> > > >>>+    qemu_mutex_lock(&ctx->acquire_lock);
> > > >>>+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
> > > >>>+    ctx->owner = NULL;
> > > >>>+    qemu_cond_signal(&ctx->acquire_cond);
> > > >>>+    qemu_mutex_unlock(&ctx->acquire_lock);
> > > >>>+}
> > > >>   if main thread have call bdrv_aio_readv(cb *bdrv_cb), now it
> > > >>is possible bdrv_cb will be executed in another thread which
> > > >>aio_context_acquire() it. I think there are some ways to solve,
> > > >>but leave a comments here now to tip better?
> > > >
> > > >Callbacks, BHs, and timers are executed in the thread that calls
> > > >aio_poll().  This is safe since other threads cannot run aio_poll() or
> > > >submit new block I/O requests at the same time.
> > > >
> > > >In other words: code should only care which AioContext it runs under,
> > > >not which thread ID it runs under.  (I think we talked about this on IRC
> > > >a few weeks ago.)
> > > >
> > > >Are there any situations you are worried about?
> > > >
> > > >Stefan
> > > >
> > >   Yes, we have discussed it before and think it may be safe for block
> > > driver caller. Still, here I mean to add some in-code comment to tip how
> > > to use it safely.
> > > 
> > > for example:
> > > 
> > > static int glob_test = 0;
> > > 
> > > int aio_cb(void *opaque)
> > > {
> > >     glob_test++;
> > > }
> > > 
> > > Thread A:
> > > bdrv_aio_read(bs, aio_cb...);
> > > .....
> > > glob_test++;
> > > 
> > > 
> > > Normally glob_test have no race condition since they supposed
> > > to work in one thread, but it need to be considered when
> > > aio_context_acquire() is involved. How about:
> > > /* Note that callback can run in different thread which acquired the
> > > AioContext and do a poll() call. */
> > 
> > I will add a comment to aio_context_acquire() to explain that callbacks,
> > timers, and BHs may run in another thread.
> > 
> > Normally this is not a problem since the callbacks access BDS or
> > AioContext, which are both protected by acquire/release.  But people
> > should be aware of this.
> 
> Do you imagine we'll ever have a case where one main loop thread attempts
> to register events (aio_set_fd_handler etc.) with another thread? Currently
> virtio-blk (iothread) registers events with the dataplane's AioContext, but
> doesn't start up the dataplane thread until afterward, so there's no
> contention there. But looking at a scenario where the dataplane threads
> are created/started independently of virtio-blk (as in the QContext RFC for
> example), that may not be the case (there we start up the main loop thread
> during machine init time, then attach events to it via dataplane start
> routine).
> 
> So, in that case, I take it we'd do something like this, in, say,
> virtio_blk_data_plane_start?
> 
>     aio_context_acquire(ctx)
>     aio_set_fd_handler(ctx, ...)
>     aio_context_release(ctx)
> 
> Or maybe push the acquire/release down into aio_set_fd_handler, and allow
> for support for recursive acquisition?

Or use a BH to schedule initialization code into the AioContext.

> As far as locking constraints (for situations like virtio-net dataplane, or
> threaded virtio-blk, where we may have lots of shared state between threads,
> potentially protected by a 'block'/'network' lock or something along that
> line), I assume we need to ensure that any locks that may be acquired after
> acquisition of an AioContext (via, say, event callbacks), must be released
> prior to calling aio_context_acquire() from another thread to avoid an
> ABBA deadlock/'lock'-order reversal?

We should avoid long-held locks where possible.  For example, protect
the relevant subsystem structures (e.g. bdrv_states or net_clients)
during access (adding/remove a device) but don't have a block/net
subsystem lock.

This way it's much easier to avoid lock ordering problems since
fine-grained locks also don't nest as much.

> I ask because I'm looking at how to handle this same scenario for
> qemu_set_fd_handler, set_fd_handler(ctx, ...). My approach is a little
> different:
> 
> https://github.com/mdroth/qemu/commit/9a749a2a1ae93ac1b7d6a1216edaf0ca96ff1edb#L1R110
> 
> but i think the requirements end up being similar for how
> users need to structure their code and handle locking, just wanted to
> double-check though because it seems like a potential pain to have to
> figure out what locks you need to drop prior to do event registration.
> (one alternative to this requirement is making the event updates
> asynchronous, and pushing the logic to deal with stable
> callbacks/events into the event callbacks and their opaque data).
> 
> For anything outside of event registration, I assume a 'light-weight'
> AioContext loop could be converted to driving a more general GMainContext
> loop via the corresponding g_main_context_acquire(ctx)/release(ctx)?

If there are any doubts about lock ordering, I think the simplest
solution is executing code in the context of the event loop thread (see
the comment I made about BHs).

Stefan
diff mbox

Patch

diff --git a/include/block/aio.h b/include/block/aio.h
index 5743bf1..9035e87 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -45,6 +45,11 @@  typedef void IOHandler(void *opaque);
 typedef struct AioContext {
     GSource source;
 
+    QemuMutex acquire_lock;
+    QemuCond acquire_cond;
+    QemuThread owner_thread;
+    QemuThread *owner;
+
     /* The list of registered AIO handlers */
     QLIST_HEAD(, AioHandler) aio_handlers;
 
@@ -99,6 +104,14 @@  void aio_context_ref(AioContext *ctx);
  */
 void aio_context_unref(AioContext *ctx);
 
+/* Take ownership of the AioContext.  If the AioContext will be shared between
+ * threads, a thread must have ownership when calling aio_poll().
+ */
+void aio_context_acquire(AioContext *ctx);
+
+/* Relinquish ownership of the AioContext. */
+void aio_context_release(AioContext *ctx);
+
 /**
  * aio_bh_new: Allocate a new bottom half structure.
  *
diff --git a/async.c b/async.c
index 9791d8e..9fec07c 100644
--- a/async.c
+++ b/async.c
@@ -203,6 +203,8 @@  aio_ctx_finalize(GSource     *source)
     thread_pool_free(ctx->thread_pool);
     aio_set_event_notifier(ctx, &ctx->notifier, NULL);
     event_notifier_cleanup(&ctx->notifier);
+    qemu_cond_destroy(&ctx->acquire_cond);
+    qemu_mutex_destroy(&ctx->acquire_lock);
     qemu_mutex_destroy(&ctx->bh_lock);
     g_array_free(ctx->pollfds, TRUE);
 }
@@ -240,6 +242,9 @@  AioContext *aio_context_new(void)
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     ctx->thread_pool = NULL;
     qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->acquire_lock);
+    qemu_cond_init(&ctx->acquire_cond);
+    ctx->owner = NULL;
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier, 
                            (EventNotifierHandler *)
@@ -257,3 +262,25 @@  void aio_context_unref(AioContext *ctx)
 {
     g_source_unref(&ctx->source);
 }
+
+void aio_context_acquire(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    while (ctx->owner) {
+        assert(!qemu_thread_is_self(ctx->owner));
+        aio_notify(ctx); /* kick current owner */
+        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
+    }
+    qemu_thread_get_self(&ctx->owner_thread);
+    ctx->owner = &ctx->owner_thread;
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
+
+void aio_context_release(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
+    ctx->owner = NULL;
+    qemu_cond_signal(&ctx->acquire_cond);
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 1ab5637..324c099 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -88,6 +88,63 @@  static void test_notify(void)
     g_assert(!aio_poll(ctx, false));
 }
 
+typedef struct {
+    QemuMutex start_lock;
+    bool thread_acquired;
+} AcquireTestData;
+
+static void *test_acquire_thread(void *opaque)
+{
+    AcquireTestData *data = opaque;
+
+    /* Wait for other thread to let us start */
+    qemu_mutex_lock(&data->start_lock);
+    qemu_mutex_unlock(&data->start_lock);
+
+    aio_context_acquire(ctx);
+    aio_context_release(ctx);
+
+    data->thread_acquired = true; /* success, we got here */
+
+    return NULL;
+}
+
+static void dummy_notifier_read(EventNotifier *unused)
+{
+    g_assert(false); /* should never be invoked */
+}
+
+static void test_acquire(void)
+{
+    QemuThread thread;
+    EventNotifier notifier;
+    AcquireTestData data;
+
+    /* Dummy event notifier ensures aio_poll() will block */
+    event_notifier_init(&notifier, false);
+    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
+
+    qemu_mutex_init(&data.start_lock);
+    qemu_mutex_lock(&data.start_lock);
+    data.thread_acquired = false;
+
+    qemu_thread_create(&thread, test_acquire_thread,
+                       &data, QEMU_THREAD_JOINABLE);
+
+    /* Block in aio_poll(), let other thread kick us and acquire context */
+    aio_context_acquire(ctx);
+    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
+    g_assert(!aio_poll(ctx, true));
+    aio_context_release(ctx);
+
+    qemu_thread_join(&thread);
+    aio_set_event_notifier(ctx, &notifier, NULL);
+    event_notifier_cleanup(&notifier);
+
+    g_assert(data.thread_acquired);
+}
+
 static void test_bh_schedule(void)
 {
     BHTestData data = { .n = 0 };
@@ -639,6 +696,7 @@  int main(int argc, char **argv)
 
     g_test_init(&argc, &argv, NULL);
     g_test_add_func("/aio/notify",                  test_notify);
+    g_test_add_func("/aio/acquire",                 test_acquire);
     g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
     g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
     g_test_add_func("/aio/bh/cancel",               test_bh_cancel);