
QEMUBH: make AioContext's bh re-entrant

Message ID: 1371178554-14432-1-git-send-email-pingfanl@linux.vnet.ibm.com
State: New

Commit Message

pingfan liu June 14, 2013, 2:55 a.m. UTC
BH will be used outside the big lock, so introduce a lock to protect it.
Note that the lock only affects the writers; the BH's callback does
not take this extra lock.

Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com>
---
 async.c             | 10 +++++++++-
 include/block/aio.h |  2 ++
 2 files changed, 11 insertions(+), 1 deletion(-)

Comments

Paolo Bonzini June 15, 2013, 5:35 a.m. UTC | #1
On 13/06/2013 22:55, Liu Ping Fan wrote:
> BH will be used outside the big lock, so introduce a lock to protect it.
> Note that the lock only affects the writers; the BH's callback does
> not take this extra lock.
> 
> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com>
> ---
>  async.c             | 10 +++++++++-
>  include/block/aio.h |  2 ++
>  2 files changed, 11 insertions(+), 1 deletion(-)
> 
> diff --git a/async.c b/async.c
> index 90fe906..b1dcead 100644
> --- a/async.c
> +++ b/async.c
> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>      bh->ctx = ctx;
>      bh->cb = cb;
>      bh->opaque = opaque;
> +    qemu_mutex_lock(&ctx->bh_lock);
>      bh->next = ctx->first_bh;
> +    smp_wmb();
>      ctx->first_bh = bh;
> +    qemu_mutex_unlock(&ctx->bh_lock);
>      return bh;
>  }
>  
> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx)
>      ctx->walking_bh++;
>  
>      ret = 0;
> -    for (bh = ctx->first_bh; bh; bh = next) {
> +    bh = ctx->first_bh;
> +    smp_rmb();
> +    for (; bh; bh = next) {
>          next = bh->next;

The read barrier in theory should go inside the loop.  On the other
hand, it only needs to be a smp_read_barrier_depends().
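
For illustration, a sketch of that shape, assuming the
smp_read_barrier_depends() primitive from the pending atomics series
(a no-op everywhere except on Alpha):

    for (bh = ctx->first_bh; bh; bh = next) {
        /* Order reads of bh's fields after the read of the pointer
         * that led us to bh; only Alpha reorders dependent loads. */
        smp_read_barrier_depends();
        next = bh->next;
        if (!bh->deleted && bh->scheduled) {
            bh->scheduled = 0;
            bh->cb(bh->opaque);
        }
    }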

Paolo

>          if (!bh->deleted && bh->scheduled) {
>              bh->scheduled = 0;
> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx)
>  
>      /* remove deleted bhs */
>      if (!ctx->walking_bh) {
> +        qemu_mutex_lock(&ctx->bh_lock);
>          bhp = &ctx->first_bh;
>          while (*bhp) {
>              bh = *bhp;
> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx)
>                  bhp = &bh->next;
>              }
>          }
> +        qemu_mutex_unlock(&ctx->bh_lock);
>      }
>  
>      return ret;
> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void)
>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>      ctx->thread_pool = NULL;
> +    qemu_mutex_init(&ctx->bh_lock);
>      event_notifier_init(&ctx->notifier, false);
>      aio_set_event_notifier(ctx, &ctx->notifier, 
>                             (EventNotifierHandler *)
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 1836793..a65d16f 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -17,6 +17,7 @@
>  #include "qemu-common.h"
>  #include "qemu/queue.h"
>  #include "qemu/event_notifier.h"
> +#include "qemu/thread.h"
>  
>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
> @@ -53,6 +54,7 @@ typedef struct AioContext {
>       */
>      int walking_handlers;
>  
> +    QemuMutex bh_lock;
>      /* Anchor of the list of Bottom Halves belonging to the context */
>      struct QEMUBH *first_bh;
>  
>
pingfan liu June 15, 2013, 8:43 a.m. UTC | #2
On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 13/06/2013 22:55, Liu Ping Fan wrote:
>> BH will be used outside the big lock, so introduce a lock to protect it.
>> Note that the lock only affects the writers; the BH's callback does
>> not take this extra lock.
>>
>> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com>
>> ---
>>  async.c             | 10 +++++++++-
>>  include/block/aio.h |  2 ++
>>  2 files changed, 11 insertions(+), 1 deletion(-)
>>
>> diff --git a/async.c b/async.c
>> index 90fe906..b1dcead 100644
>> --- a/async.c
>> +++ b/async.c
>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>>      bh->ctx = ctx;
>>      bh->cb = cb;
>>      bh->opaque = opaque;
>> +    qemu_mutex_lock(&ctx->bh_lock);
>>      bh->next = ctx->first_bh;
>> +    smp_wmb();
>>      ctx->first_bh = bh;
>> +    qemu_mutex_unlock(&ctx->bh_lock);
>>      return bh;
>>  }
>>
>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx)
>>      ctx->walking_bh++;
>>
>>      ret = 0;
>> -    for (bh = ctx->first_bh; bh; bh = next) {
>> +    bh = ctx->first_bh;
>> +    smp_rmb();
>> +    for (; bh; bh = next) {
>>          next = bh->next;
>
> The read barrier in theory should go inside the loop.  On the other

For a read after more than one writer?
> hand, it only needs to be a smp_read_barrier_depends().
>
Ok, so it will depend on ops introduced in your urcu patch series.

Thanks and regards,
Pingfan
> Paolo
>
>>          if (!bh->deleted && bh->scheduled) {
>>              bh->scheduled = 0;
>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx)
>>
>>      /* remove deleted bhs */
>>      if (!ctx->walking_bh) {
>> +        qemu_mutex_lock(&ctx->bh_lock);
>>          bhp = &ctx->first_bh;
>>          while (*bhp) {
>>              bh = *bhp;
>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx)
>>                  bhp = &bh->next;
>>              }
>>          }
>> +        qemu_mutex_unlock(&ctx->bh_lock);
>>      }
>>
>>      return ret;
>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void)
>>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>      ctx->thread_pool = NULL;
>> +    qemu_mutex_init(&ctx->bh_lock);
>>      event_notifier_init(&ctx->notifier, false);
>>      aio_set_event_notifier(ctx, &ctx->notifier,
>>                             (EventNotifierHandler *)
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 1836793..a65d16f 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -17,6 +17,7 @@
>>  #include "qemu-common.h"
>>  #include "qemu/queue.h"
>>  #include "qemu/event_notifier.h"
>> +#include "qemu/thread.h"
>>
>>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
>> @@ -53,6 +54,7 @@ typedef struct AioContext {
>>       */
>>      int walking_handlers;
>>
>> +    QemuMutex bh_lock;
>>      /* Anchor of the list of Bottom Halves belonging to the context */
>>      struct QEMUBH *first_bh;
>>
>>
>
Paolo Bonzini June 15, 2013, 11:16 a.m. UTC | #3
On 15/06/2013 04:43, liu ping fan wrote:
> On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>> On 13/06/2013 22:55, Liu Ping Fan wrote:
>>> BH will be used outside the big lock, so introduce a lock to protect it.
>>> Note that the lock only affects the writers; the BH's callback does
>>> not take this extra lock.
>>>
>>> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com>
>>> ---
>>>  async.c             | 10 +++++++++-
>>>  include/block/aio.h |  2 ++
>>>  2 files changed, 11 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/async.c b/async.c
>>> index 90fe906..b1dcead 100644
>>> --- a/async.c
>>> +++ b/async.c
>>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>>>      bh->ctx = ctx;
>>>      bh->cb = cb;
>>>      bh->opaque = opaque;
>>> +    qemu_mutex_lock(&ctx->bh_lock);
>>>      bh->next = ctx->first_bh;
>>> +    smp_wmb();
>>>      ctx->first_bh = bh;
>>> +    qemu_mutex_unlock(&ctx->bh_lock);
>>>      return bh;
>>>  }
>>>
>>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx)
>>>      ctx->walking_bh++;
>>>
>>>      ret = 0;
>>> -    for (bh = ctx->first_bh; bh; bh = next) {
>>> +    bh = ctx->first_bh;
>>> +    smp_rmb();
>>> +    for (; bh; bh = next) {
>>>          next = bh->next;
>>
>> The read barrier in theory should go inside the loop.  On the other
> 
> For a read after more than one writer?

Yes.

>> hand, it only needs to be a smp_read_barrier_depends().
>>
> Ok, so it will depend on ops introduced in your urcu patch series.

Well, it's just the Alpha that needs the barrier, so it could even be
just a comment for now.  But...

>>>          if (!bh->deleted && bh->scheduled) {
>>>              bh->scheduled = 0;
>>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx)
>>>
>>>      /* remove deleted bhs */
>>>      if (!ctx->walking_bh) {
>>> +        qemu_mutex_lock(&ctx->bh_lock);

... I'm not sure that this works yet.  I see two problems:
ctx->walking_bh needs to be accessed atomically, and while you are doing the
deletions somebody could come up and start a read.  Havoc.

It's not an easy problem, because even with RCU the bottom half callback
should run _outside_ the RCU critical section.  I suggest that you hunt
the kernel for something that is trying to do the same.

One idea could be to have a separate list for scheduled bottom halves.
You can walk this list destructively, so you have no problem releasing
the RCU critical section while invoking the callbacks.  Again, the
kernel or urcu data structure library can help (urcu has an RCU-based FIFO).

You can track if there are any deleted BHs, and if so do a call_rcu at
the end of aio_bh_poll.  You do not delete scheduled BHs; instead, let
the next aio_bh_poll remove them from the scheduled list and do another
call_rcu.
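
A rough sketch of that scheme, with hypothetical names (sched_first,
sched_next) and assuming lock-free xchg/cmpxchg primitives such as the
ones in the pending atomics series:

    /* Producers push onto a lock-free list of scheduled BHs.  A real
     * version would also test-and-set bh->scheduled here to avoid
     * double insertion. */
    void qemu_bh_schedule(QEMUBH *bh)
    {
        AioContext *ctx = bh->ctx;
        QEMUBH *old;

        do {
            old = ctx->sched_first;
            bh->sched_next = old;
        } while (atomic_cmpxchg(&ctx->sched_first, old, bh) != old);
        aio_notify(ctx);
    }

    int aio_bh_poll(AioContext *ctx)
    {
        /* The single consumer steals the whole scheduled list at once,
         * so it can walk it destructively, outside any RCU critical
         * section.  The stolen list is in reverse push order; a real
         * version would reverse it before running the callbacks. */
        QEMUBH *bh = atomic_xchg(&ctx->sched_first, NULL);

        while (bh) {
            QEMUBH *next = bh->sched_next;
            if (!bh->deleted) {
                bh->cb(bh->opaque);
            }
            bh = next;
        }
        return 0;
    }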

Thanks for doing this work; we need "textbook examples" of how to do
this, which we can perhaps later apply to timers and all that.

Paolo

>>>          bhp = &ctx->first_bh;
>>>          while (*bhp) {
>>>              bh = *bhp;
>>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx)
>>>                  bhp = &bh->next;
>>>              }
>>>          }
>>> +        qemu_mutex_unlock(&ctx->bh_lock);
>>>      }
>>>
>>>      return ret;
>>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void)
>>>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>>>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>>      ctx->thread_pool = NULL;
>>> +    qemu_mutex_init(&ctx->bh_lock);
>>>      event_notifier_init(&ctx->notifier, false);
>>>      aio_set_event_notifier(ctx, &ctx->notifier,
>>>                             (EventNotifierHandler *)
>>> diff --git a/include/block/aio.h b/include/block/aio.h
>>> index 1836793..a65d16f 100644
>>> --- a/include/block/aio.h
>>> +++ b/include/block/aio.h
>>> @@ -17,6 +17,7 @@
>>>  #include "qemu-common.h"
>>>  #include "qemu/queue.h"
>>>  #include "qemu/event_notifier.h"
>>> +#include "qemu/thread.h"
>>>
>>>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>>>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
>>> @@ -53,6 +54,7 @@ typedef struct AioContext {
>>>       */
>>>      int walking_handlers;
>>>
>>> +    QemuMutex bh_lock;
>>>      /* Anchor of the list of Bottom Halves belonging to the context */
>>>      struct QEMUBH *first_bh;
>>>
>>>
>>
> 
>
Paolo Bonzini June 15, 2013, 3:44 p.m. UTC | #4
On 15/06/2013 07:16, Paolo Bonzini wrote:
> ... I'm not sure that this works yet.  I see two problems:
> ctx->walking_bh needs to be accessed atomically, and while you are doing the
> deletions somebody could come up and start a read.  Havoc.

Hmm, are you just trying to protect aio_bh_poll from aio_bh_new, while
still having only one thread that can do aio_bh_poll (multiple producer,
single consumer)?

In this case what you have should work and this patch is almost ready
for commit.  In fact it's actually important to have it for the
dataplane stuff that Stefan is doing, I think.

But _please document what you are doing_!  It's not the first time that
I've told you this: the locking policy must be _thoroughly_ documented, and
_not_ figured out by the reviewers.  Without this, I'm not going to give
my Reviewed-by.
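
For instance, a comment like this next to the new field would capture
the policy described in the commit message (a sketch, not the patch):

    /* Protects the writers of the BH list: aio_bh_new and the
     * deletion phase of aio_bh_poll.  Readers, and the BH callbacks
     * themselves, run without taking it. */
    QemuMutex bh_lock;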

Regarding barriers, there is another write barrier you need to add
before anything that sets bh->scheduled.  This makes sure any writes
that are needed by the callback are done before the locations are read
in the aio_bh_poll thread.  Similarly, you want a matching read barrier
before calling the callback.
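
Concretely, a sketch of the pairing being asked for, using the same
smp_wmb()/smp_rmb() primitives the patch already uses:

    void qemu_bh_schedule(QEMUBH *bh)
    {
        /* Make writes needed by the callback visible before
         * bh->scheduled can be observed set by aio_bh_poll. */
        smp_wmb();
        bh->scheduled = 1;
        aio_notify(bh->ctx);
    }

and, in the aio_bh_poll loop:

        if (!bh->deleted && bh->scheduled) {
            bh->scheduled = 0;
            /* Paired with the smp_wmb() in qemu_bh_schedule: order the
             * callback's reads after the read of bh->scheduled. */
            smp_rmb();
            bh->cb(bh->opaque);
        }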

Please pick up the atomics patch from my branch so that you have the
smp_read_barrier_depends() primitive; and resubmit with documentation
about the locking policy in the commit message _and_ the header files.
Also please add a comment before each barrier saying what is ordered
against what.

Thanks,

Paolo

> It's not an easy problem, because even with RCU the bottom half callback
> should run _outside_ the RCU critical section.  I suggest that you hunt
> the kernel for something that is trying to do the same.
pingfan liu June 16, 2013, 10 a.m. UTC | #5
On Sat, Jun 15, 2013 at 11:44 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 15/06/2013 07:16, Paolo Bonzini wrote:
>> ... I'm not sure that this works yet.  I see two problems:
>> ctx->walking_bh needs to be accessed atomically, and while you are doing the
>> deletions somebody could come up and start a read.  Havoc.
>
> Hmm, are you just trying to protect aio_bh_poll from aio_bh_new, while
> still having only one thread that can do aio_bh_poll (multiple producer,
> single consumer)?
>
Yes :)
> In this case what you have should work and this patch is almost ready
> for commit.  In fact it's actually important to have it for the
> dataplane stuff that Stefan is doing, I think.
>
> But _please document what you are doing_!  It's not the first time that
> I tell you this: locking policy must be _thoroughly_ documented, and
> _not_ figured out by the reviewers.  Without this, I'm not going to give
> my Reviewed-by.
>
Ok, will document it.
> Regarding barriers, there is another write barrier you need to add
> before anything that sets bh->scheduled.  This makes sure any writes
> that are needed by the callback are done before the locations are read
> in the aio_bh_poll thread.  Similarly, you want a matching read barrier
> before calling the callback.
>
Will fix.
> Please pick up the atomics patch from my branch so that you have the
> smp_read_barrier_depends() primitive; and resubmit with documentation
> about the locking policy in the commit message _and_ the header files.
> Also please add a comment before each barrier saying what is ordered
> against what.
>
Ok.
Thanks and regards,
Pingfan

> Thanks,
>
> Paolo
>
>> It's not an easy problem, because even with RCU the bottom half callback
>> should run _outside_ the RCU critical section.  I suggest that you hunt
>> the kernel for something that is trying to do the same.
pingfan liu June 16, 2013, 10:03 a.m. UTC | #6
On Sat, Jun 15, 2013 at 7:16 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 15/06/2013 04:43, liu ping fan wrote:
>> On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>>> On 13/06/2013 22:55, Liu Ping Fan wrote:
>>>> BH will be used outside the big lock, so introduce a lock to protect it.
>>>> Note that the lock only affects the writers; the BH's callback does
>>>> not take this extra lock.
>>>>
>>>> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com>
>>>> ---
>>>>  async.c             | 10 +++++++++-
>>>>  include/block/aio.h |  2 ++
>>>>  2 files changed, 11 insertions(+), 1 deletion(-)
>>>>
>>>> diff --git a/async.c b/async.c
>>>> index 90fe906..b1dcead 100644
>>>> --- a/async.c
>>>> +++ b/async.c
>>>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
>>>>      bh->ctx = ctx;
>>>>      bh->cb = cb;
>>>>      bh->opaque = opaque;
>>>> +    qemu_mutex_lock(&ctx->bh_lock);
>>>>      bh->next = ctx->first_bh;
>>>> +    smp_wmb();
>>>>      ctx->first_bh = bh;
>>>> +    qemu_mutex_unlock(&ctx->bh_lock);
>>>>      return bh;
>>>>  }
>>>>
>>>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx)
>>>>      ctx->walking_bh++;
>>>>
>>>>      ret = 0;
>>>> -    for (bh = ctx->first_bh; bh; bh = next) {
>>>> +    bh = ctx->first_bh;
>>>> +    smp_rmb();
>>>> +    for (; bh; bh = next) {
>>>>          next = bh->next;
>>>
>>> The read barrier in theory should go inside the loop.  On the other
>>
>> For a read after more than one writer?
>
> Yes.
>
>>> hand, it only needs to be a smp_read_barrier_depends().
>>>
>> Ok, so it will depend on ops introduced in your urcu patch series.
>
> Well, it's just the Alpha that needs the barrier, so it could even be
> commented for now.  But...
>
>>>>          if (!bh->deleted && bh->scheduled) {
>>>>              bh->scheduled = 0;
>>>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx)
>>>>
>>>>      /* remove deleted bhs */
>>>>      if (!ctx->walking_bh) {
>>>> +        qemu_mutex_lock(&ctx->bh_lock);
>
> ... I'm not sure that this works yet.  I see two problems:
> ctx->walking_bh needs to be accessed atomically, and while you are doing the
> deletions somebody could come up and start a read.  Havoc.
>
Sorry, could you elaborate on this? I guess you plan to have
thread pools run all of the AioContexts, so the reader and the
deleter can run in parallel, right?

Thanks and regards,
Pingfan
> It's not an easy problem, because even with RCU the bottom half callback
> should run _outside_ the RCU critical section.  I suggest that you hunt
> the kernel for something that is trying to do the same.
>
> One idea could be to have a separate list for scheduled bottom halves.
> You can walk this list destructively, so you have no problem releasing
> the RCU critical section while invoking the callbacks.  Again, the
> kernel or urcu data structure library can help (urcu has an RCU-based FIFO).
>
> You can track if there are any deleted BHs, and if so do a call_rcu at
> the end of aio_bh_poll.  You do not delete scheduled BHs; instead, let
> the next aio_bh_poll remove them from the scheduled list and do another
> call_rcu.
>
> Thanks for doing this work; we need "textbook examples" of how to do
> this, which we can perhaps later apply to timers and all that.
>
> Paolo
>
>>>>          bhp = &ctx->first_bh;
>>>>          while (*bhp) {
>>>>              bh = *bhp;
>>>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx)
>>>>                  bhp = &bh->next;
>>>>              }
>>>>          }
>>>> +        qemu_mutex_unlock(&ctx->bh_lock);
>>>>      }
>>>>
>>>>      return ret;
>>>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void)
>>>>      ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
>>>>      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>>>      ctx->thread_pool = NULL;
>>>> +    qemu_mutex_init(&ctx->bh_lock);
>>>>      event_notifier_init(&ctx->notifier, false);
>>>>      aio_set_event_notifier(ctx, &ctx->notifier,
>>>>                             (EventNotifierHandler *)
>>>> diff --git a/include/block/aio.h b/include/block/aio.h
>>>> index 1836793..a65d16f 100644
>>>> --- a/include/block/aio.h
>>>> +++ b/include/block/aio.h
>>>> @@ -17,6 +17,7 @@
>>>>  #include "qemu-common.h"
>>>>  #include "qemu/queue.h"
>>>>  #include "qemu/event_notifier.h"
>>>> +#include "qemu/thread.h"
>>>>
>>>>  typedef struct BlockDriverAIOCB BlockDriverAIOCB;
>>>>  typedef void BlockDriverCompletionFunc(void *opaque, int ret);
>>>> @@ -53,6 +54,7 @@ typedef struct AioContext {
>>>>       */
>>>>      int walking_handlers;
>>>>
>>>> +    QemuMutex bh_lock;
>>>>      /* Anchor of the list of Bottom Halves belonging to the context */
>>>>      struct QEMUBH *first_bh;
>>>>
>>>>
>>>
>>
>>
>

Patch

diff --git a/async.c b/async.c
index 90fe906..b1dcead 100644
--- a/async.c
+++ b/async.c
@@ -47,8 +47,11 @@  QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
     bh->ctx = ctx;
     bh->cb = cb;
     bh->opaque = opaque;
+    qemu_mutex_lock(&ctx->bh_lock);
     bh->next = ctx->first_bh;
+    smp_wmb();
     ctx->first_bh = bh;
+    qemu_mutex_unlock(&ctx->bh_lock);
     return bh;
 }
 
@@ -60,7 +63,9 @@  int aio_bh_poll(AioContext *ctx)
     ctx->walking_bh++;
 
     ret = 0;
-    for (bh = ctx->first_bh; bh; bh = next) {
+    bh = ctx->first_bh;
+    smp_rmb();
+    for (; bh; bh = next) {
         next = bh->next;
         if (!bh->deleted && bh->scheduled) {
             bh->scheduled = 0;
@@ -75,6 +80,7 @@  int aio_bh_poll(AioContext *ctx)
 
     /* remove deleted bhs */
     if (!ctx->walking_bh) {
+        qemu_mutex_lock(&ctx->bh_lock);
         bhp = &ctx->first_bh;
         while (*bhp) {
             bh = *bhp;
@@ -85,6 +91,7 @@  int aio_bh_poll(AioContext *ctx)
                 bhp = &bh->next;
             }
         }
+        qemu_mutex_unlock(&ctx->bh_lock);
     }
 
     return ret;
@@ -211,6 +218,7 @@  AioContext *aio_context_new(void)
     ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
     ctx->thread_pool = NULL;
+    qemu_mutex_init(&ctx->bh_lock);
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier, 
                            (EventNotifierHandler *)
diff --git a/include/block/aio.h b/include/block/aio.h
index 1836793..a65d16f 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -17,6 +17,7 @@ 
 #include "qemu-common.h"
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
+#include "qemu/thread.h"
 
 typedef struct BlockDriverAIOCB BlockDriverAIOCB;
 typedef void BlockDriverCompletionFunc(void *opaque, int ret);
@@ -53,6 +54,7 @@  typedef struct AioContext {
      */
     int walking_handlers;
 
+    QemuMutex bh_lock;
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;