Patchwork [RFC,V8,06/13] quorum: Add quorum mechanism.

login
register
mail settings
Submitter Benoît Canet
Date Jan. 28, 2013, 5:07 p.m.
Message ID <1359392845-15905-7-git-send-email-benoit@irqsave.net>
Download mbox | patch
Permalink /patch/216292/
State New
Headers show

Comments

Benoît Canet - Jan. 28, 2013, 5:07 p.m.
Use gnutls's SHA-256 to compare versions.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/quorum.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 configure      |   22 ++++
 2 files changed, 324 insertions(+), 1 deletion(-)
Kevin Wolf - Feb. 8, 2013, 12:07 p.m.
Am 28.01.2013 18:07, schrieb Benoît Canet:
> Use gnutls's SHA-256 to compare versions.
> 
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>  block/quorum.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  configure      |   22 ++++
>  2 files changed, 324 insertions(+), 1 deletion(-)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index e3c6aad..4c552e4 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -13,8 +13,30 @@
>   * See the COPYING file in the top-level directory.
>   */
>  
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
>  #include "block/block_int.h"
>  
> +#define HASH_LENGTH 32
> +
> +typedef union QuorumVoteValue {
> +    char h[HASH_LENGTH];       /* SHA-256 hash */
> +    unsigned long l;  /* simpler hash */
> +} QuorumVoteValue;
> +
> +typedef struct QuorumVoteItem {
> +    int index;
> +    QLIST_ENTRY(QuorumVoteItem) next;
> +} QuorumVoteItem;
> +
> +typedef struct QuorumVoteVersion {
> +    QuorumVoteValue value;
> +    int index;
> +    int vote_count;
> +    QLIST_HEAD(, QuorumVoteItem) items;
> +    QLIST_ENTRY(QuorumVoteVersion) next;
> +} QuorumVoteVersion;

I wonder if it wouldn't become simpler if you used arrays instead of
lists. We know that s->total is the upper limit for entries.

> +
>  typedef struct {
>      BlockDriverState **bs;
>      unsigned long long threshold;
> @@ -32,6 +54,11 @@ typedef struct QuorumSingleAIOCB {
>      QuorumAIOCB *parent;
>  } QuorumSingleAIOCB;
>  
> +typedef struct QuorumVotes {
> +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> +} QuorumVotes;

Can this be directly embedded into QuorumAIOCB?

compare is always quorum_sha256_compare, so why even have a field? We
can still introduce it once we add different options.

> +
>  struct QuorumAIOCB {
>      BlockDriverAIOCB common;
>      BDRVQuorumState *bqs;
> @@ -48,6 +75,8 @@ struct QuorumAIOCB {
>      int success_count;          /* number of successfully completed AIOCB */
>      bool *finished;             /* completion signal for cancel */
>  
> +    QuorumVotes votes;
> +
>      void (*vote)(QuorumAIOCB *acb);
>      int vote_ret;
>  };
> @@ -84,6 +113,11 @@ static void quorum_aio_bh(void *opaque)
>      }
>  
>      qemu_bh_delete(acb->bh);
> +
> +    if (acb->vote_ret) {
> +        ret = acb->vote_ret;
> +    }
> +
>      acb->common.cb(acb->common.opaque, ret);
>      if (acb->finished) {
>          *acb->finished = true;
> @@ -95,6 +129,11 @@ static void quorum_aio_bh(void *opaque)
>      qemu_aio_release(acb);
>  }
>  
> +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    return memcmp(a, b, HASH_LENGTH);
> +}

Comparing a.h and b.h would be cleaner.

> +
>  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>                                     BlockDriverState *bs,
>                                     QEMUIOVector *qiov,
> @@ -118,6 +157,8 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>      acb->vote = NULL;
>      acb->vote_ret = 0;
>      acb->finished = NULL;
> +    acb->votes.compare = quorum_sha256_compare;
> +    QLIST_INIT(&acb->votes.vote_list);
>  
>      for (i = 0; i < s->total; i++) {
>          acb->aios[i].buf = NULL;
> @@ -145,10 +186,268 @@ static void quorum_aio_cb(void *opaque, int ret)
>          return;
>      }
>  
> +    /* Do the vote */
> +    if (acb->vote) {
> +        acb->vote(acb);
> +    }

This is NULL for all writes and quorum_vote for all reads. Is there any
chance that more options will be introduced? If not, why not have a bool
is_read and directly call the function here?

> +
>      acb->bh = qemu_bh_new(quorum_aio_bh, acb);
>      qemu_bh_schedule(acb->bh);
>  }
>  
> +static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
> +{
> +    fprintf(stderr, "quorum: corrected error in quorum file %s: sector_num=%"
> +            PRId64 " nb_sectors=%i\n", filename, acb->sector_num,
> +            acb->nb_sectors);
> +}
> +
> +static void quorum_print_failure(QuorumAIOCB *acb)
> +{
> +    fprintf(stderr, "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
> +            acb->sector_num, acb->nb_sectors);
> +}
> +
> +static void quorum_print_bad_versions(QuorumAIOCB *acb,
> +                                      QuorumVoteValue *value)
> +{
> +    QuorumVoteVersion *version;
> +    QuorumVoteItem *item;
> +    BDRVQuorumState *s = acb->bqs;
> +
> +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> +        if (!acb->votes.compare(&version->value, value)) {
> +            continue;
> +        }
> +        QLIST_FOREACH(item, &version->items, next) {
> +            quorum_print_bad(acb, s->filenames[item->index]);
> +        }
> +    }
> +}
> +
> +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> +{
> +    int i;
> +    assert(dest->niov == source->niov);
> +    assert(dest->size == source->size);
> +    for (i = 0; i < source->niov; i++) {
> +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> +        memcpy(dest->iov[i].iov_base,
> +               source->iov[i].iov_base,
> +               source->iov[i].iov_len);
> +    }
> +}
> +
> +static void quorum_count_vote(QuorumVotes *votes,
> +                              QuorumVoteValue *value,
> +                              int index)
> +{
> +    QuorumVoteVersion *v = NULL, *version = NULL;
> +    QuorumVoteItem *item;
> +
> +    /* look if we have something with this hash */
> +    QLIST_FOREACH(v, &votes->vote_list, next) {
> +        if (!votes->compare(&v->value, value)) {
> +            version = v;
> +            break;
> +        }
> +    }
> +
> +    /* It's a version not yet in the list add it */
> +    if (!version) {
> +        version = g_new0(QuorumVoteVersion, 1);
> +        QLIST_INIT(&version->items);
> +        memcpy(&version->value, value, sizeof(version->value));
> +        version->index = index;
> +        version->vote_count = 0;
> +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> +    }
> +
> +    version->vote_count++;
> +
> +    item = g_new0(QuorumVoteItem, 1);
> +    item->index = index;
> +    QLIST_INSERT_HEAD(&version->items, item, next);
> +}
> +
> +static void quorum_free_vote_list(QuorumVotes *votes)
> +{
> +    QuorumVoteVersion *version, *next_version;
> +    QuorumVoteItem *item, *next_item;
> +
> +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> +        QLIST_REMOVE(version, next);
> +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> +            QLIST_REMOVE(item, next);
> +            g_free(item);
> +        }
> +        g_free(version);
> +    }
> +}
> +
> +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> +{
> +    int j, ret;
> +    gnutls_hash_hd_t dig;
> +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> +
> +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> +
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    for (j = 0; j < qiov->niov; j++) {
> +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> +        if (ret < 0) {
> +            return ret;

I have no clue about this lib, but sure that you don't need to call deinit?

> +        }
> +    }
> +
> +    gnutls_hash_deinit(dig, (void *) hash);
> +
> +    return 0;
> +}
> +
> +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> +{
> +    int i = 0;
> +    QuorumVoteVersion *candidate, *winner = NULL;
> +
> +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> +        if (candidate->vote_count > i) {
> +            i = candidate->vote_count;
> +            winner = candidate;
> +        }
> +    }
> +
> +    return winner;
> +}
> +
> +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    int result;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        result = memcmp(a->iov[i].iov_base,
> +                        b->iov[i].iov_base,
> +                        a->iov[i].iov_len);
> +        if (result) {
> +            return false;
> +        }
> +    }
> +
> +    return true;
> +}

