Message ID | 1344347073-7773-7-git-send-email-benoit@irqsave.net |
---|---|
State | New |
Headers | show |
On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote: > +static int quorum_check_ret(QuorumAIOCB *acb) > +{ > + int i, j; > + > + for (i = 0, j = 0; i <= 2; i++) { > + if (acb->aios[0].ret) { > + j++; > + } > + } > + > + if (j > 1) { > + return -EIO; > + } > + > + return 0; > +} Simpler version just scans the return values (also I think acb->aios[0].ret should be acb->aios[i].ret): static int quorum_check_ret(QuorumAIOCB *acb) { int i; for (i = 0; i <= 2; i++) { if (acb->aios[i].ret) { return -EIO; /* or acb->aios[i].ret */ } } return 0; } > + > +static void quorum_aio_bh(void *opaque) > +{ > + QuorumAIOCB *acb = opaque; > + > + qemu_bh_delete(acb->bh); > + acb->common.cb(acb->common.opaque, quorum_check_ret(acb)); > + if (acb->finished) { > + *acb->finished = true; > + } > + qemu_aio_release(acb); > +} > + > +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, > + QEMUIOVector *qiov, > + int64_t sector_num, > + int nb_sectors, > + BlockDriverCompletionFunc *cb, > + void *opaque) > +{ > + QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque); > + int i; > + > + acb->qiov = qiov; > + acb->bh = NULL; > + acb->count = 0; > + acb->sector_num = sector_num; > + acb->nb_sectors = nb_sectors; > + acb->vote = NULL; > + acb->vote_ret = 0; acb->finished = NULL;
Le Wednesday 08 Aug 2012 à 16:37:13 (+0100), Stefan Hajnoczi a écrit : > On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote: > > +static int quorum_check_ret(QuorumAIOCB *acb) > > +{ > > + int i, j; > > + > > + for (i = 0, j = 0; i <= 2; i++) { > > + if (acb->aios[0].ret) { > > + j++; > > + } > > + } > > + > > + if (j > 1) { > > + return -EIO; > > + } > > + > > + return 0; > > +} > > Simpler version just scans the return values (also I think > acb->aios[0].ret should be acb->aios[i].ret): > > static int quorum_check_ret(QuorumAIOCB *acb) > { > int i; > for (i = 0; i <= 2; i++) { > if (acb->aios[i].ret) { > return -EIO; /* or acb->aios[i].ret */ > } > } > return 0; > } I am wondering what is the best code to return. There is some potential case like a filer containing a particular image (or an image on the network) going down where the user probably doesn't want to get an -EIO. The if (j > 1) { return -EIO; } part was about detecting an error count greater than the threshold (2). > > > + > > +static void quorum_aio_bh(void *opaque) > > +{ > > + QuorumAIOCB *acb = opaque; > > + > > + qemu_bh_delete(acb->bh); > > + acb->common.cb(acb->common.opaque, quorum_check_ret(acb)); > > + if (acb->finished) { > > + *acb->finished = true; > > + } > > + qemu_aio_release(acb); > > +} > > + > > +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, > > + QEMUIOVector *qiov, > > + int64_t sector_num, > > + int nb_sectors, > > + BlockDriverCompletionFunc *cb, > > + void *opaque) > > +{ > > + QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque); > > + int i; > > + > > + acb->qiov = qiov; > > + acb->bh = NULL; > > + acb->count = 0; > > + acb->sector_num = sector_num; > > + acb->nb_sectors = nb_sectors; > > + acb->vote = NULL; > > + acb->vote_ret = 0; > > acb->finished = NULL; >
On Thu, Aug 9, 2012 at 10:24 AM, Benoît Canet <benoit.canet@irqsave.net> wrote: > Le Wednesday 08 Aug 2012 à 16:37:13 (+0100), Stefan Hajnoczi a écrit : >> On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote: >> > +static int quorum_check_ret(QuorumAIOCB *acb) >> > +{ >> > + int i, j; >> > + >> > + for (i = 0, j = 0; i <= 2; i++) { >> > + if (acb->aios[0].ret) { >> > + j++; >> > + } >> > + } >> > + >> > + if (j > 1) { >> > + return -EIO; >> > + } >> > + >> > + return 0; >> > +} >> >> Simpler version just scans the return values (also I think >> acb->aios[0].ret should be acb->aios[i].ret): >> >> static int quorum_check_ret(QuorumAIOCB *acb) >> { >> int i; >> for (i = 0; i <= 2; i++) { >> if (acb->aios[i].ret) { >> return -EIO; /* or acb->aios[i].ret */ >> } >> } >> return 0; >> } > > I am wondering what is the best code to return. > There is some potential case like a filer containing a particular > image (or an image on the network) going down where the user probably > doesn't want to get an -EIO. > > The > if (j > 1) { > return -EIO; > } > part was about detecting an error count greater than the threshold (2). Yeah, this starts to get into policy and depends on the user. The best we can do is to make it configurable and choose a sane default. Stefan
diff --git a/block/quorum.c b/block/quorum.c
index 5cd7083..e6d2274 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -133,6 +133,129 @@ static int64_t quorum_getlength(BlockDriverState *bs)
     return ret;
 }
 
