Patchwork [RFC,V3,9/9] quorum: Add quorum mechanism.

login
register
mail settings
Submitter Benoit Canet
Date Aug. 14, 2012, 2:14 p.m.
Message ID <1344953651-16622-10-git-send-email-benoit@irqsave.net>
Download mbox | patch
Permalink /patch/177302/
State New
Headers show

Comments

Benoit Canet - Aug. 14, 2012, 2:14 p.m.
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c |  211 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 210 insertions(+), 1 deletion(-)
Stefan Hajnoczi - Aug. 15, 2012, 10:51 a.m.
On Tue, Aug 14, 2012 at 04:14:11PM +0200, BenoƮt Canet wrote:
> +#define QUORUM_FREE_QIOV_ITEMS(qlist) do {                  \
> +        QLIST_FOREACH_SAFE(item, qlist, next, next_item) { \
> +        QLIST_REMOVE(item, next);                          \
> +        g_free(item);                                   \
> +    } } while (0)

This is only used once, please open code it and don't use a macro.

> +
> +static void quorum_free_vote_list(QuorumAIOCB *acb)
> +{
> +    QuorumIOVectorVersion *version, *next_version;
> +    QuorumIOVectorItem *item, *next_item;
> +
> +    QLIST_FOREACH_SAFE(version, &acb->vote_list, next, next_version) {
> +        QLIST_REMOVE(version, next);
> +        QUORUM_FREE_QIOV_ITEMS(&version->qiov_items);
> +        g_free(version);
> +    }
> +}
> +
> +#undef QUORUM_FREE_QIOV_ITEMS

Patch

diff --git a/block/quorum.c b/block/quorum.c
index 8b449fb..24c8298 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -14,6 +14,20 @@ 
  */
 
 #include "block_int.h"
+#include "zlib.h"
+
+typedef struct QuorumIOVectorItem {
+    int index;
+    QLIST_ENTRY(QuorumIOVectorItem) next;
+} QuorumIOVectorItem;
+
+typedef struct QuorumIOVectorVersion {
+    unsigned long checksum;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumIOVectorItem) qiov_items;
+    QLIST_ENTRY(QuorumIOVectorVersion) next;
+} QuorumIOVectorVersion;
 
 typedef struct {
     BlockDriverState **bs;
@@ -48,6 +62,7 @@  struct QuorumAIOCB {
     int success_count;          /* number of successfully completed AIOCB */
     bool *finished;             /* completion signal for cancel */
 
+    QLIST_HEAD(, QuorumIOVectorVersion) vote_list;
     void (*vote)(QuorumAIOCB *acb);
     int vote_ret;
 };
@@ -201,6 +216,11 @@  static void quorum_aio_bh(void *opaque)
     }
 
     qemu_bh_delete(acb->bh);
+
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
@@ -233,6 +253,7 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->nb_sectors = nb_sectors;
     acb->vote = NULL;
     acb->vote_ret = 0;
+    QLIST_INIT(&acb->vote_list);
 
     for (i = 0; i < s->m; i++) {
         acb->aios[i].buf = NULL;
@@ -260,10 +281,196 @@  static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote */
+    if (acb->vote) {
+        acb->vote(acb);
+    }
+
     acb->bh = qemu_bh_new(quorum_aio_bh, acb);
     qemu_bh_schedule(acb->bh);
 }
 