qemu_iovec_compare() seems to do exactly the same, except that this
doesn't return the offset of the first difference.

Why is it a good idea to duplicate the code?

> +
> +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> +                                          const char *fmt, ...)
> +{
> +    va_list ap;
> +
> +    va_start(ap, fmt);
> +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> +            acb->sector_num, acb->nb_sectors);
> +    vfprintf(stderr, fmt, ap);
> +    fprintf(stderr, "\n");
> +    va_end(ap);
> +    exit(1);
> +}
> +
> +static bool quorum_compare(QuorumAIOCB *acb,
> +                           QEMUIOVector *a,
> +                           QEMUIOVector *b)
> +{
> +    BDRVQuorumState *s = acb->bqs;
> +    bool blkverify = false;
> +    ssize_t offset;
> +
> +    if (s->total == 2 && s->threshold == 2) {
> +        blkverify = true;
> +    }
> +
> +    if (blkverify) {
> +        offset = qemu_iovec_compare(a, b);
> +        if (offset != -1) {
> +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> +                       acb->sector_num +
> +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> +        }
> +        return true;
> +    }

wtf is happening here? Why an uncommented special case for two images
that results in an entirely different behaviour? Why split the if block
and introduce a useless blkverify variable?

> +
> +    return quorum_iovec_compare(a, b);
> +}
> +
> +
> +static void quorum_vote(QuorumAIOCB *acb)
> +{
> +    bool quorum = true;
> +    int i, j, ret;
> +    QuorumVoteValue hash;
> +    BDRVQuorumState *s = acb->bqs;
> +    QuorumVoteVersion *winner;
> +
> +    /* get the index of the first successful read */
> +    for (i = 0; i < s->total; i++) {
> +        if (!acb->aios[i].ret) {
> +            break;
> +        }
> +    }
> +
> +    /* compare this read with all other successful read looking for quorum */
> +    for (j = i + 1; j < s->total; j++) {
> +        if (acb->aios[j].ret) {
> +            continue;
> +        }
> +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> +        if (!quorum) {
> +            break;
> +       }
> +    }
> +
> +    /* Every successful read agrees -> Quorum */
> +    if (quorum) {
> +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> +        return;
> +    }
> +
> +    /* compute hashs for each successful read, also store indexes */
> +    for (i = 0; i < s->total; i++) {
> +        if (acb->aios[i].ret) {
> +            continue;
> +        }
> +        ret = quorum_compute_hash(acb, i, &hash);
> +        assert(ret == 0);

So you're claiming that this function cannot possibly fail? Why does it
even have a return code then?

> +        quorum_count_vote(&acb->votes, &hash, i);
> +    }
> +
> +    /* vote to select the most represented version */
> +    winner = quorum_get_vote_winner(&acb->votes);
> +    assert(winner != NULL);

Won't this assertion fail if all requests returned an error?

> +
> +    /* if the winner count is smaller than threshold read fail */

s/than/then/

> +    if (winner->vote_count < s->threshold) {
> +        quorum_print_failure(acb);
> +        acb->vote_ret = -EIO;
> +        fprintf(stderr, "quorum: vote result inferior to threshold\n");

Block drivers usually shouldn't print anything to stderr or stdout.

> +        goto free_exit;
> +    }
> +
> +    /* we have a winner: copy it */
> +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> +
> +    /* some versions are bad print them */
> +    quorum_print_bad_versions(acb, &winner->value);

Same here. Is this driver meant to be used in production or only for
debugging? Maybe it should have a debug mode that must explicitly be
enabled and messages are only printed in this mode?

> +
> +free_exit:
> +    /* free lists */
> +    quorum_free_vote_list(&acb->votes);
> +}
> +
>  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>                                           int64_t sector_num,
>                                           QEMUIOVector *qiov,
> @@ -161,6 +460,8 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>                                        nb_sectors, cb, opaque);
>      int i;
>  
> +    acb->vote = quorum_vote;
> +
>      for (i = 0; i < s->total; i++) {
>          acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
>          qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
> @@ -168,7 +469,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>      }
>  
>      for (i = 0; i < s->total; i++) {
> -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
>                         quorum_aio_cb, &acb->aios[i]);
>      }
>  
> diff --git a/configure b/configure
> index 4ebb60d..0832d26 100755
> --- a/configure
> +++ b/configure
> @@ -1733,6 +1733,28 @@ EOF
>  fi
>  
>  ##########################################
> +# Quorum gnutls detection
> +cat > $TMPC <<EOF
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
> +int main(void) {char data[4096], digest[32];
> +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> +return 0;
> +}
> +EOF
> +qcow_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`

Wait, what has qcow to do with it?

> +qcow_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> +if compile_prog "$qcow_tls_cflags" "$qcow_tls_libs" ; then
> +  qcow_tls=yes
> +  libs_softmmu="$qcow_tls_libs $libs_softmmu"
> +  libs_tools="$qcow_tls_libs $libs_softmmu"
> +  QEMU_CFLAGS="$QEMU_CFLAGS $qcow_tls_cflags"
> +else
> +  echo "gnutls > 2.10.0 required to compile QEMU"
> +  exit 1
> +fi

Should this really be a hard dependency? Why not just disable quorum if
the lib isn't there?

Kevin
Benoît Canet - Sept. 26, 2013, 4:46 p.m.
Le Friday 08 Feb 2013 à 13:07:03 (+0100), Kevin Wolf a écrit :
> Am 28.01.2013 18:07, schrieb Benoît Canet:
> > Use gnutls's SHA-256 to compare versions.
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > ---
> >  block/quorum.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >  configure      |   22 ++++
> >  2 files changed, 324 insertions(+), 1 deletion(-)
> > 
> > diff --git a/block/quorum.c b/block/quorum.c
> > index e3c6aad..4c552e4 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -13,8 +13,30 @@
> >   * See the COPYING file in the top-level directory.
> >   */
> >  
> > +#include <gnutls/gnutls.h>
> > +#include <gnutls/crypto.h>
> >  #include "block/block_int.h"
> >  
> > +#define HASH_LENGTH 32
> > +
> > +typedef union QuorumVoteValue {
> > +    char h[HASH_LENGTH];       /* SHA-256 hash */
> > +    unsigned long l;  /* simpler hash */
> > +} QuorumVoteValue;
> > +
> > +typedef struct QuorumVoteItem {
> > +    int index;
> > +    QLIST_ENTRY(QuorumVoteItem) next;
> > +} QuorumVoteItem;
> > +
> > +typedef struct QuorumVoteVersion {
> > +    QuorumVoteValue value;
> > +    int index;
> > +    int vote_count;
> > +    QLIST_HEAD(, QuorumVoteItem) items;
> > +    QLIST_ENTRY(QuorumVoteVersion) next;
> > +} QuorumVoteVersion;
> 
> I wonder if it wouldn't become simpler if you used arrays instead of
> lists. We know that s->total is the upper limit for entries.
> 
> > +
> >  typedef struct {
> >      BlockDriverState **bs;
> >      unsigned long long threshold;
> > @@ -32,6 +54,11 @@ typedef struct QuorumSingleAIOCB {
> >      QuorumAIOCB *parent;
> >  } QuorumSingleAIOCB;
> >  
> > +typedef struct QuorumVotes {
> > +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> > +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> > +} QuorumVotes;
> 
> Can this be directly embedded into QuorumAIOCB?
> 
> compare is always quorum_sha256_compare, so why even have a field? We
> can still introduce it once we add different options.
> 
> > +
> >  struct QuorumAIOCB {
> >      BlockDriverAIOCB common;
> >      BDRVQuorumState *bqs;
> > @@ -48,6 +75,8 @@ struct QuorumAIOCB {
> >      int success_count;          /* number of successfully completed AIOCB */
> >      bool *finished;             /* completion signal for cancel */
> >  
> > +    QuorumVotes votes;
> > +
> >      void (*vote)(QuorumAIOCB *acb);
> >      int vote_ret;
> >  };
> > @@ -84,6 +113,11 @@ static void quorum_aio_bh(void *opaque)
> >      }
> >  
> >      qemu_bh_delete(acb->bh);
> > +
> > +    if (acb->vote_ret) {
> > +        ret = acb->vote_ret;
> > +    }
> > +
> >      acb->common.cb(acb->common.opaque, ret);
> >      if (acb->finished) {
> >          *acb->finished = true;
> > @@ -95,6 +129,11 @@ static void quorum_aio_bh(void *opaque)
> >      qemu_aio_release(acb);
> >  }
> >  
> > +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> > +{
> > +    return memcmp(a, b, HASH_LENGTH);
> > +}
> 
> Comparing a.h and b.h would be cleaner.
> 
> > +
> >  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >                                     BlockDriverState *bs,
> >                                     QEMUIOVector *qiov,
> > @@ -118,6 +157,8 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >      acb->vote = NULL;
> >      acb->vote_ret = 0;
> >      acb->finished = NULL;
> > +    acb->votes.compare = quorum_sha256_compare;
> > +    QLIST_INIT(&acb->votes.vote_list);
> >  
> >      for (i = 0; i < s->total; i++) {
> >          acb->aios[i].buf = NULL;
> > @@ -145,10 +186,268 @@ static void quorum_aio_cb(void *opaque, int ret)
> >          return;
> >      }
> >  
> > +    /* Do the vote */
> > +    if (acb->vote) {
> > +        acb->vote(acb);
> > +    }
> 
> This is NULL for all writes and quorum_vote for all reads. Is there any
> chance that more options will be introduced? If not, why not have a bool
> is_read and directly call the function here?
> 
> > +
> >      acb->bh = qemu_bh_new(quorum_aio_bh, acb);
> >      qemu_bh_schedule(acb->bh);
> >  }
> >  
> > +static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
> > +{
> > +    fprintf(stderr, "quorum: corrected error in quorum file %s: sector_num=%"
> > +            PRId64 " nb_sectors=%i\n", filename, acb->sector_num,
> > +            acb->nb_sectors);
> > +}
> > +
> > +static void quorum_print_failure(QuorumAIOCB *acb)
> > +{
> > +    fprintf(stderr, "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
> > +            acb->sector_num, acb->nb_sectors);
> > +}
> > +
> > +static void quorum_print_bad_versions(QuorumAIOCB *acb,
> > +                                      QuorumVoteValue *value)
> > +{
> > +    QuorumVoteVersion *version;
> > +    QuorumVoteItem *item;
> > +    BDRVQuorumState *s = acb->bqs;
> > +
> > +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> > +        if (!acb->votes.compare(&version->value, value)) {
> > +            continue;
> > +        }
> > +        QLIST_FOREACH(item, &version->items, next) {
> > +            quorum_print_bad(acb, s->filenames[item->index]);
> > +        }
> > +    }
> > +}
> > +
> > +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> > +{
> > +    int i;
> > +    assert(dest->niov == source->niov);
> > +    assert(dest->size == source->size);
> > +    for (i = 0; i < source->niov; i++) {
> > +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> > +        memcpy(dest->iov[i].iov_base,
> > +               source->iov[i].iov_base,
> > +               source->iov[i].iov_len);
> > +    }
> > +}
> > +
> > +static void quorum_count_vote(QuorumVotes *votes,
> > +                              QuorumVoteValue *value,
> > +                              int index)
> > +{
> > +    QuorumVoteVersion *v = NULL, *version = NULL;
> > +    QuorumVoteItem *item;
> > +
> > +    /* look if we have something with this hash */
> > +    QLIST_FOREACH(v, &votes->vote_list, next) {
> > +        if (!votes->compare(&v->value, value)) {
> > +            version = v;
> > +            break;
> > +        }
> > +    }
> > +
> > +    /* It's a version not yet in the list add it */
> > +    if (!version) {
> > +        version = g_new0(QuorumVoteVersion, 1);
> > +        QLIST_INIT(&version->items);
> > +        memcpy(&version->value, value, sizeof(version->value));
> > +        version->index = index;
> > +        version->vote_count = 0;
> > +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> > +    }
> > +
> > +    version->vote_count++;
> > +
> > +    item = g_new0(QuorumVoteItem, 1);
> > +    item->index = index;
> > +    QLIST_INSERT_HEAD(&version->items, item, next);
> > +}
> > +
> > +static void quorum_free_vote_list(QuorumVotes *votes)
> > +{
> > +    QuorumVoteVersion *version, *next_version;
> > +    QuorumVoteItem *item, *next_item;
> > +
> > +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> > +        QLIST_REMOVE(version, next);
> > +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> > +            QLIST_REMOVE(item, next);
> > +            g_free(item);
> > +        }
> > +        g_free(version);
> > +    }
> > +}
> > +
> > +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> > +{
> > +    int j, ret;
> > +    gnutls_hash_hd_t dig;
> > +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> > +
> > +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> > +
> > +    if (ret < 0) {
> > +        return ret;
> > +    }
> > +
> > +    for (j = 0; j < qiov->niov; j++) {
> > +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> > +        if (ret < 0) {
> > +            return ret;
> 
> I have no clue about this lib, but sure that you don't need to call deinit?
> 
> > +        }
> > +    }
> > +
> > +    gnutls_hash_deinit(dig, (void *) hash);
> > +
> > +    return 0;
> > +}
> > +
> > +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> > +{
> > +    int i = 0;
> > +    QuorumVoteVersion *candidate, *winner = NULL;
> > +
> > +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> > +        if (candidate->vote_count > i) {
> > +            i = candidate->vote_count;
> > +            winner = candidate;
> > +        }
> > +    }
> > +
> > +    return winner;
> > +}
> > +
> > +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> > +{
> > +    int i;
> > +    int result;
> > +
> > +    assert(a->niov == b->niov);
> > +    for (i = 0; i < a->niov; i++) {
> > +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> > +        result = memcmp(a->iov[i].iov_base,
> > +                        b->iov[i].iov_base,
> > +                        a->iov[i].iov_len);
> > +        if (result) {
> > +            return false;
> > +        }
> > +    }
> > +
> > +    return true;
> > +}
> 
> qemu_iovec_compare() seems to do exactly the same, except that this
> doesn't return the offset of the first difference.
> 
> Why is it a good idea to duplicate the code?

Quorum need speed: qemu_iovec_compare will do byte level comparison whereas
memcmp will use SSE on large block.

> 
> > +
> > +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> > +                                          const char *fmt, ...)
> > +{
> > +    va_list ap;
> > +
> > +    va_start(ap, fmt);
> > +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> > +            acb->sector_num, acb->nb_sectors);
> > +    vfprintf(stderr, fmt, ap);
> > +    fprintf(stderr, "\n");
> > +    va_end(ap);
> > +    exit(1);
> > +}
> > +
> > +static bool quorum_compare(QuorumAIOCB *acb,
> > +                           QEMUIOVector *a,
> > +                           QEMUIOVector *b)
> > +{
> > +    BDRVQuorumState *s = acb->bqs;
> > +    bool blkverify = false;
> > +    ssize_t offset;
> > +
> > +    if (s->total == 2 && s->threshold == 2) {
> > +        blkverify = true;
> > +    }
> > +
> > +    if (blkverify) {
> > +        offset = qemu_iovec_compare(a, b);
> > +        if (offset != -1) {
> > +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> > +                       acb->sector_num +
> > +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> > +        }
> > +        return true;
> > +    }
> 
> wtf is happening here? Why an uncommented special case for two images
> that results in an entirely different behaviour? Why split the if block
> and introduce a useless blkverify variable?
> 
> > +
> > +    return quorum_iovec_compare(a, b);
> > +}
> > +
> > +
> > +static void quorum_vote(QuorumAIOCB *acb)
> > +{
> > +    bool quorum = true;
> > +    int i, j, ret;
> > +    QuorumVoteValue hash;
> > +    BDRVQuorumState *s = acb->bqs;
> > +    QuorumVoteVersion *winner;
> > +
> > +    /* get the index of the first successful read */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (!acb->aios[i].ret) {
> > +            break;
> > +        }
> > +    }
> > +
> > +    /* compare this read with all other successful read looking for quorum */
> > +    for (j = i + 1; j < s->total; j++) {
> > +        if (acb->aios[j].ret) {
> > +            continue;
> > +        }
> > +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> > +        if (!quorum) {
> > +            break;
> > +       }
> > +    }
> > +
> > +    /* Every successful read agrees -> Quorum */
> > +    if (quorum) {
> > +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> > +        return;
> > +    }
> > +
> > +    /* compute hashs for each successful read, also store indexes */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (acb->aios[i].ret) {
> > +            continue;
> > +        }
> > +        ret = quorum_compute_hash(acb, i, &hash);
> > +        assert(ret == 0);
> 
> So you're claiming that this function cannot possibly fail? Why does it
> even have a return code then?
> 
> > +        quorum_count_vote(&acb->votes, &hash, i);
> > +    }
> > +
> > +    /* vote to select the most represented version */
> > +    winner = quorum_get_vote_winner(&acb->votes);
> > +    assert(winner != NULL);
> 
> Won't this assertion fail if all requests returned an error?
> 
> > +
> > +    /* if the winner count is smaller than threshold read fail */
> 
> s/than/then/
> 
> > +    if (winner->vote_count < s->threshold) {
> > +        quorum_print_failure(acb);
> > +        acb->vote_ret = -EIO;
> > +        fprintf(stderr, "quorum: vote result inferior to threshold\n");
> 
> Block drivers usually shouldn't print anything to stderr or stdout.
> 
> > +        goto free_exit;
> > +    }
> > +
> > +    /* we have a winner: copy it */
> > +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> > +
> > +    /* some versions are bad print them */
> > +    quorum_print_bad_versions(acb, &winner->value);
> 
> Same here. Is this driver meant to be used in production or only for
> debugging? Maybe it should have a debug mode that must explicitly be
> enabled and messages are only printed in this mode?

Quorum needs to communicate some event to the user for maintainance.
Would thowing QMP events be better ?

> 
> > +
> > +free_exit:
> > +    /* free lists */
> > +    quorum_free_vote_list(&acb->votes);
> > +}
> > +
> >  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                           int64_t sector_num,
> >                                           QEMUIOVector *qiov,
> > @@ -161,6 +460,8 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                        nb_sectors, cb, opaque);
> >      int i;
> >  
> > +    acb->vote = quorum_vote;
> > +
> >      for (i = 0; i < s->total; i++) {
> >          acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
> >          qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
> > @@ -168,7 +469,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >      }
> >  
> >      for (i = 0; i < s->total; i++) {
> > -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> > +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
> >                         quorum_aio_cb, &acb->aios[i]);
> >      }
> >  
> > diff --git a/configure b/configure
> > index 4ebb60d..0832d26 100755
> > --- a/configure
> > +++ b/configure
> > @@ -1733,6 +1733,28 @@ EOF
> >  fi
> >  
> >  ##########################################
> > +# Quorum gnutls detection
> > +cat > $TMPC <<EOF
> > +#include <gnutls/gnutls.h>
> > +#include <gnutls/crypto.h>
> > +int main(void) {char data[4096], digest[32];
> > +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> > +return 0;
> > +}
> > +EOF
> > +qcow_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
> 
> Wait, what has qcow to do with it?
> 
> > +qcow_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> > +if compile_prog "$qcow_tls_cflags" "$qcow_tls_libs" ; then
> > +  qcow_tls=yes
> > +  libs_softmmu="$qcow_tls_libs $libs_softmmu"
> > +  libs_tools="$qcow_tls_libs $libs_softmmu"
> > +  QEMU_CFLAGS="$QEMU_CFLAGS $qcow_tls_cflags"
> > +else
> > +  echo "gnutls > 2.10.0 required to compile QEMU"
> > +  exit 1
> > +fi
> 
> Should this really be a hard dependency? Why not just disable quorum if
> the lib isn't there?
> 
> Kevin
>
Kevin Wolf - Sept. 27, 2013, 10:05 a.m.
Am 26.09.2013 um 18:46 hat Benoît Canet geschrieben:
> Le Friday 08 Feb 2013 à 13:07:03 (+0100), Kevin Wolf a écrit :
> > Am 28.01.2013 18:07, schrieb Benoît Canet:
> > > Use gnutls's SHA-256 to compare versions.
> > > 
> > > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > > ---
> > >  block/quorum.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
> > >  configure      |   22 ++++
> > >  2 files changed, 324 insertions(+), 1 deletion(-)

> > > +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> > > +{
> > > +    int i;
> > > +    int result;
> > > +
> > > +    assert(a->niov == b->niov);
> > > +    for (i = 0; i < a->niov; i++) {
> > > +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> > > +        result = memcmp(a->iov[i].iov_base,
> > > +                        b->iov[i].iov_base,
> > > +                        a->iov[i].iov_len);
> > > +        if (result) {
> > > +            return false;
> > > +        }
> > > +    }
> > > +
> > > +    return true;
> > > +}
> > 
> > qemu_iovec_compare() seems to do exactly the same, except that this
> > doesn't return the offset of the first difference.
> > 
> > Why is it a good idea to duplicate the code?
> 
> Quorum need speed: qemu_iovec_compare will do byte level comparison whereas
> memcmp will use SSE on large block.

Fair enough.

> > > +    /* we have a winner: copy it */
> > > +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> > > +
> > > +    /* some versions are bad print them */
> > > +    quorum_print_bad_versions(acb, &winner->value);
> > 
> > Same here. Is this driver meant to be used in production or only for
> > debugging? Maybe it should have a debug mode that must explicitly be
> > enabled and messages are only printed in this mode?
> 
> Quorum needs to communicate some event to the user for maintainance.
> Would thowing QMP events be better ?

Yes, absolutely. Error messages on stderr end up in a log file at best
and can only be read by human administrators. Using QMP enables
management tools to take action.

Kevin
Benoît Canet - Sept. 30, 2013, 12:58 p.m.
Le Friday 08 Feb 2013 à 13:07:03 (+0100), Kevin Wolf a écrit :
> Am 28.01.2013 18:07, schrieb Benoît Canet:
> > Use gnutls's SHA-256 to compare versions.
> > 
> > Signed-off-by: Benoit Canet <benoit@irqsave.net>
> > ---
> >  block/quorum.c |  303 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >  configure      |   22 ++++
> >  2 files changed, 324 insertions(+), 1 deletion(-)
> > 
> > diff --git a/block/quorum.c b/block/quorum.c
> > index e3c6aad..4c552e4 100644
> > --- a/block/quorum.c
> > +++ b/block/quorum.c
> > @@ -13,8 +13,30 @@
> >   * See the COPYING file in the top-level directory.
> >   */
> >  
> > +#include <gnutls/gnutls.h>
> > +#include <gnutls/crypto.h>
> >  #include "block/block_int.h"
> >  
> > +#define HASH_LENGTH 32
> > +
> > +typedef union QuorumVoteValue {
> > +    char h[HASH_LENGTH];       /* SHA-256 hash */
> > +    unsigned long l;  /* simpler hash */
> > +} QuorumVoteValue;
> > +
> > +typedef struct QuorumVoteItem {
> > +    int index;
> > +    QLIST_ENTRY(QuorumVoteItem) next;
> > +} QuorumVoteItem;
> > +
> > +typedef struct QuorumVoteVersion {
> > +    QuorumVoteValue value;
> > +    int index;
> > +    int vote_count;
> > +    QLIST_HEAD(, QuorumVoteItem) items;
> > +    QLIST_ENTRY(QuorumVoteVersion) next;
> > +} QuorumVoteVersion;
> 
> I wonder if it wouldn't become simpler if you used arrays instead of
> lists. We know that s->total is the upper limit for entries.

I am not sure about this.
The voting code is a big part of quorum and currently works fine.
Stefan already reviewed it.
I don't know if it's wize to throw it for a new one for such a small gain.

> 
> > +
> >  typedef struct {
> >      BlockDriverState **bs;
> >      unsigned long long threshold;
> > @@ -32,6 +54,11 @@ typedef struct QuorumSingleAIOCB {
> >      QuorumAIOCB *parent;
> >  } QuorumSingleAIOCB;
> >  
> > +typedef struct QuorumVotes {
> > +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> > +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> > +} QuorumVotes;
> 
> Can this be directly embedded into QuorumAIOCB?
> 
> compare is always quorum_sha256_compare, so why even have a field? We
> can still introduce it once we add different options.

I introduce another compare function later.

> 
> > +
> >  struct QuorumAIOCB {
> >      BlockDriverAIOCB common;
> >      BDRVQuorumState *bqs;
> > @@ -48,6 +75,8 @@ struct QuorumAIOCB {
> >      int success_count;          /* number of successfully completed AIOCB */
> >      bool *finished;             /* completion signal for cancel */
> >  
> > +    QuorumVotes votes;
> > +
> >      void (*vote)(QuorumAIOCB *acb);
> >      int vote_ret;
> >  };
> > @@ -84,6 +113,11 @@ static void quorum_aio_bh(void *opaque)
> >      }
> >  
> >      qemu_bh_delete(acb->bh);
> > +
> > +    if (acb->vote_ret) {
> > +        ret = acb->vote_ret;
> > +    }
> > +
> >      acb->common.cb(acb->common.opaque, ret);
> >      if (acb->finished) {
> >          *acb->finished = true;
> > @@ -95,6 +129,11 @@ static void quorum_aio_bh(void *opaque)
> >      qemu_aio_release(acb);
> >  }
> >  
> > +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> > +{
> > +    return memcmp(a, b, HASH_LENGTH);
> > +}
> 
> Comparing a.h and b.h would be cleaner.
> 
> > +
> >  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >                                     BlockDriverState *bs,
> >                                     QEMUIOVector *qiov,
> > @@ -118,6 +157,8 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >      acb->vote = NULL;
> >      acb->vote_ret = 0;
> >      acb->finished = NULL;
> > +    acb->votes.compare = quorum_sha256_compare;
> > +    QLIST_INIT(&acb->votes.vote_list);
> >  
> >      for (i = 0; i < s->total; i++) {
> >          acb->aios[i].buf = NULL;
> > @@ -145,10 +186,268 @@ static void quorum_aio_cb(void *opaque, int ret)
> >          return;
> >      }
> >  
> > +    /* Do the vote */
> > +    if (acb->vote) {
> > +        acb->vote(acb);
> > +    }
> 
> This is NULL for all writes and quorum_vote for all reads. Is there any
> chance that more options will be introduced? If not, why not have a bool
> is_read and directly call the function here?
> 
> > +
> >      acb->bh = qemu_bh_new(quorum_aio_bh, acb);
> >      qemu_bh_schedule(acb->bh);
> >  }
> >  
> > +static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
> > +{
> > +    fprintf(stderr, "quorum: corrected error in quorum file %s: sector_num=%"
> > +            PRId64 " nb_sectors=%i\n", filename, acb->sector_num,
> > +            acb->nb_sectors);
> > +}
> > +
> > +static void quorum_print_failure(QuorumAIOCB *acb)
> > +{
> > +    fprintf(stderr, "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
> > +            acb->sector_num, acb->nb_sectors);
> > +}
> > +
> > +static void quorum_print_bad_versions(QuorumAIOCB *acb,
> > +                                      QuorumVoteValue *value)
> > +{
> > +    QuorumVoteVersion *version;
> > +    QuorumVoteItem *item;
> > +    BDRVQuorumState *s = acb->bqs;
> > +
> > +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> > +        if (!acb->votes.compare(&version->value, value)) {
> > +            continue;
> > +        }
> > +        QLIST_FOREACH(item, &version->items, next) {
> > +            quorum_print_bad(acb, s->filenames[item->index]);
> > +        }
> > +    }
> > +}
> > +
> > +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> > +{
> > +    int i;
> > +    assert(dest->niov == source->niov);
> > +    assert(dest->size == source->size);
> > +    for (i = 0; i < source->niov; i++) {
> > +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> > +        memcpy(dest->iov[i].iov_base,
> > +               source->iov[i].iov_base,
> > +               source->iov[i].iov_len);
> > +    }
> > +}
> > +
> > +static void quorum_count_vote(QuorumVotes *votes,
> > +                              QuorumVoteValue *value,
> > +                              int index)
> > +{
> > +    QuorumVoteVersion *v = NULL, *version = NULL;
> > +    QuorumVoteItem *item;
> > +
> > +    /* look if we have something with this hash */
> > +    QLIST_FOREACH(v, &votes->vote_list, next) {
> > +        if (!votes->compare(&v->value, value)) {
> > +            version = v;
> > +            break;
> > +        }
> > +    }
> > +
> > +    /* It's a version not yet in the list add it */
> > +    if (!version) {
> > +        version = g_new0(QuorumVoteVersion, 1);
> > +        QLIST_INIT(&version->items);
> > +        memcpy(&version->value, value, sizeof(version->value));
> > +        version->index = index;
> > +        version->vote_count = 0;
> > +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> > +    }
> > +
> > +    version->vote_count++;
> > +
> > +    item = g_new0(QuorumVoteItem, 1);
> > +    item->index = index;
> > +    QLIST_INSERT_HEAD(&version->items, item, next);
> > +}
> > +
> > +static void quorum_free_vote_list(QuorumVotes *votes)
> > +{
> > +    QuorumVoteVersion *version, *next_version;
> > +    QuorumVoteItem *item, *next_item;
> > +
> > +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> > +        QLIST_REMOVE(version, next);
> > +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> > +            QLIST_REMOVE(item, next);
> > +            g_free(item);
> > +        }
> > +        g_free(version);
> > +    }
> > +}
> > +
> > +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> > +{
> > +    int j, ret;
> > +    gnutls_hash_hd_t dig;
> > +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> > +
> > +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> > +
> > +    if (ret < 0) {
> > +        return ret;
> > +    }
> > +
> > +    for (j = 0; j < qiov->niov; j++) {
> > +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> > +        if (ret < 0) {
> > +            return ret;
> 
> I have no clue about this lib, but sure that you don't need to call deinit?
> 
> > +        }
> > +    }
> > +
> > +    gnutls_hash_deinit(dig, (void *) hash);
> > +
> > +    return 0;
> > +}
> > +
> > +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> > +{
> > +    int i = 0;
> > +    QuorumVoteVersion *candidate, *winner = NULL;
> > +
> > +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> > +        if (candidate->vote_count > i) {
> > +            i = candidate->vote_count;
> > +            winner = candidate;
> > +        }
> > +    }
> > +
> > +    return winner;
> > +}
> > +
> > +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> > +{
> > +    int i;
> > +    int result;
> > +
> > +    assert(a->niov == b->niov);
> > +    for (i = 0; i < a->niov; i++) {
> > +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> > +        result = memcmp(a->iov[i].iov_base,
> > +                        b->iov[i].iov_base,
> > +                        a->iov[i].iov_len);
> > +        if (result) {
> > +            return false;
> > +        }
> > +    }
> > +
> > +    return true;
> > +}
> 
> qemu_iovec_compare() seems to do exactly the same, except that this
> doesn't return the offset of the first difference.
> 
> Why is it a good idea to duplicate the code?
> 
> > +
> > +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> > +                                          const char *fmt, ...)
> > +{
> > +    va_list ap;
> > +
> > +    va_start(ap, fmt);
> > +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> > +            acb->sector_num, acb->nb_sectors);
> > +    vfprintf(stderr, fmt, ap);
> > +    fprintf(stderr, "\n");
> > +    va_end(ap);
> > +    exit(1);
> > +}
> > +
> > +static bool quorum_compare(QuorumAIOCB *acb,
> > +                           QEMUIOVector *a,
> > +                           QEMUIOVector *b)
> > +{
> > +    BDRVQuorumState *s = acb->bqs;
> > +    bool blkverify = false;
> > +    ssize_t offset;
> > +
> > +    if (s->total == 2 && s->threshold == 2) {
> > +        blkverify = true;
> > +    }
> > +
> > +    if (blkverify) {
> > +        offset = qemu_iovec_compare(a, b);
> > +        if (offset != -1) {
> > +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> > +                       acb->sector_num +
> > +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> > +        }
> > +        return true;
> > +    }
> 
> wtf is happening here? Why an uncommented special case for two images
> that results in an entirely different behaviour? Why split the if block
> and introduce a useless blkverify variable?

I plan that quorum will replace blkverify.

> 
> > +
> > +    return quorum_iovec_compare(a, b);
> > +}
> > +
> > +
> > +static void quorum_vote(QuorumAIOCB *acb)
> > +{
> > +    bool quorum = true;
> > +    int i, j, ret;
> > +    QuorumVoteValue hash;
> > +    BDRVQuorumState *s = acb->bqs;
> > +    QuorumVoteVersion *winner;
> > +
> > +    /* get the index of the first successful read */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (!acb->aios[i].ret) {
> > +            break;
> > +        }
> > +    }
> > +
> > +    /* compare this read with all other successful read looking for quorum */
> > +    for (j = i + 1; j < s->total; j++) {
> > +        if (acb->aios[j].ret) {
> > +            continue;
> > +        }
> > +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> > +        if (!quorum) {
> > +            break;
> > +       }
> > +    }
> > +
> > +    /* Every successful read agrees -> Quorum */
> > +    if (quorum) {
> > +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> > +        return;
> > +    }
> > +
> > +    /* compute hashs for each successful read, also store indexes */
> > +    for (i = 0; i < s->total; i++) {
> > +        if (acb->aios[i].ret) {
> > +            continue;
> > +        }
> > +        ret = quorum_compute_hash(acb, i, &hash);
> > +        assert(ret == 0);
> 
> So you're claiming that this function cannot possibly fail? Why does it
> even have a return code then?
> 
> > +        quorum_count_vote(&acb->votes, &hash, i);
> > +    }
> > +
> > +    /* vote to select the most represented version */
> > +    winner = quorum_get_vote_winner(&acb->votes);
> > +    assert(winner != NULL);
> 
> Won't this assertion fail if all requests returned an error?
> 
> > +
> > +    /* if the winner count is smaller than threshold read fail */
> 
> s/than/then/
> 
> > +    if (winner->vote_count < s->threshold) {
> > +        quorum_print_failure(acb);
> > +        acb->vote_ret = -EIO;
> > +        fprintf(stderr, "quorum: vote result inferior to threshold\n");
> 
> Block drivers usually shouldn't print anything to stderr or stdout.
> 
> > +        goto free_exit;
> > +    }
> > +
> > +    /* we have a winner: copy it */
> > +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> > +
> > +    /* some versions are bad print them */
> > +    quorum_print_bad_versions(acb, &winner->value);
> 
> Same here. Is this driver meant to be used in production or only for
> debugging? Maybe it should have a debug mode that must explicitly be
> enabled and messages are only printed in this mode?
> 
> > +
> > +free_exit:
> > +    /* free lists */
> > +    quorum_free_vote_list(&acb->votes);
> > +}
> > +
> >  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                           int64_t sector_num,
> >                                           QEMUIOVector *qiov,
> > @@ -161,6 +460,8 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                        nb_sectors, cb, opaque);
> >      int i;
> >  
> > +    acb->vote = quorum_vote;
> > +
> >      for (i = 0; i < s->total; i++) {
> >          acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
> >          qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
> > @@ -168,7 +469,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >      }
> >  
> >      for (i = 0; i < s->total; i++) {
> > -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> > +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
> >                         quorum_aio_cb, &acb->aios[i]);
> >      }
> >  
> > diff --git a/configure b/configure
> > index 4ebb60d..0832d26 100755
> > --- a/configure
> > +++ b/configure
> > @@ -1733,6 +1733,28 @@ EOF
> >  fi
> >  
> >  ##########################################
> > +# Quorum gnutls detection
> > +cat > $TMPC <<EOF
> > +#include <gnutls/gnutls.h>
> > +#include <gnutls/crypto.h>
> > +int main(void) {char data[4096], digest[32];
> > +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> > +return 0;
> > +}
> > +EOF
> > +qcow_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
> 
> Wait, what has qcow to do with it?
> 
> > +qcow_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> > +if compile_prog "$qcow_tls_cflags" "$qcow_tls_libs" ; then
> > +  qcow_tls=yes
> > +  libs_softmmu="$qcow_tls_libs $libs_softmmu"
> > +  libs_tools="$qcow_tls_libs $libs_softmmu"
> > +  QEMU_CFLAGS="$QEMU_CFLAGS $qcow_tls_cflags"
> > +else
> > +  echo "gnutls > 2.10.0 required to compile QEMU"
> > +  exit 1
> > +fi
> 
> Should this really be a hard dependency? Why not just disable quorum if
> the lib isn't there?
> 
> Kevin
>

Patch

diff --git a/block/quorum.c b/block/quorum.c
index e3c6aad..4c552e4 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -13,8 +13,30 @@ 
  * See the COPYING file in the top-level directory.
  */
 
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
 #include "block/block_int.h"
 
+#define HASH_LENGTH 32
+
+typedef union QuorumVoteValue {
+    char h[HASH_LENGTH];       /* SHA-256 hash */
+    unsigned long l;  /* simpler hash */
+} QuorumVoteValue;
+
+typedef struct QuorumVoteItem {
+    int index;
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+typedef struct QuorumVoteVersion {
+    QuorumVoteValue value;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumVoteItem) items;
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
+
 typedef struct {
     BlockDriverState **bs;
     unsigned long long threshold;
@@ -32,6 +54,11 @@  typedef struct QuorumSingleAIOCB {
     QuorumAIOCB *parent;
 } QuorumSingleAIOCB;
 
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
+    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
+} QuorumVotes;
+
 struct QuorumAIOCB {
     BlockDriverAIOCB common;
     BDRVQuorumState *bqs;
@@ -48,6 +75,8 @@  struct QuorumAIOCB {
     int success_count;          /* number of successfully completed AIOCB */
     bool *finished;             /* completion signal for cancel */
 
+    QuorumVotes votes;
+
     void (*vote)(QuorumAIOCB *acb);
     int vote_ret;
 };
@@ -84,6 +113,11 @@  static void quorum_aio_bh(void *opaque)
     }
 
     qemu_bh_delete(acb->bh);
+
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
@@ -95,6 +129,11 @@  static void quorum_aio_bh(void *opaque)
     qemu_aio_release(acb);
 }
 
+static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return memcmp(a, b, HASH_LENGTH);
+}
+
 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
                                    BlockDriverState *bs,
                                    QEMUIOVector *qiov,
@@ -118,6 +157,8 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->vote = NULL;
     acb->vote_ret = 0;
     acb->finished = NULL;
+    acb->votes.compare = quorum_sha256_compare;
+    QLIST_INIT(&acb->votes.vote_list);
 
     for (i = 0; i < s->total; i++) {
         acb->aios[i].buf = NULL;
@@ -145,10 +186,268 @@  static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote */
+    if (acb->vote) {
+        acb->vote(acb);
+    }
+
     acb->bh = qemu_bh_new(quorum_aio_bh, acb);
     qemu_bh_schedule(acb->bh);
 }
 
+static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
+{
+    fprintf(stderr, "quorum: corrected error in quorum file %s: sector_num=%"
+            PRId64 " nb_sectors=%i\n", filename, acb->sector_num,
+            acb->nb_sectors);
+}
+
+static void quorum_print_failure(QuorumAIOCB *acb)
+{
+    fprintf(stderr, "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
+            acb->sector_num, acb->nb_sectors);
+}
+
+static void quorum_print_bad_versions(QuorumAIOCB *acb,
+                                      QuorumVoteValue *value)
+{
+    QuorumVoteVersion *version;
+    QuorumVoteItem *item;
+    BDRVQuorumState *s = acb->bqs;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        if (!acb->votes.compare(&version->value, value)) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->items, next) {
+            quorum_print_bad(acb, s->filenames[item->index]);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_vote(QuorumVotes *votes,
+                              QuorumVoteValue *value,
+                              int index)
+{
+    QuorumVoteVersion *v = NULL, *version = NULL;
+    QuorumVoteItem *item;
+
+    /* look if we have something with this hash */
+    QLIST_FOREACH(v, &votes->vote_list, next) {
+        if (!votes->compare(&v->value, value)) {
+            version = v;
+            break;
+        }
+    }
+
+    /* It's a version not yet in the list add it */
+    if (!version) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        memcpy(&version->value, value, sizeof(version->value));
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    version->vote_count++;
+
+    item = g_new0(QuorumVoteItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->items, item, next);
+}
+
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+    QuorumVoteItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
+            QLIST_REMOVE(item, next);
+            g_free(item);
+        }
+        g_free(version);
+    }
+}
+
+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
+{
+    int j, ret;
+    gnutls_hash_hd_t dig;
+    QEMUIOVector *qiov = &acb->aios[i].qiov;
+
+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    for (j = 0; j < qiov->niov; j++) {
+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
+        if (ret < 0) {
+            return ret;
+        }
+    }
+
+    gnutls_hash_deinit(dig, (void *) hash);
+
+    return 0;
+}
+
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    int i = 0;
+    QuorumVoteVersion *candidate, *winner = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        if (candidate->vote_count > i) {
+            i = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    return winner;
+}
+
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    int result;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        result = memcmp(a->iov[i].iov_base,
+                        b->iov[i].iov_base,
+                        a->iov[i].iov_len);
+        if (result) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list ap;
+
+    va_start(ap, fmt);
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    va_end(ap);
+    exit(1);
+}
+
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->bqs;
+    bool blkverify = false;
+    ssize_t offset;
+
+    if (s->total == 2 && s->threshold == 2) {
+        blkverify = true;
+    }
+
+    if (blkverify) {
+        offset = qemu_iovec_compare(a, b);
+        if (offset != -1) {
+            quorum_err(acb, "contents mismatch in sector %" PRId64,
+                       acb->sector_num +
+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
+        }
+        return true;
+    }
+
+    return quorum_iovec_compare(a, b);
+}
+
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = true;
+    int i, j, ret;
+    QuorumVoteValue hash;
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *winner;
+
+    /* get the index of the first successful read */
+    for (i = 0; i < s->total; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /* compare this read with all other successful read looking for quorum */
+    for (j = i + 1; j < s->total; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
+        if (!quorum) {
+            break;
+       }
+    }
+
+    /* Every successful read agrees -> Quorum */
+    if (quorum) {
+        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
+        return;
+    }
+
+    /* compute hashs for each successful read, also store indexes */
+    for (i = 0; i < s->total; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        ret = quorum_compute_hash(acb, i, &hash);
+        assert(ret == 0);
+        quorum_count_vote(&acb->votes, &hash, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+    assert(winner != NULL);
+
+    /* if the winner count is smaller than threshold read fail */
+    if (winner->vote_count < s->threshold) {
+        quorum_print_failure(acb);
+        acb->vote_ret = -EIO;
+        fprintf(stderr, "quorum: vote result inferior to threshold\n");
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
+
+    /* some versions are bad print them */
+    quorum_print_bad_versions(acb, &winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -161,6 +460,8 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                       nb_sectors, cb, opaque);
     int i;
 
+    acb->vote = quorum_vote;
+
     for (i = 0; i < s->total; i++) {
         acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
         qemu_iovec_init(&acb->aios[i].qiov, qiov->niov);
@@ -168,7 +469,7 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->total; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
                        quorum_aio_cb, &acb->aios[i]);
     }
 
diff --git a/configure b/configure
index 4ebb60d..0832d26 100755
--- a/configure
+++ b/configure
@@ -1733,6 +1733,28 @@  EOF
 fi
 
 ##########################################
+# Quorum gnutls detection
+cat > $TMPC <<EOF
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
+int main(void) {char data[4096], digest[32];
+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
+return 0;
+}
+EOF
+qcow_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
+qcow_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
+if compile_prog "$qcow_tls_cflags" "$qcow_tls_libs" ; then
+  qcow_tls=yes
+  libs_softmmu="$qcow_tls_libs $libs_softmmu"
+  libs_tools="$qcow_tls_libs $libs_softmmu"
+  QEMU_CFLAGS="$QEMU_CFLAGS $qcow_tls_cflags"
+else
+  echo "gnutls > 2.10.0 required to compile QEMU"
+  exit 1
+fi
+
+##########################################
 # VNC SASL detection
 if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
   cat > $TMPC <<EOF