diff mbox

[RFC,V2,06/10] quorum: Add quorum_aio_writev and its dependencies.

Message ID 1344347073-7773-7-git-send-email-benoit@irqsave.net
State New
Headers show

Commit Message

Benoit Canet Aug. 7, 2012, 1:44 p.m. UTC
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c |  110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)

Comments

Stefan Hajnoczi Aug. 8, 2012, 3:37 p.m. UTC | #1
On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
> +static int quorum_check_ret(QuorumAIOCB *acb)
> +{
> +    int i, j;
> +
> +    for (i = 0, j = 0; i <= 2; i++) {
> +        if (acb->aios[0].ret) {
> +            j++;
> +        }
> +    }
> +
> +    if (j > 1) {
> +        return -EIO;
> +    }
> +
> +    return 0;
> +}

Simpler version just scans the return values (also I think
acb->aios[0].ret should be acb->aios[i].ret):

static int quorum_check_ret(QuorumAIOCB *acb)
{
    int i;
    for (i = 0; i <= 2; i++) {
        if (acb->aios[i].ret) {
            return -EIO; /* or acb->aios[i].ret */
        }
    }
    return 0;
}

> +
> +static void quorum_aio_bh(void *opaque)
> +{
> +    QuorumAIOCB *acb = opaque;
> +
> +    qemu_bh_delete(acb->bh);
> +    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
> +    if (acb->finished) {
> +        *acb->finished = true;
> +    }
> +    qemu_aio_release(acb);
> +}
> +
> +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
> +                                 QEMUIOVector *qiov,
> +                                 int64_t sector_num,
> +                                 int nb_sectors,
> +                                 BlockDriverCompletionFunc *cb,
> +                                 void *opaque)
> +{
> +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
> +    int i;
> +
> +    acb->qiov = qiov;
> +    acb->bh = NULL;
> +    acb->count = 0;
> +    acb->sector_num = sector_num;
> +    acb->nb_sectors = nb_sectors;
> +    acb->vote = NULL;
> +    acb->vote_ret = 0;

acb->finished = NULL;
Benoît Canet Aug. 9, 2012, 9:24 a.m. UTC | #2
Le Wednesday 08 Aug 2012 à 16:37:13 (+0100), Stefan Hajnoczi a écrit :
> On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
> > +static int quorum_check_ret(QuorumAIOCB *acb)
> > +{
> > +    int i, j;
> > +
> > +    for (i = 0, j = 0; i <= 2; i++) {
> > +        if (acb->aios[0].ret) {
> > +            j++;
> > +        }
> > +    }
> > +
> > +    if (j > 1) {
> > +        return -EIO;
> > +    }
> > +
> > +    return 0;
> > +}
> 
> Simpler version just scans the return values (also I think
> acb->aios[0].ret should be acb->aios[i].ret):
> 
> static int quorum_check_ret(QuorumAIOCB *acb)
> {
>     int i;
>     for (i = 0; i <= 2; i++) {
>         if (acb->aios[i].ret) {
>             return -EIO; /* or acb->aios[i].ret */
>         }
>     }
>     return 0;
> }

I am wondering what is the best code to return.
There is some potential case like a filer containing a particular
image (or a image on the network) going down where the user probably
don't want to get an -EIO.

The 
if (j > 1) {
    return -EIO;
}
part was about detecting an error count greater dans the threshold (2).

> 
> > +
> > +static void quorum_aio_bh(void *opaque)
> > +{
> > +    QuorumAIOCB *acb = opaque;
> > +
> > +    qemu_bh_delete(acb->bh);
> > +    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
> > +    if (acb->finished) {
> > +        *acb->finished = true;
> > +    }
> > +    qemu_aio_release(acb);
> > +}
> > +
> > +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
> > +                                 QEMUIOVector *qiov,
> > +                                 int64_t sector_num,
> > +                                 int nb_sectors,
> > +                                 BlockDriverCompletionFunc *cb,
> > +                                 void *opaque)
> > +{
> > +    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
> > +    int i;
> > +
> > +    acb->qiov = qiov;
> > +    acb->bh = NULL;
> > +    acb->count = 0;
> > +    acb->sector_num = sector_num;
> > +    acb->nb_sectors = nb_sectors;
> > +    acb->vote = NULL;
> > +    acb->vote_ret = 0;
> 
> acb->finished = NULL;
>
Stefan Hajnoczi Aug. 9, 2012, 9:35 a.m. UTC | #3
On Thu, Aug 9, 2012 at 10:24 AM, Benoît Canet <benoit.canet@irqsave.net> wrote:
> Le Wednesday 08 Aug 2012 ŕ 16:37:13 (+0100), Stefan Hajnoczi a écrit :
>> On Tue, Aug 7, 2012 at 2:44 PM, Benoît Canet <benoit.canet@gmail.com> wrote:
>> > +static int quorum_check_ret(QuorumAIOCB *acb)
>> > +{
>> > +    int i, j;
>> > +
>> > +    for (i = 0, j = 0; i <= 2; i++) {
>> > +        if (acb->aios[0].ret) {
>> > +            j++;
>> > +        }
>> > +    }
>> > +
>> > +    if (j > 1) {
>> > +        return -EIO;
>> > +    }
>> > +
>> > +    return 0;
>> > +}
>>
>> Simpler version just scans the return values (also I think
>> acb->aios[0].ret should be acb->aios[i].ret):
>>
>> static int quorum_check_ret(QuorumAIOCB *acb)
>> {
>>     int i;
>>     for (i = 0; i <= 2; i++) {
>>         if (acb->aios[i].ret) {
>>             return -EIO; /* or acb->aios[i].ret */
>>         }
>>     }
>>     return 0;
>> }
>
> I am wondering what is the best code to return.
> There is some potential case like a filer containing a particular
> image (or a image on the network) going down where the user probably
> don't want to get an -EIO.
>
> The
> if (j > 1) {
>     return -EIO;
> }
> part was about detecting an error count greater dans the threshold (2).

Yeah, this starts to get into policy and depends on the user.  The
best we can do is to make it configurable and choose a sane default.

Stefan
diff mbox

Patch

diff --git a/block/quorum.c b/block/quorum.c
index 5cd7083..e6d2274 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -133,6 +133,114 @@  static int64_t quorum_getlength(BlockDriverState *bs)
     return ret;
 }
 