+/* Pool .cancel hook: block in qemu_aio_wait() until quorum_aio_bh() reports
+ * completion through acb->finished. */
+static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
+    bool finished = false;
+
+    /* Wait for the request to finish */
+    acb->finished = &finished;
+    while (!finished) {
+        qemu_aio_wait();
+    }
+}
+
+static AIOPool quorum_aio_pool = {
+    .aiocb_size = sizeof(QuorumAIOCB),
+    .cancel = quorum_aio_cancel,
+};
+
+/* Return -EIO when more than one of the three child requests failed,
+ * 0 otherwise. */
+static int quorum_check_ret(QuorumAIOCB *acb)
+{
+    int i, j;
+
+    for (i = 0, j = 0; i <= 2; i++) {
+        /* Inspect each child's own status, not only child 0's. */
+        if (acb->aios[i].ret) {
+            j++;
+        }
+    }
+
+    if (j > 1) {
+        return -EIO;
+    }
+
+    return 0;
+}
+
+/* Bottom half run once all children completed: report the result to the
+ * caller, wake a pending quorum_aio_cancel() and release the request. */
+static void quorum_aio_bh(void *opaque)
+{
+    QuorumAIOCB *acb = opaque;
+
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
+    if (acb->finished) {
+        *acb->finished = true;
+    }
+    qemu_aio_release(acb);
+}
+
+/* Get a request from quorum_aio_pool and initialize its per-request state. */
+static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
+                                   QEMUIOVector *qiov,
+                                   int64_t sector_num,
+                                   int nb_sectors,
+                                   BlockDriverCompletionFunc *cb,
+                                   void *opaque)
+{
+    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
+    int i;
+
+    acb->qiov = qiov;
+    acb->bh = NULL;
+    acb->count = 0;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->vote = NULL;
+    acb->vote_ret = 0;
+    /* quorum_aio_bh() dereferences this when non-NULL, so it must not be
+     * left uninitialized when no cancel is pending. */
+    acb->finished = NULL;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].buf = NULL;
+        acb->aios[i].ret = 0;
+        acb->aios[i].parent = acb;
+    }
+
+    return acb;
+}
+
+/* Completion callback of one child request: record its status and schedule
+ * quorum_aio_bh() once all three children have completed. */
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumSingleAIOCB *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+
+    sacb->ret = ret;
+    acb->count++;
+    assert(acb->count <= 3);
+    if (acb->count == 3) {
+        acb->bh = qemu_bh_new(quorum_aio_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+/* Submit the same write to s->bs[0..2]; completion is reported through cb
+ * by quorum_aio_bh(). */
+static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc *cb,
+                                           void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors,
+                                      cb, opaque);
+    int i;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
+                                             nb_sectors, &quorum_aio_cb,
+                                             &acb->aios[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name = "quorum",
     .protocol_name = "quorum",
@@ -143,6 +266,8 @@ static BlockDriver bdrv_quorum = {
 
     .bdrv_file_open = quorum_open,
     .bdrv_close = quorum_close,
+
+    .bdrv_aio_writev = quorum_aio_writev,
 };
 
 static void bdrv_quorum_init(void)
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)