Patchwork [RFC,V2,06/10] quorum: Add quorum_aio_writev and its dependencies.

login
register
mail settings
Submitter Benoit Canet
Date Aug. 7, 2012, 1:44 p.m.
Message ID <1344347073-7773-7-git-send-email-benoit@irqsave.net>
Download mbox | patch
Permalink /patch/175654/
State New
Headers show

Comments

Benoit Canet - Aug. 7, 2012, 1:44 p.m.
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c |  110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)
Stefan Hajnoczi - Aug. 8, 2012, 3:37 p.m.
On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
> +static int quorum_check_ret(QuorumAIOCB *acb)
> +{
> +    int i, j;
> +
> +    for (i = 0, j = 0; i <= 2; i++) {
> +        if (acb->aios[0].ret) {
> +            j++;
> +        }
> +    }
> +
> +    if (j > 1) {
> +        return -EIO;
> +    }
> +
> +    return 0;
> +}

Simpler version just scans the return values (also I think
acb->aios[0].ret should be acb->aios[i].ret):

static int quorum_check_ret(QuorumAIOCB *acb)
{
    int i;
    for (i = 0; i <= 2; i++) {
        if (acb->aios[i].ret) {
            return -EIO; /* or acb->aios[i].ret */
        }
    }
    return 0;
}

> +
> +static void quorum_aio_bh(void *opaque)
> +{
> +    QuorumAIOCB *acb = opaque;
> +
> +    qemu_bh_delete(acb->bh);
> +    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
> +    if (acb->finished) {
> +        *acb->finished = true;
> +    }
> +    qemu_aio_release(acb);
> +}
> +
> +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
> +                                 QEMUIOVector *qiov,
> +                                 int64_t sector_num,
> +                                 int nb_sectors,
> +                                 BlockDriverCompletionFunc *cb,
> +                                 void *opaque)
> +{
> +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
> +    int i;
> +
> +    acb->qiov = qiov;
> +    acb->bh = NULL;
> +    acb->count = 0;
> +    acb->sector_num = sector_num;
> +    acb->nb_sectors = nb_sectors;
> +    acb->vote = NULL;
> +    acb->vote_ret = 0;

acb->finished = NULL;
Benoît Canet - Aug. 9, 2012, 9:24 a.m.
Le Wednesday 08 Aug 2012 à 16:37:13 (+0100), Stefan Hajnoczi a écrit :
> On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
> > +static int quorum_check_ret(QuorumAIOCB *acb)
> > +{
> > +    int i, j;
> > +
> > +    for (i = 0, j = 0; i <= 2; i++) {
> > +        if (acb->aios[0].ret) {
> > +            j++;
> > +        }
> > +    }
> > +
> > +    if (j > 1) {
> > +        return -EIO;
> > +    }
> > +
> > +    return 0;
> > +}
> 
> Simpler version just scans the return values (also I think
> acb->aios[0].ret should be acb->aios[i].ret):
> 
> static int quorum_check_ret(QuorumAIOCB *acb)
> {
>     int i;
>     for (i = 0; i <= 2; i++) {
>         if (acb->aios[i].ret) {
>             return -EIO; /* or acb->aios[i].ret */
>         }
>     }
>     return 0;
> }

I am wondering what is the best code to return.
There is some potential case like a filer containing a particular
image (or a image on the network) going down where the user probably
don't want to get an -EIO.

The 
if (j > 1) {
    return -EIO;
}
part was about detecting an error count greater dans the threshold (2).

> 
> > +
> > +static void quorum_aio_bh(void *opaque)
> > +{
> > +    QuorumAIOCB *acb = opaque;
> > +
> > +    qemu_bh_delete(acb->bh);
> > +    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
> > +    if (acb->finished) {
> > +        *acb->finished = true;
> > +    }
> > +    qemu_aio_release(acb);
> > +}
> > +
> > +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
> > +                                 QEMUIOVector *qiov,
> > +                                 int64_t sector_num,
> > +                                 int nb_sectors,
> > +                                 BlockDriverCompletionFunc *cb,
> > +                                 void *opaque)
> > +{
> > +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
> > +    int i;
> > +
> > +    acb->qiov = qiov;
> > +    acb->bh = NULL;
> > +    acb->count = 0;
> > +    acb->sector_num = sector_num;
> > +    acb->nb_sectors = nb_sectors;
> > +    acb->vote = NULL;
> > +    acb->vote_ret = 0;
> 
> acb->finished = NULL;
>
Stefan Hajnoczi - Aug. 9, 2012, 9:35 a.m.
On Thu, Aug 9, 2012 at 10:24 AM, Benoît Canet <benoit.canet@irqsave.net> wrote:
> Le Wednesday 08 Aug 2012 ŕ 16:37:13 (+0100), Stefan Hajnoczi a écrit :
>> On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
>> > +static int quorum_check_ret(QuorumAIOCB *acb)
>> > +{
>> > +    int i, j;
>> > +
>> > +    for (i = 0, j = 0; i <= 2; i++) {
>> > +        if (acb->aios[0].ret) {
>> > +            j++;
>> > +        }
>> > +    }
>> > +
>> > +    if (j > 1) {
>> > +        return -EIO;
>> > +    }
>> > +
>> > +    return 0;
>> > +}
>>
>> Simpler version just scans the return values (also I think
>> acb->aios[0].ret should be acb->aios[i].ret):
>>
>> static int quorum_check_ret(QuorumAIOCB *acb)
>> {
>>     int i;
>>     for (i = 0; i <= 2; i++) {
>>         if (acb->aios[i].ret) {
>>             return -EIO; /* or acb->aios[i].ret */
>>         }
>>     }
>>     return 0;
>> }
>
> I am wondering what is the best code to return.
> There is some potential case like a filer containing a particular
> image (or a image on the network) going down where the user probably
> don't want to get an -EIO.
>
> The
> if (j > 1) {
>     return -EIO;
> }
> part was about detecting an error count greater dans the threshold (2).

Yeah, this starts to get into policy and depends on the user.  The
best we can do is to make it configurable and choose a sane default.

Stefan

Patch

diff --git a/block/quorum.c b/block/quorum.c
index 5cd7083..e6d2274 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -133,6 +133,114 @@  static int64_t quorum_getlength(BlockDriverState *bs)
     return ret;
 }
 