+static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
+{
+    fprintf(stderr, "quorum: corrected error in quorum file %s: sector_num=%"
+            PRId64 " nb_sectors=%i\n", filename, acb->sector_num,
+            acb->nb_sectors);
+}
+
+static void quorum_print_failure(QuorumAIOCB *acb)
+{
+    fprintf(stderr, "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
+            acb->sector_num, acb->nb_sectors);
+}
+
+static void quorum_print_bad_versions(QuorumAIOCB *acb,
+                                      unsigned long checksum)
+{
+    QuorumIOVectorVersion *version;
+    QuorumIOVectorItem *item;
+    BDRVQuorumState *s = acb->bqs;
+
+    QLIST_FOREACH(version, &acb->vote_list, next) {
+        if (version->checksum == checksum) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->qiov_items, next) {
+            quorum_print_bad(acb, s->filenames[item->index]);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_iovector_version(QuorumAIOCB *acb,
+                                          unsigned long checksum,
+                                          int index)
+{
+    QuorumIOVectorVersion *v = NULL, *version = NULL;
+    QuorumIOVectorItem *item;
+
+    /* look if we have something with this checksum */
+    QLIST_FOREACH(v, &acb->vote_list, next) {
+        if (v->checksum == checksum) {
+            version = v;
+            break;
+        }
+    }
+
+    /* It's a version not yet in the list add it */
+    if (!version) {
+        version = g_new0(QuorumIOVectorVersion, 1);
+        QLIST_INIT(&version->qiov_items);
+        version->checksum = checksum;
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&acb->vote_list, version, next);
+    }
+
+    version->vote_count += 1;
+
+    item = g_new0(QuorumIOVectorItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->qiov_items, item, next);
+}
+
+#define QUORUM_FREE_QIOV_ITEMS(qlist) do {                  \
+        QLIST_FOREACH_SAFE(item, qlist, next, next_item) { \
+        QLIST_REMOVE(item, next);                          \
+        g_free(item);                                   \
+    } } while (0)
+
+static void quorum_free_vote_list(QuorumAIOCB *acb)
+{
+    QuorumIOVectorVersion *version, *next_version;
+    QuorumIOVectorItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &acb->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QUORUM_FREE_QIOV_ITEMS(&version->qiov_items);
+        g_free(version);
+    }
+}
+
+#undef QUORUM_FREE_QIOV_ITEMS
+
+static unsigned long quorum_compute_checksum(QuorumAIOCB *acb, int i)
+{
+    int j;
+    unsigned long adler = adler32(0L, Z_NULL, 0);
+    QEMUIOVector *qiov = &acb->qiovs[i];
+
+    for (j = 0; j < qiov->niov; j++) {
+        adler = adler32(adler,
+                        qiov->iov[j].iov_base,
+                        qiov->iov[j].iov_len);
+    }
+
+    return adler;
+}
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = true;
+    int i, j;
+    unsigned long checksum = 0;
+    BDRVQuorumState *s = acb->bqs;
+    QuorumIOVectorVersion *candidate, *winner = NULL;
+
+    /* get the index of the first successfull read */
+    for (i = 0; i < s->m; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /* compare this read with all other successfull read looking for quorum */
+    for (j = i + 1; j < s->m; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        if (qemu_iovec_compare(&acb->qiovs[i],
+                               &acb->qiovs[j]) != -1) {
+            quorum = false;
+            break;
+        }
+    }
+
+    /* Every successfull read agrees -> Quorum */
+    if (quorum) {
+        quorum_copy_qiov(acb->qiov, &acb->qiovs[i]);
+        return;
+    }
+
+    /* compute checksums for each successfull read, also store indexes */
+    for (i = 0; i < s->m; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        checksum = quorum_compute_checksum(acb, i);
+        quorum_count_iovector_version(acb, checksum, i);
+    }
+
+    /* vote to select the most represented version */
+    i = 0;
+    QLIST_FOREACH(candidate, &acb->vote_list, next) {
+        if (candidate->vote_count > i) {
+            i = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    /* if the winner count is smaller than threshold read fail */
+    if (winner->vote_count < s->n) {
+        quorum_print_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->qiovs[winner->index]);
+
+    /* if some versions are bad print them */
+    if (i < s->m) {
+        quorum_print_bad_versions(acb, winner->checksum);
+    }
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(acb);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -276,6 +483,8 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                       nb_sectors, cb, opaque);
     int i;
 
+    acb->vote = quorum_vote;
+
     for (i = 0; i < s->m; i++) {
         acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
         qemu_iovec_init(&acb->qiovs[i], qiov->niov);
@@ -283,7 +492,7 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->m; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->qiovs[i], nb_sectors,
                        quorum_aio_cb, &acb->aios[i]);
     }