Message ID | 1371178554-14432-1-git-send-email-pingfanl@linux.vnet.ibm.com |
---|---|
State | New |
Headers | show |
Il 13/06/2013 22:55, Liu Ping Fan ha scritto: > BH will be used outside big lock, so introduce lock to protect it. > Note that the lock only affects the writer and bh's callback does > not take this extra lock. > > Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com> > --- > async.c | 10 +++++++++- > include/block/aio.h | 2 ++ > 2 files changed, 11 insertions(+), 1 deletion(-) > > diff --git a/async.c b/async.c > index 90fe906..b1dcead 100644 > --- a/async.c > +++ b/async.c > @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) > bh->ctx = ctx; > bh->cb = cb; > bh->opaque = opaque; > + qemu_mutex_lock(&ctx->bh_lock); > bh->next = ctx->first_bh; > + smp_wmb(); > ctx->first_bh = bh; > + qemu_mutex_unlock(&ctx->bh_lock); > return bh; > } > > @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx) > ctx->walking_bh++; > > ret = 0; > - for (bh = ctx->first_bh; bh; bh = next) { > + bh = ctx->first_bh; > + smp_rmb(); > + for (; bh; bh = next) { > next = bh->next; The read barrier in theory should go inside the loop. On the other hand, it only needs to be a smp_read_barrier_depends(). 
Paolo > if (!bh->deleted && bh->scheduled) { > bh->scheduled = 0; > @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx) > > /* remove deleted bhs */ > if (!ctx->walking_bh) { > + qemu_mutex_lock(&ctx->bh_lock); > bhp = &ctx->first_bh; > while (*bhp) { > bh = *bhp; > @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx) > bhp = &bh->next; > } > } > + qemu_mutex_unlock(&ctx->bh_lock); > } > > return ret; > @@ -211,6 +218,7 @@ AioContext *aio_context_new(void) > ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); > ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); > ctx->thread_pool = NULL; > + qemu_mutex_init(&ctx->bh_lock); > event_notifier_init(&ctx->notifier, false); > aio_set_event_notifier(ctx, &ctx->notifier, > (EventNotifierHandler *) > diff --git a/include/block/aio.h b/include/block/aio.h > index 1836793..a65d16f 100644 > --- a/include/block/aio.h > +++ b/include/block/aio.h > @@ -17,6 +17,7 @@ > #include "qemu-common.h" > #include "qemu/queue.h" > #include "qemu/event_notifier.h" > +#include "qemu/thread.h" > > typedef struct BlockDriverAIOCB BlockDriverAIOCB; > typedef void BlockDriverCompletionFunc(void *opaque, int ret); > @@ -53,6 +54,7 @@ typedef struct AioContext { > */ > int walking_handlers; > > + QemuMutex bh_lock; > /* Anchor of the list of Bottom Halves belonging to the context */ > struct QEMUBH *first_bh; > >
On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > Il 13/06/2013 22:55, Liu Ping Fan ha scritto: >> BH will be used outside big lock, so introduce lock to protect it. >> Note that the lock only affects the writer and bh's callback does >> not take this extra lock. >> >> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com> >> --- >> async.c | 10 +++++++++- >> include/block/aio.h | 2 ++ >> 2 files changed, 11 insertions(+), 1 deletion(-) >> >> diff --git a/async.c b/async.c >> index 90fe906..b1dcead 100644 >> --- a/async.c >> +++ b/async.c >> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) >> bh->ctx = ctx; >> bh->cb = cb; >> bh->opaque = opaque; >> + qemu_mutex_lock(&ctx->bh_lock); >> bh->next = ctx->first_bh; >> + smp_wmb(); >> ctx->first_bh = bh; >> + qemu_mutex_unlock(&ctx->bh_lock); >> return bh; >> } >> >> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx) >> ctx->walking_bh++; >> >> ret = 0; >> - for (bh = ctx->first_bh; bh; bh = next) { >> + bh = ctx->first_bh; >> + smp_rmb(); >> + for (; bh; bh = next) { >> next = bh->next; > > The read barrier in theory should go inside the loop. On the other For read after more than 1 writer? > hand, it only needs to be a smp_read_barrier_depends(). > Ok, so it will depend on ops introduced in your urcu patch series. 
Thanks and regards, Pingfan > Paolo > >> if (!bh->deleted && bh->scheduled) { >> bh->scheduled = 0; >> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx) >> >> /* remove deleted bhs */ >> if (!ctx->walking_bh) { >> + qemu_mutex_lock(&ctx->bh_lock); >> bhp = &ctx->first_bh; >> while (*bhp) { >> bh = *bhp; >> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx) >> bhp = &bh->next; >> } >> } >> + qemu_mutex_unlock(&ctx->bh_lock); >> } >> >> return ret; >> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void) >> ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); >> ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); >> ctx->thread_pool = NULL; >> + qemu_mutex_init(&ctx->bh_lock); >> event_notifier_init(&ctx->notifier, false); >> aio_set_event_notifier(ctx, &ctx->notifier, >> (EventNotifierHandler *) >> diff --git a/include/block/aio.h b/include/block/aio.h >> index 1836793..a65d16f 100644 >> --- a/include/block/aio.h >> +++ b/include/block/aio.h >> @@ -17,6 +17,7 @@ >> #include "qemu-common.h" >> #include "qemu/queue.h" >> #include "qemu/event_notifier.h" >> +#include "qemu/thread.h" >> >> typedef struct BlockDriverAIOCB BlockDriverAIOCB; >> typedef void BlockDriverCompletionFunc(void *opaque, int ret); >> @@ -53,6 +54,7 @@ typedef struct AioContext { >> */ >> int walking_handlers; >> >> + QemuMutex bh_lock; >> /* Anchor of the list of Bottom Halves belonging to the context */ >> struct QEMUBH *first_bh; >> >> >
Il 15/06/2013 04:43, liu ping fan ha scritto: > On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: >> Il 13/06/2013 22:55, Liu Ping Fan ha scritto: >>> BH will be used outside big lock, so introduce lock to protect it. >>> Note that the lock only affects the writer and bh's callback does >>> not take this extra lock. >>> >>> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com> >>> --- >>> async.c | 10 +++++++++- >>> include/block/aio.h | 2 ++ >>> 2 files changed, 11 insertions(+), 1 deletion(-) >>> >>> diff --git a/async.c b/async.c >>> index 90fe906..b1dcead 100644 >>> --- a/async.c >>> +++ b/async.c >>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) >>> bh->ctx = ctx; >>> bh->cb = cb; >>> bh->opaque = opaque; >>> + qemu_mutex_lock(&ctx->bh_lock); >>> bh->next = ctx->first_bh; >>> + smp_wmb(); >>> ctx->first_bh = bh; >>> + qemu_mutex_unlock(&ctx->bh_lock); >>> return bh; >>> } >>> >>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx) >>> ctx->walking_bh++; >>> >>> ret = 0; >>> - for (bh = ctx->first_bh; bh; bh = next) { >>> + bh = ctx->first_bh; >>> + smp_rmb(); >>> + for (; bh; bh = next) { >>> next = bh->next; >> >> The read barrier in theory should go inside the loop. On the other > > For read after more than 1 writer? Yes. >> hand, it only needs to be a smp_read_barrier_depends(). >> > Ok, so it will depend on ops introduced in your urcu patch series. Well, it's just the Alpha that needs the barrier, so it could even be commented for now. But... >>> if (!bh->deleted && bh->scheduled) { >>> bh->scheduled = 0; >>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx) >>> >>> /* remove deleted bhs */ >>> if (!ctx->walking_bh) { >>> + qemu_mutex_lock(&ctx->bh_lock); ... I'm not sure that this works yet. I see two problems: ctx->walking_bh needs to be accessed atomic, and while you are doing the deletions somebody could come up and start a read. Havoc. 
It's not an easy problem, because even with RCU the bottom half callback should run _outside_ the RCU critical section. I suggest that you hunt the kernel for something that is trying to do the same. One idea could be to have a separate list for scheduled bottom halves. You can walk this list destructively, so you have no problem releasing the RCU critical section while invoking the callbacks. Again, the kernel or urcu data structure library can help (urcu has a RCU-based FIFO). You can track if there are any deleted BHs, and if so do a call_rcu at the end of aio_bh_poll. You do not delete scheduled BHs, instead let the next aio_bh_poll remove them from the scheduled list and do another call_rcu. Thanks for doing this work, we need "textbook examples" of how to do this, that we can perhaps later apply to timers and all that. Paolo >>> bhp = &ctx->first_bh; >>> while (*bhp) { >>> bh = *bhp; >>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx) >>> bhp = &bh->next; >>> } >>> } >>> + qemu_mutex_unlock(&ctx->bh_lock); >>> } >>> >>> return ret; >>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void) >>> ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); >>> ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); >>> ctx->thread_pool = NULL; >>> + qemu_mutex_init(&ctx->bh_lock); >>> event_notifier_init(&ctx->notifier, false); >>> aio_set_event_notifier(ctx, &ctx->notifier, >>> (EventNotifierHandler *) >>> diff --git a/include/block/aio.h b/include/block/aio.h >>> index 1836793..a65d16f 100644 >>> --- a/include/block/aio.h >>> +++ b/include/block/aio.h >>> @@ -17,6 +17,7 @@ >>> #include "qemu-common.h" >>> #include "qemu/queue.h" >>> #include "qemu/event_notifier.h" >>> +#include "qemu/thread.h" >>> >>> typedef struct BlockDriverAIOCB BlockDriverAIOCB; >>> typedef void BlockDriverCompletionFunc(void *opaque, int ret); >>> @@ -53,6 +54,7 @@ typedef struct AioContext { >>> */ >>> int walking_handlers; >>> >>> + QemuMutex bh_lock; >>> /* Anchor 
of the list of Bottom Halves belonging to the context */ >>> struct QEMUBH *first_bh; >>> >>> >> > >
Il 15/06/2013 07:16, Paolo Bonzini ha scritto: > ... I'm not sure that this works yet. I see two problems: > ctx->walking_bh needs to be accessed atomic, and while you are doing the > deletions somebody could come up and start a read. Havoc. Hmm, are you just trying to protect aio_bh_poll from aio_bh_new, while still having only one thread that can do aio_bh_poll (multiple producer, single consumer)? In this case what you have should work and this patch is almost ready for commit. In fact it's actually important to have it for the dataplane stuff that Stefan is doing, I think. But _please document what you are doing_! It's not the first time that I tell you this: locking policy must be _thoroughly_ documented, and _not_ figured out by the reviewers. Without this, I'm not going to give my Reviewed-by. Regarding barriers, there is another write barrier you need to add before anything that sets bh->scheduled. This makes sure any writes that are needed by the callback are done before the locations are read in the aio_bh_poll thread. Similarly, you want a matching read barrier before calling the callback. Please pick up the atomics patch from my branch so that you have the smp_read_barrier_depends() primitive; and resubmit with documentation about the locking policy in the commit message _and_ the header files. Also please add a comment before each barrier saying what is ordered against what. Thanks, Paolo > It's not an easy problem, because even with RCU the bottom half callback > should run _outside_ the RCU critical section. I suggest that you hunt > the kernel for something that is trying to do the same.
On Sat, Jun 15, 2013 at 11:44 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > Il 15/06/2013 07:16, Paolo Bonzini ha scritto: >> ... I'm not sure that this works yet. I see two problems: >> ctx->walking_bh needs to be accessed atomic, and while you are doing the >> deletions somebody could come up and start a read. Havoc. > > Hmm, are you just trying to protect aio_bh_poll from aio_bh_new, while > still having only one thread that can do aio_bh_poll (multiple producer, > single consumer)? > Yes :) > In this case what you have should work and this patch is almost ready > for commit. In fact it's actually important to have it for the > dataplane stuff that Stefan is doing, I think. > > But _please document what you are doing_! It's not the first time that > I tell you this: locking policy must be _thoroughly_ documented, and > _not_ figured out by the reviewers. Without this, I'm not going to give > my Reviewed-by. > Ok, will document it. > Regarding barriers, there is another write barrier you need to add > before anything that sets bh->scheduled. This makes sure any writes > that are needed by the callback are done before the locations are read > in the aio_bh_poll thread. Similarly, you want a matching read barrier > before calling the callback. > Will fix. > Please pick up the atomics patch from my branch so that you have the > smp_read_barrier_depends() primitive; and resubmit with documentation > about the locking policy in the commit message _and_ the header files. > Also please add a comment before each barrier saying what is ordered > against what. > Ok. Thanks and regards, Pingfan > Thanks, > > Paolo > >> It's not an easy problem, because even with RCU the bottom half callback >> should run _outside_ the RCU critical section. I suggest that you hunt >> the kernel for something that is trying to do the same.
On Sat, Jun 15, 2013 at 7:16 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: > Il 15/06/2013 04:43, liu ping fan ha scritto: >> On Sat, Jun 15, 2013 at 1:35 PM, Paolo Bonzini <pbonzini@redhat.com> wrote: >>> Il 13/06/2013 22:55, Liu Ping Fan ha scritto: >>>> BH will be used outside big lock, so introduce lock to protect it. >>>> Note that the lock only affects the writer and bh's callback does >>>> not take this extra lock. >>>> >>>> Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com> >>>> --- >>>> async.c | 10 +++++++++- >>>> include/block/aio.h | 2 ++ >>>> 2 files changed, 11 insertions(+), 1 deletion(-) >>>> >>>> diff --git a/async.c b/async.c >>>> index 90fe906..b1dcead 100644 >>>> --- a/async.c >>>> +++ b/async.c >>>> @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) >>>> bh->ctx = ctx; >>>> bh->cb = cb; >>>> bh->opaque = opaque; >>>> + qemu_mutex_lock(&ctx->bh_lock); >>>> bh->next = ctx->first_bh; >>>> + smp_wmb(); >>>> ctx->first_bh = bh; >>>> + qemu_mutex_unlock(&ctx->bh_lock); >>>> return bh; >>>> } >>>> >>>> @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx) >>>> ctx->walking_bh++; >>>> >>>> ret = 0; >>>> - for (bh = ctx->first_bh; bh; bh = next) { >>>> + bh = ctx->first_bh; >>>> + smp_rmb(); >>>> + for (; bh; bh = next) { >>>> next = bh->next; >>> >>> The read barrier in theory should go inside the loop. On the other >> >> For read after more than 1 writer? > > Yes. > >>> hand, it only needs to be a smp_read_barrier_depends(). >>> >> Ok, so it will depend on ops introduced in your urcu patch series. > > Well, it's just the Alpha that needs the barrier, so it could even be > commented for now. But... > >>>> if (!bh->deleted && bh->scheduled) { >>>> bh->scheduled = 0; >>>> @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx) >>>> >>>> /* remove deleted bhs */ >>>> if (!ctx->walking_bh) { >>>> + qemu_mutex_lock(&ctx->bh_lock); > > ... I'm not sure that this works yet. 
I see two problems: > ctx->walking_bh needs to be accessed atomic, and while you are doing the > deletions somebody could come up and start a read. Havoc. > Sorry, could you elaborate on this? I guess you plan to have thread-pools to run all of the AioContexts, so the reader and deleter can occur in parallel, right? Thanks and regards, Pingfan > It's not an easy problem, because even with RCU the bottom half callback > should run _outside_ the RCU critical section. I suggest that you hunt > the kernel for something that is trying to do the same. > > One idea could be to have a separate list for scheduled bottom halves. > You can walk this list destructively, so you have no problem releasing > the RCU critical section while invoking the callbacks. Again, the > kernel or urcu data structure library can help (urcu has a RCU-based FIFO). > > You can track if there are any deleted BHs, and if so do a call_rcu at > the end of aio_bh_poll. You do not delete scheduled BHs, instead let > the next aio_bh_poll remove them from the scheduled list and do another > call_rcu. > > Thanks for doing this work, we need "textbook examples" of how to do > this, that we can perhaps later apply to timers and all that. 
> > Paolo > >>>> bhp = &ctx->first_bh; >>>> while (*bhp) { >>>> bh = *bhp; >>>> @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx) >>>> bhp = &bh->next; >>>> } >>>> } >>>> + qemu_mutex_unlock(&ctx->bh_lock); >>>> } >>>> >>>> return ret; >>>> @@ -211,6 +218,7 @@ AioContext *aio_context_new(void) >>>> ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); >>>> ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); >>>> ctx->thread_pool = NULL; >>>> + qemu_mutex_init(&ctx->bh_lock); >>>> event_notifier_init(&ctx->notifier, false); >>>> aio_set_event_notifier(ctx, &ctx->notifier, >>>> (EventNotifierHandler *) >>>> diff --git a/include/block/aio.h b/include/block/aio.h >>>> index 1836793..a65d16f 100644 >>>> --- a/include/block/aio.h >>>> +++ b/include/block/aio.h >>>> @@ -17,6 +17,7 @@ >>>> #include "qemu-common.h" >>>> #include "qemu/queue.h" >>>> #include "qemu/event_notifier.h" >>>> +#include "qemu/thread.h" >>>> >>>> typedef struct BlockDriverAIOCB BlockDriverAIOCB; >>>> typedef void BlockDriverCompletionFunc(void *opaque, int ret); >>>> @@ -53,6 +54,7 @@ typedef struct AioContext { >>>> */ >>>> int walking_handlers; >>>> >>>> + QemuMutex bh_lock; >>>> /* Anchor of the list of Bottom Halves belonging to the context */ >>>> struct QEMUBH *first_bh; >>>> >>>> >>> >> >> >
diff --git a/async.c b/async.c index 90fe906..b1dcead 100644 --- a/async.c +++ b/async.c @@ -47,8 +47,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) bh->ctx = ctx; bh->cb = cb; bh->opaque = opaque; + qemu_mutex_lock(&ctx->bh_lock); bh->next = ctx->first_bh; + smp_wmb(); ctx->first_bh = bh; + qemu_mutex_unlock(&ctx->bh_lock); return bh; } @@ -60,7 +63,9 @@ int aio_bh_poll(AioContext *ctx) ctx->walking_bh++; ret = 0; - for (bh = ctx->first_bh; bh; bh = next) { + bh = ctx->first_bh; + smp_rmb(); + for (; bh; bh = next) { next = bh->next; if (!bh->deleted && bh->scheduled) { bh->scheduled = 0; @@ -75,6 +80,7 @@ int aio_bh_poll(AioContext *ctx) /* remove deleted bhs */ if (!ctx->walking_bh) { + qemu_mutex_lock(&ctx->bh_lock); bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; @@ -85,6 +91,7 @@ int aio_bh_poll(AioContext *ctx) bhp = &bh->next; } } + qemu_mutex_unlock(&ctx->bh_lock); } return ret; @@ -211,6 +218,7 @@ AioContext *aio_context_new(void) ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); ctx->thread_pool = NULL; + qemu_mutex_init(&ctx->bh_lock); event_notifier_init(&ctx->notifier, false); aio_set_event_notifier(ctx, &ctx->notifier, (EventNotifierHandler *) diff --git a/include/block/aio.h b/include/block/aio.h index 1836793..a65d16f 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -17,6 +17,7 @@ #include "qemu-common.h" #include "qemu/queue.h" #include "qemu/event_notifier.h" +#include "qemu/thread.h" typedef struct BlockDriverAIOCB BlockDriverAIOCB; typedef void BlockDriverCompletionFunc(void *opaque, int ret); @@ -53,6 +54,7 @@ typedef struct AioContext { */ int walking_handlers; + QemuMutex bh_lock; /* Anchor of the list of Bottom Halves belonging to the context */ struct QEMUBH *first_bh;
BH will be used outside the big lock, so introduce a lock to protect the BH list. Note that the lock only protects writers (aio_bh_new and the deleted-BH sweep in aio_bh_poll); the BH callbacks themselves run without taking this extra lock. Signed-off-by: Liu Ping Fan <pingfanl@linux.vnet.ibm.com> --- async.c | 10 +++++++++- include/block/aio.h | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-)