Message ID | 1353357585-31746-2-git-send-email-s.priebe@profihost.ag |
---|---|
State | New |
Headers | show |
On 11/19/2012 12:39 PM, Stefan Priebe wrote: > From: Stefan Priebe <s.priebe@profhost.ag> > > This one fixes a race qemu also had in iscsi block driver between > cancellation and io completition. > > qemu_rbd_aio_cancel was not synchronously waiting for the end of > the command. > > It also removes the useless cancelled flag and introduces instead > a status flag with EINPROGRESS like iscsi block driver. > > Signed-off-by: Stefan Priebe <s.priebe@profihost.ag> > --- > block/rbd.c | 19 ++++++++++++------- > 1 file changed, 12 insertions(+), 7 deletions(-) > > diff --git a/block/rbd.c b/block/rbd.c > index 5a0f79f..7b3bcbb 100644 > --- a/block/rbd.c > +++ b/block/rbd.c > @@ -76,7 +76,7 @@ typedef struct RBDAIOCB { > int64_t sector_num; > int error; > struct BDRVRBDState *s; > - int cancelled; > + int status; > } RBDAIOCB; > > typedef struct RADOSCB { > @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) > RBDAIOCB *acb = rcb->acb; > int64_t r; > > - if (acb->cancelled) { > - qemu_vfree(acb->bounce); > - qemu_aio_release(acb); > + if (acb->bh) { > goto done; > } I don't think this is necessary at all anymore, since this callback will never be called more than once, and it's the only thing that will allocate acb->bh. Removing this block (and the done label) altogether should have the intended effect, and the bh scheduled will free/release things as usual. > > @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) > acb->ret = r; > } > } > + acb->status = acb->ret; > + > /* Note that acb->bh can be NULL in case where the aio was cancelled */ > acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > qemu_bh_schedule(acb->bh); > + > done: > g_free(rcb); > } > @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs) > static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) > { > RBDAIOCB *acb = (RBDAIOCB *) blockacb; > - acb->cancelled = 1; > + > + while (acb->status == -EINPROGRESS) { > + qemu_aio_wait(); > + } > } > > static AIOPool rbd_aio_pool = { > @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque) > qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); > } > qemu_vfree(acb->bounce); > - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); > qemu_bh_delete(acb->bh); > acb->bh = NULL; > > + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); > + I'm not sure changing the order matters here, but maybe I'm missing something. > qemu_aio_release(acb); > } > > @@ -689,8 +694,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, > acb->ret = 0; > acb->error = 0; > acb->s = s; > - acb->cancelled = 0; > acb->bh = NULL; > + acb->status = -EINPROGRESS; > > if (cmd == RBD_AIO_WRITE) { > qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); >
On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote: > @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) > RBDAIOCB *acb = rcb->acb; > int64_t r; > > - if (acb->cancelled) { > - qemu_vfree(acb->bounce); > - qemu_aio_release(acb); > + if (acb->bh) { > goto done; > } When is acb->bh != NULL here? > > @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) > acb->ret = r; > } > } > + acb->status = acb->ret; How about initializing acb->ret with -EINPROGRESS and using it instead of adding a new status field? > @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs) > static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) > { > RBDAIOCB *acb = (RBDAIOCB *) blockacb; > - acb->cancelled = 1; > + > + while (acb->status == -EINPROGRESS) { > + qemu_aio_wait(); > + } > } Need to take care that acb is still valid (not yet released!) when the while loop iterates. One way of doing this is to mark the acb as cancelled so the completion handler won't release it. Then the cancellation function can release the acb - it's the last piece of code that needs a reference to acb. In this case the acb->cancelled field is useful. > @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque) > qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); > } > qemu_vfree(acb->bounce); > - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); > qemu_bh_delete(acb->bh); > acb->bh = NULL; > > + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); > + > qemu_aio_release(acb); > } > What does this hunk do?
Hello Stefan, hello Paolo, most of the ideas and removing the whole cancellation stuff came from Paolo. Maybe he can comment also? I would then make a new patch. Greets, Stefan Am 21.11.2012 10:07, schrieb Stefan Hajnoczi: > On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote: >> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) >> RBDAIOCB *acb = rcb->acb; >> int64_t r; >> >> - if (acb->cancelled) { >> - qemu_vfree(acb->bounce); >> - qemu_aio_release(acb); >> + if (acb->bh) { >> goto done; >> } > > When is acb->bh != NULL here? > >> >> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) >> acb->ret = r; >> } >> } >> + acb->status = acb->ret; > > How about initializing acb->ret with -EINPROGRESS and using it instead > of adding a new status field? > >> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs) >> static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> { >> RBDAIOCB *acb = (RBDAIOCB *) blockacb; >> - acb->cancelled = 1; >> + >> + while (acb->status == -EINPROGRESS) { >> + qemu_aio_wait(); >> + } >> } > > Need to take care that acb is still valid (not yet released!) when the > while loop iterates. > > One way of doing this is to mark the acb as cancelled so the completion > handler won't release it. Then the cancellation function can release > the acb - it's the last piece of code that needs a reference to acb. In > this case the acb->cancelled field is useful. > >> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque) >> qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); >> } >> qemu_vfree(acb->bounce); >> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> qemu_bh_delete(acb->bh); >> acb->bh = NULL; >> >> + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> + >> qemu_aio_release(acb); >> } >> > > What does this hunk do? >
Il 21/11/2012 10:19, Stefan Priebe - Profihost AG ha scritto: > Hello Stefan, > hello Paolo, > > most of the ideas and removing the whole cancellation stuff came from > Paolo. Maybe he can comment also? I agree with all of Stefan's comments. This includes putting back acb->cancelled---but this time done right: note that qemu_rbd_complete_aio was always releasing the AIOCB if (acb->cancelled) { qemu_vfree(acb->bounce); qemu_aio_release(acb); goto done; } and according to Stefan's review it shouldn't. Paolo
Hello, i've send a new patch which hopefully cares about all your comments. [PATCH] rbd block driver fix race between aio completition and aio cancel Greets Stefan Am 21.11.2012 10:07, schrieb Stefan Hajnoczi: > On Mon, Nov 19, 2012 at 09:39:45PM +0100, Stefan Priebe wrote: >> @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) >> RBDAIOCB *acb = rcb->acb; >> int64_t r; >> >> - if (acb->cancelled) { >> - qemu_vfree(acb->bounce); >> - qemu_aio_release(acb); >> + if (acb->bh) { >> goto done; >> } > > When is acb->bh != NULL here? > >> >> @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) >> acb->ret = r; >> } >> } >> + acb->status = acb->ret; > > How about initializing acb->ret with -EINPROGRESS and using it instead > of adding a new status field? > >> @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs) >> static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> { >> RBDAIOCB *acb = (RBDAIOCB *) blockacb; >> - acb->cancelled = 1; >> + >> + while (acb->status == -EINPROGRESS) { >> + qemu_aio_wait(); >> + } >> } > > Need to take care that acb is still valid (not yet released!) when the > while loop iterates. > > One way of doing this is to mark the acb as cancelled so the completion > handler won't release it. Then the cancellation function can release > the acb - it's the last piece of code that needs a reference to acb. In > this case the acb->cancelled field is useful. > >> @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque) >> qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); >> } >> qemu_vfree(acb->bounce); >> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> qemu_bh_delete(acb->bh); >> acb->bh = NULL; >> >> + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> + >> qemu_aio_release(acb); >> } >> > > What does this hunk do? >
diff --git a/block/rbd.c b/block/rbd.c index 5a0f79f..7b3bcbb 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -76,7 +76,7 @@ typedef struct RBDAIOCB { int64_t sector_num; int error; struct BDRVRBDState *s; - int cancelled; + int status; } RBDAIOCB; typedef struct RADOSCB { @@ -376,9 +376,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) RBDAIOCB *acb = rcb->acb; int64_t r; - if (acb->cancelled) { - qemu_vfree(acb->bounce); - qemu_aio_release(acb); + if (acb->bh) { goto done; } @@ -406,9 +404,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) acb->ret = r; } } + acb->status = acb->ret; + /* Note that acb->bh can be NULL in case where the aio was cancelled */ acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); qemu_bh_schedule(acb->bh); + done: g_free(rcb); } @@ -573,7 +574,10 @@ static void qemu_rbd_close(BlockDriverState *bs) static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) { RBDAIOCB *acb = (RBDAIOCB *) blockacb; - acb->cancelled = 1; + + while (acb->status == -EINPROGRESS) { + qemu_aio_wait(); + } } static AIOPool rbd_aio_pool = { @@ -642,10 +646,11 @@ static void rbd_aio_bh_cb(void *opaque) qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); } qemu_vfree(acb->bounce); - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); qemu_bh_delete(acb->bh); acb->bh = NULL; + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); + qemu_aio_release(acb); } @@ -689,8 +694,8 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, acb->ret = 0; acb->error = 0; acb->s = s; - acb->cancelled = 0; acb->bh = NULL; + acb->status = -EINPROGRESS; if (cmd == RBD_AIO_WRITE) { qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);