+static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
+    bool finished = false;
+
+    /* Wait for the request to finish */
+    acb->finished = &finished;
+    while (!finished) {
+        qemu_aio_wait();
+    }
+}
+
+static AIOPool quorum_aio_pool = {
+    .aiocb_size         = sizeof(QuorumAIOCB),
+    .cancel             = quorum_aio_cancel,
+};
+
+static int quorum_check_ret(QuorumAIOCB *acb)
+{
+    int i, j;
+
+    for (i = 0, j = 0; i <= 2; i++) {
+        if (acb->aios[0].ret) {
+            j++;
+        }
+    }
+
+    if (j > 1) {
+        return -EIO;
+    }
+
+    return 0;
+}
+
+static void quorum_aio_bh(void *opaque)
+{
+    QuorumAIOCB *acb = opaque;
+
+    qemu_bh_delete(acb->bh);
+    acb->common.cb(acb->common.opaque, quorum_check_ret(acb));
+    if (acb->finished) {
+        *acb->finished = true;
+    }
+    qemu_aio_release(acb);
+}
+
+static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
+                                 QEMUIOVector *qiov,
+                                 int64_t sector_num,
+                                 int nb_sectors,
+                                 BlockDriverCompletionFunc *cb,
+                                 void *opaque)
+{
+    QuorumAIOCB *acb = qemu_aio_get(&quorum_aio_pool, bs, cb, opaque);
+    int i;
+
+    acb->qiov = qiov;
+    acb->bh = NULL;
+    acb->count = 0;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->vote = NULL;
+    acb->vote_ret = 0;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].buf = NULL;
+        acb->aios[i].ret = 0;
+        acb->aios[i].parent = acb;
+    }
+
+    return acb;
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumSingleAIOCB *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+
+    sacb->ret = ret;
+    acb->count++;
+    assert(acb->count <= 3);
+    if (acb->count == 3) {
+        acb->bh = qemu_bh_new(quorum_aio_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+static BlockDriverAIOCB *quorum_aio_writev(BlockDriverState *bs,
+                                          int64_t sector_num,
+                                          QEMUIOVector *qiov,
+                                          int nb_sectors,
+                                          BlockDriverCompletionFunc *cb,
+                                          void *opaque)
+{
+    BDRVQuorumState *s = bs->opaque;
+    QuorumAIOCB *acb = quorum_aio_get(bs, qiov, sector_num, nb_sectors,
+                                    cb, opaque);
+    int i;
+
+    for (i = 0; i <= 2; i++) {
+        acb->aios[i].aiocb = bdrv_aio_writev(s->bs[i], sector_num, qiov,
+                                             nb_sectors, &quorum_aio_cb,
+                                             &acb->aios[i]);
+    }
+
+    return &acb->common;
+}
+
 static BlockDriver bdrv_quorum = {
     .format_name        = "quorum",
     .protocol_name      = "quorum",
@@ -143,6 +251,8 @@  static BlockDriver bdrv_quorum = {
 
     .bdrv_file_open     = quorum_open,
     .bdrv_close         = quorum_close,
+
+    .bdrv_aio_writev    = quorum_aio_writev,
 };
 
 static void bdrv_quorum_init(void)