+static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
+    bool finished = false;
+
+    /* Wait for the request to finish */
+    acb->finished = &finished;
+    while (!finished) {
+        qemu_aio_wait();
+    }
+}
+
+static AIOPool quorum_aio_pool = {
+    .aiocb_size         = sizeof(QuorumAIOCB),
+    .cancel             = quorum_aio_cancel,
+};
+
+static int quorum_check_ret(QuorumAIOCB *acb)
+{
+    int i, j;
+
+    for (i = 0, j = 0; i <= 2; i++) {
+        if (acb->aios[0].ret) {
+            j++;
+        }
+    }
+
+    if (j > 1) {
+        return -EIO;
+    }
+
+    return 0;
+}
+
+static void quorum_aio_bh(void *opaque)
+{
+    QuorumAIOCB *acb = opaque;
+
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
+    if (acb->finished) {
+        *acb->finished = true;
+    }
+    qemu_aio_release(acb);
+}
+
+static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
+                                 QEMUIOVector *qiov,
+                                 int64_t sector_num,
+                                 int nb_sectors,
+                                 BlockDriverCompletionFunc *cb,
+                                 void *opaque)
+{
+    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
+    int i;
+
+    acb->qiov = qiov;
+    acb->bh = NULL;
+    acb->count = 0;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->vote = NULL;
+    acb->vote_ret = 0;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].buf = NULL;
+        acb->aios[i].ret = 0;
+        acb->aios[i].parent = acb;
+    }
+
+    return acb;
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumSingleAIOCB *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+
+    sacb->ret = ret;
+    acb->count++;
+    assert(acb->count <= 3);
+    if (acb->count == 3) {
+        acb->bh = qemu_bh_new(quorum_aio_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
+                                          int64_t sector_num,
+                                          QEMUIOVector *qiov,
+                                          int nb_sectors,
+                                          BlockDriverCompletionFunc *cb,
+                                          void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors,
+                                    cb, opaque);
+    int i;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
+                                             nb_sectors, &quorum_aio_cb,
+                                             &acb->aios[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -143,6 +251,8 @@  static BlockDriver bdrv_quorum = {
 
     .bdrv_file_open     = quorum_open,
     .bdrv_close         = quorum_close,
+
+    .bdrv_aio_writev    = quorum_aio_writev,
 };
 
 static void bdrv_quorum_init(void)