diff mbox

[RFC,V6,07/11] quorum: Add quorum mechanism.

Message ID 1358437897-24251-8-git-send-email-benoit@irqsave.net
State New
Headers show

Commit Message

Benoît Canet Jan. 17, 2013, 3:51 p.m. UTC
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c |  278 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 277 insertions(+), 1 deletion(-)

Comments

Eric Blake Jan. 17, 2013, 10:23 p.m. UTC | #1
On 01/17/2013 08:51 AM, Benoît Canet wrote:
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/quorum.c |  278 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 277 insertions(+), 1 deletion(-)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 98052eb..2bffff4 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -14,6 +14,20 @@
>   */
>  
>  #include "block/block_int.h"
> +#include "zlib.h"

Since zlib.h isn't part of qemu.git, this should be <zlib.h>.

> +
> +static unsigned long quorum_compute_checksum(QuorumAIOCB *acb, int i)
> +{
> +    int j;
> +    unsigned long adler = adler32(0L, Z_NULL, 0);

Hmm, adler32() is basically a weak hashing mechanism; are you sure you
won't have any false collisions?  Furthermore, how does this compare
with the series for adding deduplication, which uses much
stronger/longer hashes, but where those take more time to compute?  Is
there any way you can share efforts between the two series?

> +    /* get the index of the first successfull read */

s/successfull/successful/ (several times in this patch)
Benoît Canet Jan. 18, 2013, 3:19 p.m. UTC | #2
> Hmm, adler32() is basically a weak hashing mechanism; are you sure you
> won't have any false collisions?  Furthermore, how does this compare
> with the series for adding deduplication, which uses much
> stronger/longer hashes, but where those take more time to compute?  Is
> there any way you can share efforts between the two series?

I agree that using sha256 would be a better option as it will not be called
frequently.
However, the call to gnutls is a one-liner, so it's likely not shareable.
The configure script would need to be modified to require gnutls, and that is
likely not factorisable either.

I'll change the code to use sha256.

Regards

Benoît


> 
> > +    /* get the index of the first successfull read */
> 
> s/successfull/successful/ (several times in this patch)
> 
> -- 
> Eric Blake   eblake redhat com    +1-919-301-3266
> Libvirt virtualization library http://libvirt.org
>
diff mbox

Patch

diff --git a/block/quorum.c b/block/quorum.c
index 98052eb..2bffff4 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -14,6 +14,20 @@ 
  */
 
 #include "block/block_int.h"
+#include "zlib.h"
+
+/* One child that voted for a given version (see QuorumVoteVersion.items). */
+typedef struct QuorumVoteItem {
+    int index;                          /* child index; used for s->filenames[index] */
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+/*
+ * One distinct read result, keyed by its checksum, together with the
+ * children whose read produced it.
+ */
+typedef struct QuorumVoteVersion {
+    unsigned long value;                /* checksum identifying this version */
+    int index;                          /* child whose read first introduced it */
+    int vote_count;                     /* number of children that voted for it */
+    QLIST_HEAD(, QuorumVoteItem) items; /* one item per voting child */
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
 
 typedef struct {
     BlockDriverState **bs;
@@ -31,6 +45,10 @@  typedef struct QuorumSingleAIOCB {
     QuorumAIOCB *parent;
 } QuorumSingleAIOCB;
 
+/* Per-request tally of the distinct versions returned by the children. */
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;  /* one entry per distinct checksum */
+} QuorumVotes;
+
 struct QuorumAIOCB {
     BlockDriverAIOCB common;
     BDRVQuorumState *bqs;
@@ -48,6 +66,8 @@  struct QuorumAIOCB {
     int success_count;          /* number of successfully completed AIOCB */
     bool *finished;             /* completion signal for cancel */
 
+    QuorumVotes votes;
+
     void (*vote)(QuorumAIOCB *acb);
     int vote_ret;
 };
@@ -203,6 +223,11 @@  static void quorum_aio_bh(void *opaque)
     }
 
     qemu_bh_delete(acb->bh);
+
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
@@ -239,6 +264,7 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->vote = NULL;
     acb->vote_ret = 0;
     acb->finished = NULL;
+    QLIST_INIT(&acb->votes.vote_list);
 
     for (i = 0; i < s->total; i++) {
         acb->aios[i].buf = NULL;
@@ -266,10 +292,258 @@  static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote */
+    if (acb->vote) {
+        acb->vote(acb);
+    }
+
     acb->bh = qemu_bh_new(quorum_aio_bh, acb);
     qemu_bh_schedule(acb->bh);
 }
 
+/*
+ * Report on stderr that the copy held by @filename was corrected for the
+ * sector range covered by @acb.
+ */
+static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
+{
+    fprintf(stderr,
+            "quorum: corrected error in quorum file %s: "
+            "sector_num=%" PRId64 " nb_sectors=%i\n",
+            filename,
+            acb->sector_num,
+            acb->nb_sectors);
+}
+
+/* Report on stderr that the vote failed for the sector range of @acb. */
+static void quorum_print_failure(QuorumAIOCB *acb)
+{
+    fprintf(stderr,
+            "quorum: failure "
+            "sector_num=%" PRId64 " nb_sectors=%i\n",
+            acb->sector_num,
+            acb->nb_sectors);
+}
+
+/*
+ * Report every child whose read produced a version other than the one
+ * identified by @checksum (the caller passes the vote winner's checksum).
+ */
+static void quorum_print_bad_versions(QuorumAIOCB *acb,
+                                      unsigned long checksum)
+{
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *version;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        QuorumVoteItem *voter;
+
+        if (version->value != checksum) {
+            QLIST_FOREACH(voter, &version->items, next) {
+                quorum_print_bad(acb, s->filenames[voter->index]);
+            }
+        }
+    }
+}
+
+/*
+ * Copy the payload of @source into the buffers of @dest.  Both vectors
+ * must have the same layout (element count, total size and per-element
+ * lengths); this is asserted rather than handled.
+ */
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int n;
+
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+
+    for (n = 0; n < source->niov; n++) {
+        struct iovec *to = &dest->iov[n];
+        const struct iovec *from = &source->iov[n];
+
+        assert(to->iov_len == from->iov_len);
+        memcpy(to->iov_base, from->iov_base, from->iov_len);
+    }
+}
+
+/*
+ * Record that child @index returned data whose checksum is @checksum.
+ * The first time a checksum is seen a new version is created (remembering
+ * @index as its representative); every call adds one voter item and
+ * increments that version's vote count.
+ */
+static void quorum_count_vote(QuorumVotes *votes,
+                              unsigned long checksum,
+                              int index)
+{
+    QuorumVoteVersion *cur, *version = NULL;
+    QuorumVoteItem *voter;
+
+    /* search the known versions for this checksum */
+    QLIST_FOREACH(cur, &votes->vote_list, next) {
+        if (cur->value == checksum) {
+            version = cur;
+            break;
+        }
+    }
+
+    /* unseen checksum: start a new version (g_new0 zeroes vote_count) */
+    if (version == NULL) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        version->value = checksum;
+        version->index = index;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    voter = g_new0(QuorumVoteItem, 1);
+    voter->index = index;
+    QLIST_INSERT_HEAD(&version->items, voter, next);
+
+    version->vote_count++;
+}
+
+/* Unlink and free every version and voter item in @votes, leaving it empty. */
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QuorumVoteItem *voter, *next_voter;
+
+        QLIST_FOREACH_SAFE(voter, &version->items, next, next_voter) {
+            QLIST_REMOVE(voter, next);
+            g_free(voter);
+        }
+        QLIST_REMOVE(version, next);
+        g_free(version);
+    }
+}
+
+/*
+ * Return the Adler-32 checksum of the data read by child @i
+ * (acb->qiovs[i]), accumulated over all of its iovec elements in order.
+ *
+ * adler32() takes a uInt length, so each element is fed in uInt-sized
+ * chunks; passing iov_len directly would silently truncate any element
+ * larger than the uInt range and leave part of the data unhashed.
+ * Empty elements are skipped: adler32() with a NULL buffer returns the
+ * initial value, which would discard the running checksum.
+ *
+ * NOTE(review): Adler-32 is a weak checksum; two different reads that
+ * collide are counted as the same version by the vote.
+ */
+static unsigned long quorum_compute_checksum(QuorumAIOCB *acb, int i)
+{
+    const size_t max_chunk = (uInt)-1;
+    unsigned long adler = adler32(0L, Z_NULL, 0);
+    QEMUIOVector *qiov = &acb->qiovs[i];
+    int j;
+
+    for (j = 0; j < qiov->niov; j++) {
+        const Bytef *data = qiov->iov[j].iov_base;
+        size_t remaining = qiov->iov[j].iov_len;
+
+        while (remaining > 0) {
+            uInt chunk = remaining > max_chunk ? (uInt)max_chunk
+                                               : (uInt)remaining;
+
+            adler = adler32(adler, data, chunk);
+            data += chunk;
+            remaining -= chunk;
+        }
+    }
+
+    return adler;
+}
+
+/*
+ * Return the version with the highest vote count, or NULL when no version
+ * has a positive count (e.g. the list is empty).  On a tie, the version
+ * reached first while walking the list is kept.
+ */
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    QuorumVoteVersion *candidate;
+    QuorumVoteVersion *best = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        int best_count = best ? best->vote_count : 0;
+
+        if (candidate->vote_count > best_count) {
+            best = candidate;
+        }
+    }
+
+    return best;
+}
+
+/*
+ * Return true when @a and @b contain byte-identical data.  Both vectors
+ * must share the same layout (element count and per-element lengths);
+ * this is asserted rather than handled.
+ */
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int k;
+
+    assert(a->niov == b->niov);
+
+    for (k = 0; k < a->niov; k++) {
+        size_t len = a->iov[k].iov_len;
+
+        assert(len == b->iov[k].iov_len);
+        if (memcmp(a->iov[k].iov_base, b->iov[k].iov_base, len) != 0) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+/*
+ * Print a printf-style diagnostic prefixed with the request's sector range
+ * to stderr, then terminate the process with exit status 1.
+ */
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list args;
+
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+
+    va_start(args, fmt);
+    vfprintf(stderr, fmt, args);
+    va_end(args);
+
+    fputs("\n", stderr);
+    exit(1);
+}
+
+/*
+ * Return true when the reads held in @a and @b are considered to agree.
+ *
+ * With exactly two children and a threshold of two the driver behaves like
+ * blkverify: any difference is fatal, so quorum_err() reports the sector
+ * containing the mismatch offset returned by qemu_iovec_compare() and
+ * terminates the process; if it returns, the reads agreed.  Otherwise the
+ * buffers are compared with quorum_iovec_compare() and the result returned.
+ */
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->bqs;
+    bool blkverify = s->total == 2 && s->threshold == 2;
+    ssize_t offset;
+
+    if (!blkverify) {
+        return quorum_iovec_compare(a, b);
+    }
+
+    offset = qemu_iovec_compare(a, b);
+    if (offset != -1) {
+        /*
+         * Keep the sum signed: adding a uint64_t would make it unsigned,
+         * which does not match the PRId64 conversion.
+         */
+        quorum_err(acb, "contents mismatch in sector %" PRId64,
+                   acb->sector_num + (int64_t)(offset / BDRV_SECTOR_SIZE));
+    }
+    return true;
+}
+
+
+/*
+ * Vote on the data returned by the children of a read request and place
+ * the result in acb->qiov.
+ *
+ * Children whose read failed (acb->aios[i].ret != 0) do not vote.  If all
+ * successful reads are identical, their common data is used directly.
+ * Otherwise each successful read is checksummed, reads with equal checksums
+ * are grouped into versions, and the version with the most votes wins.  In
+ * both cases the winning data must come from at least s->threshold
+ * children; if not, acb->vote_ret is set to -EIO and acb->qiov is not
+ * written.  Children outvoted by a winner are reported on stderr.
+ */
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = true;
+    int i, j;
+    int agree_count;
+    unsigned long checksum = 0;
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *winner;
+
+    /* get the index of the first successful read */
+    for (i = 0; i < s->total; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /*
+     * No read succeeded: there is nothing to vote on, and acb->qiovs[i]
+     * would index one past the end of the array below.  (Defensive: the
+     * caller may already prevent this.)
+     */
+    if (i == s->total) {
+        quorum_print_failure(acb);
+        acb->vote_ret = -EIO;
+        return;
+    }
+
+    /* compare this read with all other successful reads looking for quorum */
+    agree_count = 1;
+    for (j = i + 1; j < s->total; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->qiovs[i], &acb->qiovs[j]);
+        if (!quorum) {
+            break;
+        }
+        agree_count++;
+    }
+
+    /* Every successful read agrees -> Quorum, if enough reads agree */
+    if (quorum) {
+        /*
+         * Apply the same threshold as the checksum vote below: unanimous
+         * agreement among fewer than s->threshold reads is not a quorum.
+         */
+        if (agree_count < s->threshold) {
+            quorum_print_failure(acb);
+            acb->vote_ret = -EIO;
+            fprintf(stderr, "quorum: vote result inferior to threshold\n");
+            return;
+        }
+        quorum_copy_qiov(acb->qiov, &acb->qiovs[i]);
+        return;
+    }
+
+    /* compute checksums for each successful read, also store indexes */
+    for (i = 0; i < s->total; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        checksum = quorum_compute_checksum(acb, i);
+        quorum_count_vote(&acb->votes, checksum, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+    assert(winner != NULL);
+
+    /* if the winner count is smaller than threshold read fail */
+    if (winner->vote_count < s->threshold) {
+        quorum_print_failure(acb);
+        acb->vote_ret = -EIO;
+        fprintf(stderr, "quorum: vote result inferior to threshold\n");
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->qiovs[winner->index]);
+
+    /* some versions are bad print them */
+    quorum_print_bad_versions(acb, winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -282,6 +556,8 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                       nb_sectors, cb, opaque);
     int i;
 
+    acb->vote = quorum_vote;
+
     for (i = 0; i < s->total; i++) {
         acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
         qemu_iovec_init(&acb->qiovs[i], qiov->niov);
@@ -289,7 +565,7 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->total; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->qiovs[i], nb_sectors,
                        quorum_aio_cb, &acb->aios[i]);
     }