Message ID | 50AA420D.6070806@profihost.ag |
---|---|
State | New |
Headers | show |
Il 19/11/2012 15:28, Stefan Priebe - Profihost AG ha scritto: > typedef struct RADOSCB { > @@ -376,6 +377,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) > RBDAIOCB *acb = rcb->acb; > int64_t r; > > + if (acb->bh) { > + return; > + } > + > r = rcb->ret; > > if (acb->cmd == RBD_AIO_WRITE || > @@ -560,6 +565,20 @@ static void qemu_rbd_close(BlockDriverState *bs) > rados_shutdown(s->cluster); > } > > +static void qemu_rbd_aio_abort(void *private_data) > +{ > + RBDAIOCB *acb = (RBDAIOCB *) private_data; > + > + acb->status = -ECANCELED; > + > + if (acb->bh) { > + return; > + } > + > + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > + qemu_bh_schedule(acb->bh); > +} I think this is all unneeded. Just store rcb->ret into rcb->acb->status, and your version of qemu_rbd_aio_cancel should just work. Also, I think the acb->cancelled field is not necessary anymore after these changes. Paolo
Am 19.11.2012 15:41, schrieb Paolo Bonzini: > Il 19/11/2012 15:28, Stefan Priebe - Profihost AG ha scritto: >> typedef struct RADOSCB { >> @@ -376,6 +377,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) >> RBDAIOCB *acb = rcb->acb; >> int64_t r; >> >> + if (acb->bh) { >> + return; >> + } >> + >> r = rcb->ret; >> >> if (acb->cmd == RBD_AIO_WRITE || >> @@ -560,6 +565,20 @@ static void qemu_rbd_close(BlockDriverState *bs) >> rados_shutdown(s->cluster); >> } >> >> +static void qemu_rbd_aio_abort(void *private_data) >> +{ >> + RBDAIOCB *acb = (RBDAIOCB *) private_data; >> + >> + acb->status = -ECANCELED; >> + >> + if (acb->bh) { >> + return; >> + } >> + >> + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); >> + qemu_bh_schedule(acb->bh); >> +} > > I think this is all unneeded. Just store rcb->ret into > rcb->acb->status, and your version of qemu_rbd_aio_cancel should just work. > > Also, I think the acb->cancelled field is not necessary anymore after > these changes. The iscsi driver still relies on canceled that's why i left it here in too. So you mean in qemu_rbd_complete_aio i should remove the check for cancelled and then just overwrite acb->status to that it changes from -EINPROGRESS to something else? And qemu_rbd_aio_cancel should just wait for status != -EINPROGRESS? Stefan
Il 19/11/2012 15:48, Stefan Priebe - Profihost AG ha scritto: > So you mean in qemu_rbd_complete_aio i should remove the check for > cancelled and then just overwrite acb->status to that it changes from > -EINPROGRESS to something else? > > And qemu_rbd_aio_cancel should just wait for status != -EINPROGRESS? Yes. paolo
From e9eac2c7ed7b98ff102ab7da4573f081ebca32fa Mon Sep 17 00:00:00 2001 From: Stefan Priebe <s.priebe@profihost.ag> Date: Mon, 19 Nov 2012 15:01:16 +0100 Subject: [PATCH 2/2] rbd: fix races between io completition and abort Signed-off-by: Stefan Priebe <s.priebe@profhost.ag> --- block/rbd.c | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/block/rbd.c b/block/rbd.c index 583bcc3..ae1d03b 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -77,6 +77,7 @@ typedef struct RBDAIOCB { int error; struct BDRVRBDState *s; int cancelled; + int status; } RBDAIOCB; typedef struct RADOSCB { @@ -376,6 +377,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) RBDAIOCB *acb = rcb->acb; int64_t r; + if (acb->bh) { + return; + } + r = rcb->ret; if (acb->cmd == RBD_AIO_WRITE || @@ -560,6 +565,20 @@ static void qemu_rbd_close(BlockDriverState *bs) rados_shutdown(s->cluster); } +static void qemu_rbd_aio_abort(void *private_data) +{ + RBDAIOCB *acb = (RBDAIOCB *) private_data; + + acb->status = -ECANCELED; + + if (acb->bh) { + return; + } + + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); + qemu_bh_schedule(acb->bh); +} + /* * Cancel aio. Since we don't reference acb in a non qemu threads, * it is safe to access it here. @@ -567,7 +586,22 @@ static void qemu_rbd_close(BlockDriverState *bs) static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) { RBDAIOCB *acb = (RBDAIOCB *) blockacb; + + if (acb->status != -EINPROGRESS) { + return; + } + acb->cancelled = 1; + + // TODO / FIXME: send an abort command to rbd + // Normally we should call abort librbd and + // librbd gets qemu_rbd_aio_abort as a callback function + // i wasn't able to find an abort function in librbd at all + qemu_rbd_aio_abort(acb); + + while (acb->status == -EINPROGRESS) { + qemu_aio_wait(); + } } static AIOPool rbd_aio_pool = { @@ -636,10 +670,13 @@ static void rbd_aio_bh_cb(void *opaque) qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); } qemu_vfree(acb->bounce); - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); qemu_bh_delete(acb->bh); acb->bh = NULL; + if (acb->cancelled == 0) { + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); + } + qemu_aio_release(acb); } @@ -685,6 +722,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, acb->s = s; acb->cancelled = 0; acb->bh = NULL; + acb->status = -EINPROGRESS; if (cmd == RBD_AIO_WRITE) { qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); -- 1.7.10.4