diff mbox

[V10,06/13] quorum: Add quorum mechanism.

Message ID 1390927974-31325-7-git-send-email-benoit.canet@irqsave.net
State New
Headers show

Commit Message

Benoît Canet Jan. 28, 2014, 4:52 p.m. UTC
From: Benoît Canet <benoit@irqsave.net>

Use gnutls's SHA-256 to compare versions.

Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/Makefile.objs       |   2 +-
 block/quorum.c            | 333 +++++++++++++++++++++++++++++++++++++++++++++-
 configure                 |  36 +++++
 docs/qmp/qmp-events.txt   |  33 +++++
 include/monitor/monitor.h |   2 +
 monitor.c                 |   2 +
 6 files changed, 406 insertions(+), 2 deletions(-)

Comments

Max Reitz Jan. 31, 2014, 11:04 p.m. UTC | #1
On 28.01.2014 17:52, Benoît Canet wrote:
> From: Benoît Canet <benoit@irqsave.net>
>
> Use gnutls's SHA-256 to compare versions.
>
> Signed-off-by: Benoit Canet <benoit@irqsave.net>
> ---
>   block/Makefile.objs       |   2 +-
>   block/quorum.c            | 333 +++++++++++++++++++++++++++++++++++++++++++++-
>   configure                 |  36 +++++
>   docs/qmp/qmp-events.txt   |  33 +++++
>   include/monitor/monitor.h |   2 +
>   monitor.c                 |   2 +
>   6 files changed, 406 insertions(+), 2 deletions(-)
>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index a2650b9..4ca9d43 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>   block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>   block-obj-y += qed-check.o
>   block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> -block-obj-y += quorum.o
> +block-obj-$(CONFIG_QUORUM) += quorum.o
>   block-obj-y += parallels.o blkdebug.o blkverify.o
>   block-obj-y += snapshot.o qapi.o
>   block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> diff --git a/block/quorum.c b/block/quorum.c
> index 5bf37b3..c319719 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -13,7 +13,43 @@
>    * See the COPYING file in the top-level directory.
>    */
>   
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
>   #include "block/block_int.h"
> +#include "qapi/qmp/qjson.h"
> +
> +#define HASH_LENGTH 32
> +
> +/* This union holds a vote hash value */
> +typedef union QuorumVoteValue {
> +    char h[HASH_LENGTH];       /* SHA-256 hash */
> +    int64_t l;                 /* simpler 64 bits hash */
> +} QuorumVoteValue;
> +
> +/* A vote item */
> +typedef struct QuorumVoteItem {
> +    int index;
> +    QLIST_ENTRY(QuorumVoteItem) next;
> +} QuorumVoteItem;
> +
> +/* this structure is a vote version. A version is the set of votes sharing the
> + * same vote value.
> + * The set of votes will be tracked with the items field and its cardinality is
> + * vote_count.
> + */
> +typedef struct QuorumVoteVersion {
> +    QuorumVoteValue value;
> +    int index;
> +    int vote_count;
> +    QLIST_HEAD(, QuorumVoteItem) items;
> +    QLIST_ENTRY(QuorumVoteVersion) next;
> +} QuorumVoteVersion;
> +
> +/* this structure holds a group of vote versions together */
> +typedef struct QuorumVotes {
> +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> +} QuorumVotes;
>   
>   /* the following structure holds the state of one quorum instance */
>   typedef struct {
> @@ -60,10 +96,14 @@ struct QuorumAIOCB {
>       int success_count;          /* number of successfully completed AIOCB */
>       bool *finished;             /* completion signal for cancel */
>   
> +    QuorumVotes votes;
> +
>       bool is_read;
>       int vote_ret;
>   };
>   
> +static void quorum_vote(QuorumAIOCB *acb);
> +
>   static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
>   {
>       QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> @@ -111,6 +151,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>           acb->aios[i].ret = 0;
>       }
>   
> +    if (acb->vote_ret) {
> +        ret = acb->vote_ret;
> +    }

Hm, this makes the vote_ret take precedence over other errors returned 
by the children. If they differ (and both are not 0), we can choose 
between returning either (vote_ret or “real” errors reported by the 
child devices). Both are errors, so I don't see any natural precedence. 
However, generally, vote_ret will contain a more generic error code 
(i.e., -EIO), thus I could imagine the other error code reported by the 
child device to be more appropriate, as it might contain more useful 
information.

But, well, on the other had, Quorum is designed for hiding errors 
reported by a minority of child devices; therefore, hiding such errors 
in this case as well is probably just consequent.

I'll leave this up to you; I'm fine with it as it is and I'd be fine if 
(for whatever reason) you were to change it. :-)

> +
>       acb->common.cb(acb->common.opaque, ret);
>       if (acb->finished) {
>           *acb->finished = true;
> @@ -122,6 +166,11 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>       qemu_aio_release(acb);
>   }
>   
> +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> +{
> +    return memcmp(a->h, b->h, HASH_LENGTH);
> +}
> +
>   static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>                                      BlockDriverState *bs,
>                                      QEMUIOVector *qiov,
> @@ -141,6 +190,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>       acb->count = 0;
>       acb->success_count = 0;
>       acb->finished = NULL;
> +    acb->votes.compare = quorum_sha256_compare;
>       acb->is_read = false;
>       acb->vote_ret = 0;
>   
> @@ -170,9 +220,290 @@ static void quorum_aio_cb(void *opaque, int ret)
>           return;
>       }
>   
> +    /* Do the vote on read */
> +    if (acb->is_read) {
> +        quorum_vote(acb);
> +    }
> +
>       quorum_aio_finalize(acb);
>   }
>   
> +static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
> +{
> +    QObject *data;
> +    data = qobject_from_jsonf("{ 'node-name': \"%s\""
> +                              ", 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              node_name,
> +                              acb->sector_num,
> +                              acb->nb_sectors);
> +    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
> +    qobject_decref(data);
> +}
> +
> +static void quorum_report_failure(QuorumAIOCB *acb)
> +{
> +    QObject *data;
> +    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
> +                              ", 'sectors-count': %i }",
> +                              acb->sector_num,
> +                              acb->nb_sectors);
> +    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
> +    qobject_decref(data);
> +}
> +
> +static void quorum_report_bad_versions(BDRVQuorumState *s,
> +                                       QuorumAIOCB *acb,
> +                                       QuorumVoteValue *value)
> +{
> +    QuorumVoteVersion *version;
> +    QuorumVoteItem *item;
> +
> +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> +        if (!acb->votes.compare(&version->value, value)) {
> +            continue;
> +        }
> +        QLIST_FOREACH(item, &version->items, next) {
> +            quorum_report_bad(acb, s->bs[item->index]->node_name);
> +        }
> +    }
> +}
> +
> +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> +{
> +    int i;
> +    assert(dest->niov == source->niov);
> +    assert(dest->size == source->size);
> +    for (i = 0; i < source->niov; i++) {
> +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> +        memcpy(dest->iov[i].iov_base,
> +               source->iov[i].iov_base,
> +               source->iov[i].iov_len);
> +    }
> +}
> +
> +static void quorum_count_vote(QuorumVotes *votes,
> +                              QuorumVoteValue *value,
> +                              int index)
> +{
> +    QuorumVoteVersion *v = NULL, *version = NULL;
> +    QuorumVoteItem *item;
> +
> +    /* look if we have something with this hash */
> +    QLIST_FOREACH(v, &votes->vote_list, next) {
> +        if (!votes->compare(&v->value, value)) {
> +            version = v;
> +            break;
> +        }
> +    }
> +
> +    /* It's a version not yet in the list add it */
> +    if (!version) {
> +        version = g_new0(QuorumVoteVersion, 1);
> +        QLIST_INIT(&version->items);
> +        memcpy(&version->value, value, sizeof(version->value));
> +        version->index = index;
> +        version->vote_count = 0;
> +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> +    }
> +
> +    version->vote_count++;
> +
> +    item = g_new0(QuorumVoteItem, 1);
> +    item->index = index;
> +    QLIST_INSERT_HEAD(&version->items, item, next);
> +}
> +
> +static void quorum_free_vote_list(QuorumVotes *votes)
> +{
> +    QuorumVoteVersion *version, *next_version;
> +    QuorumVoteItem *item, *next_item;
> +
> +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> +        QLIST_REMOVE(version, next);
> +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> +            QLIST_REMOVE(item, next);
> +            g_free(item);
> +        }
> +        g_free(version);
> +    }
> +}
> +
> +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> +{
> +    int j, ret;
> +    gnutls_hash_hd_t dig;
> +    QEMUIOVector *qiov = &acb->aios[i].qiov;
> +
> +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> +
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    for (j = 0; j < qiov->niov; j++) {
> +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> +        if (ret < 0) {
> +            break;
> +        }
> +    }
> +
> +    gnutls_hash_deinit(dig, (void *) hash);
> +    return ret;
> +}
> +
> +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> +{
> +    int i = 0;
> +    QuorumVoteVersion *candidate, *winner = NULL;
> +
> +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> +        if (candidate->vote_count > i) {
> +            i = candidate->vote_count;
> +            winner = candidate;
> +        }
> +    }
> +
> +    return winner;
> +}
> +
> +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> +{
> +    int i;
> +    int result;
> +
> +    assert(a->niov == b->niov);
> +    for (i = 0; i < a->niov; i++) {
> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> +        result = memcmp(a->iov[i].iov_base,
> +                        b->iov[i].iov_base,
> +                        a->iov[i].iov_len);
> +        if (result) {
> +            return false;
> +        }
> +    }
> +
> +    return true;
> +}
> +
> +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> +                                          const char *fmt, ...)
> +{
> +    va_list ap;
> +
> +    va_start(ap, fmt);
> +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> +            acb->sector_num, acb->nb_sectors);
> +    vfprintf(stderr, fmt, ap);
> +    fprintf(stderr, "\n");
> +    va_end(ap);
> +    exit(1);
> +}
> +
> +static bool quorum_compare(QuorumAIOCB *acb,
> +                           QEMUIOVector *a,
> +                           QEMUIOVector *b)
> +{
> +    BDRVQuorumState *s = acb->bqs;
> +    ssize_t offset;
> +
> +    /* This driver will replace blkverify in this particular case */
> +    if (s->is_blkverify) {
> +        offset = qemu_iovec_compare(a, b);
> +        if (offset != -1) {
> +            quorum_err(acb, "contents mismatch in sector %" PRId64,
> +                       acb->sector_num +
> +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> +        }
> +        return true;
> +    }
> +
> +    return quorum_iovec_compare(a, b);
> +}
> +
> +static void quorum_vote(QuorumAIOCB *acb)
> +{
> +    bool quorum = false;
> +    int i, j, ret;
> +    QuorumVoteValue hash;
> +    BDRVQuorumState *s = acb->bqs;
> +    QuorumVoteVersion *winner;
> +
> +    QLIST_INIT(&acb->votes.vote_list);
> +
> +    /* if we don't get any successful read use the first error code */
> +    if (!acb->success_count) {
> +        acb->vote_ret = acb->aios[0].ret;
> +        return;
> +    }
> +
> +    /* get the index of the first successful read (we are sure to find one) */
> +    for (i = 0; i < s->total; i++) {
> +        if (!acb->aios[i].ret) {
> +            break;
> +        }
> +    }
> +
> +    /* compare this read with all other successful read looking for quorum */
> +    for (j = i + 1; j < s->total; j++) {
> +        if (acb->aios[j].ret) {
> +            continue;
> +        }
> +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> +        if (!quorum) {
> +            break;
> +       }
> +    }
> +
> +    /* Every successful read agrees and their count is higher or equal threshold
> +     * -> Quorum
> +     */
> +    if (quorum && acb->success_count >= s->threshold) {
> +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> +        return;
> +    }

Well, for quorum && acb->success_count < s->threshold, calculating and 
comparing hashes doesn't help at all (since the vote winner will only be 
able to get up to acb->success_count votes anyway). We should probably 
catch this case in the first if condition in this function (change it to 
“if (acb->success_count < s->threshold)”).

> +
> +    /* compute hashs for each successful read, also store indexes */
> +    for (i = 0; i < s->total; i++) {
> +        if (acb->aios[i].ret) {
> +            continue;
> +        }
> +        ret = quorum_compute_hash(acb, i, &hash);
> +        /* if ever the hash computation failed */
> +        if (ret < 0) {
> +            acb->vote_ret = ret;
> +            goto free_exit;
> +        }
> +        quorum_count_vote(&acb->votes, &hash, i);
> +    }
> +
> +    /* vote to select the most represented version */
> +    winner = quorum_get_vote_winner(&acb->votes);
> +    /* every vote version are differents -> error */
> +    if (!winner) {
> +        quorum_report_failure(acb);
> +        acb->vote_ret = -EIO;
> +        goto free_exit;
> +    }
> +
> +    /* if the winner count is smaller than threshold the read fails */
> +    if (winner->vote_count < s->threshold) {
> +        quorum_report_failure(acb);
> +        acb->vote_ret = -EIO;
> +        goto free_exit;
> +    }
> +
> +    /* we have a winner: copy it */
> +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> +
> +    /* some versions are bad print them */
> +    quorum_report_bad_versions(s, acb, &winner->value);
> +
> +free_exit:
> +    /* free lists */
> +    quorum_free_vote_list(&acb->votes);
> +}
> +
>   static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>                                            int64_t sector_num,
>                                            QEMUIOVector *qiov,
> @@ -194,7 +525,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>       }
>   
>       for (i = 0; i < s->total; i++) {
> -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
>                          quorum_aio_cb, &acb->aios[i]);
>       }
>   
> diff --git a/configure b/configure
> index b472694..5a68738 100755
> --- a/configure
> +++ b/configure
> @@ -263,6 +263,7 @@ gtkabi="2.0"
>   tpm="no"
>   libssh2=""
>   vhdx=""
> +quorum="no"
>   
>   # parse CC options first
>   for opt do
> @@ -1000,6 +1001,10 @@ for opt do
>     ;;
>     --disable-vhdx) vhdx="no"
>     ;;
> +  --disable-quorum) quorum="no"
> +  ;;
> +  --enable-quorum) quorum="yes"
> +  ;;
>     *) echo "ERROR: unknown option $opt"; show_help="yes"
>     ;;
>     esac
> @@ -1254,6 +1259,8 @@ Advanced options (experts only):
>     --enable-libssh2         enable ssh block device support
>     --disable-vhdx           disables support for the Microsoft VHDX image format
>     --enable-vhdx            enable support for the Microsoft VHDX image format
> +  --disable-quorum         disable quorum block filter support
> +  --enable-quorum          enable quorum block filter support
>   
>   NOTE: The object files are built at the place where configure is launched
>   EOF
> @@ -1923,6 +1930,30 @@ EOF
>   fi
>   
>   ##########################################
> +# Quorum probe (check for gnutls)
> +if test "$quorum" != "no" ; then
> +cat > $TMPC <<EOF
> +#include <gnutls/gnutls.h>
> +#include <gnutls/crypto.h>
> +int main(void) {char data[4096], digest[32];
> +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> +return 0;
> +}
> +EOF
> +quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
> +quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> +if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
> +  qcow_tls=yes
> +  libs_softmmu="$quorum_tls_libs $libs_softmmu"
> +  libs_tools="$quorum_tls_libs $libs_softmmu"
> +  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
> +else
> +  echo "gnutls > 2.10.0 required to compile Quorum"
> +  exit 1
> +fi
> +fi
> +
> +##########################################
>   # VNC SASL detection
>   if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
>     cat > $TMPC <<EOF
> @@ -3843,6 +3874,7 @@ echo "libssh2 support   $libssh2"
>   echo "TPM passthrough   $tpm_passthrough"
>   echo "QOM debugging     $qom_cast_debug"
>   echo "vhdx              $vhdx"
> +echo "Quorum            $quorum"
>   
>   if test "$sdl_too_old" = "yes"; then
>   echo "-> Your SDL version is too old - please upgrade to have SDL support"
> @@ -4241,6 +4273,10 @@ if test "$libssh2" = "yes" ; then
>     echo "CONFIG_LIBSSH2=y" >> $config_host_mak
>   fi
>   
> +if test "$quorum" = "yes" ; then
> +  echo "CONFIG_QUORUM=y" >> $config_host_mak
> +fi
> +
>   if test "$virtio_blk_data_plane" = "yes" ; then
>     echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
>   fi
> diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
> index 6b87e97..1cbdde7 100644
> --- a/docs/qmp/qmp-events.txt
> +++ b/docs/qmp/qmp-events.txt
> @@ -500,3 +500,36 @@ Example:
>   
>   Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
>   followed respectively by the RESET, SHUTDOWN, or STOP events.
> +
> +QUORUM_FAILURE
> +--------------
> +
> +Emitted by the Quorum block driver when it fail to establish a quorum.

Probably rather “if” than “when”; also, “fails”.

> +
> +Data:
> +
> +- "sector-num":   Number of the first sector of the failed read operation.
> +- "sector-count": Failed read operation sector count.
> +
> +Example:
> +
> +{ "event": "QUORUM_FAILURE",
> +     "data": { "sector-num": 345435, "sector-count": 5 },
> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> +
> +QUORUM_REPORT_BAD
> +-----------------
> +
> +Emitted to report a corruption of a Quorum file.
> +
> +Data:
> +
> +- "node-name":  The graph node name of the block driver state.

The description is not aligned with the others, this is probably due to 
the change from child-index to node-name…?

Max

> +- "sector-num":   Number of the first sector of the failed read operation.
> +- "sector-count": Failed read operation sector count.
> +
> +Example:
> +
> +{ "event": "QUORUM_REPORT_BAD",
> +     "data": { "node-name": "1.raw", "sector-num": 345435, "sector-count": 5 },
> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
> index 7e5f752..a49ea11 100644
> --- a/include/monitor/monitor.h
> +++ b/include/monitor/monitor.h
> @@ -49,6 +49,8 @@ typedef enum MonitorEvent {
>       QEVENT_SPICE_MIGRATE_COMPLETED,
>       QEVENT_GUEST_PANICKED,
>       QEVENT_BLOCK_IMAGE_CORRUPTED,
> +    QEVENT_QUORUM_FAILURE,
> +    QEVENT_QUORUM_REPORT_BAD,
>   
>       /* Add to 'monitor_event_names' array in monitor.c when
>        * defining new events here */
> diff --git a/monitor.c b/monitor.c
> index 80456fb..f8f6aea 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
>       [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
>       [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
>       [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
> +    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
> +    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
>   };
>   QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
>
Benoît Canet Feb. 3, 2014, 11:44 a.m. UTC | #2
Le Saturday 01 Feb 2014 à 00:04:01 (+0100), Max Reitz a écrit :
> On 28.01.2014 17:52, Benoît Canet wrote:
> >From: Benoît Canet <benoit@irqsave.net>
> >
> >Use gnutls's SHA-256 to compare versions.
> >
> >Signed-off-by: Benoit Canet <benoit@irqsave.net>
> >---
> >  block/Makefile.objs       |   2 +-
> >  block/quorum.c            | 333 +++++++++++++++++++++++++++++++++++++++++++++-
> >  configure                 |  36 +++++
> >  docs/qmp/qmp-events.txt   |  33 +++++
> >  include/monitor/monitor.h |   2 +
> >  monitor.c                 |   2 +
> >  6 files changed, 406 insertions(+), 2 deletions(-)
> >
> >diff --git a/block/Makefile.objs b/block/Makefile.objs
> >index a2650b9..4ca9d43 100644
> >--- a/block/Makefile.objs
> >+++ b/block/Makefile.objs
> >@@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
> >  block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
> >  block-obj-y += qed-check.o
> >  block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
> >-block-obj-y += quorum.o
> >+block-obj-$(CONFIG_QUORUM) += quorum.o
> >  block-obj-y += parallels.o blkdebug.o blkverify.o
> >  block-obj-y += snapshot.o qapi.o
> >  block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
> >diff --git a/block/quorum.c b/block/quorum.c
> >index 5bf37b3..c319719 100644
> >--- a/block/quorum.c
> >+++ b/block/quorum.c
> >@@ -13,7 +13,43 @@
> >   * See the COPYING file in the top-level directory.
> >   */
> >+#include <gnutls/gnutls.h>
> >+#include <gnutls/crypto.h>
> >  #include "block/block_int.h"
> >+#include "qapi/qmp/qjson.h"
> >+
> >+#define HASH_LENGTH 32
> >+
> >+/* This union holds a vote hash value */
> >+typedef union QuorumVoteValue {
> >+    char h[HASH_LENGTH];       /* SHA-256 hash */
> >+    int64_t l;                 /* simpler 64 bits hash */
> >+} QuorumVoteValue;
> >+
> >+/* A vote item */
> >+typedef struct QuorumVoteItem {
> >+    int index;
> >+    QLIST_ENTRY(QuorumVoteItem) next;
> >+} QuorumVoteItem;
> >+
> >+/* this structure is a vote version. A version is the set of votes sharing the
> >+ * same vote value.
> >+ * The set of votes will be tracked with the items field and its cardinality is
> >+ * vote_count.
> >+ */
> >+typedef struct QuorumVoteVersion {
> >+    QuorumVoteValue value;
> >+    int index;
> >+    int vote_count;
> >+    QLIST_HEAD(, QuorumVoteItem) items;
> >+    QLIST_ENTRY(QuorumVoteVersion) next;
> >+} QuorumVoteVersion;
> >+
> >+/* this structure holds a group of vote versions together */
> >+typedef struct QuorumVotes {
> >+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
> >+    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
> >+} QuorumVotes;
> >  /* the following structure holds the state of one quorum instance */
> >  typedef struct {
> >@@ -60,10 +96,14 @@ struct QuorumAIOCB {
> >      int success_count;          /* number of successfully completed AIOCB */
> >      bool *finished;             /* completion signal for cancel */
> >+    QuorumVotes votes;
> >+
> >      bool is_read;
> >      int vote_ret;
> >  };
> >+static void quorum_vote(QuorumAIOCB *acb);
> >+
> >  static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
> >  {
> >      QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
> >@@ -111,6 +151,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
> >          acb->aios[i].ret = 0;
> >      }
> >+    if (acb->vote_ret) {
> >+        ret = acb->vote_ret;
> >+    }
> 
> Hm, this makes the vote_ret take precedence over other errors
> returned by the children. If they differ (and both are not 0), we
> can choose between returning either (vote_ret or “real” errors
> reported by the child devices). Both are errors, so I don't see any
> natural precedence. However, generally, vote_ret will contain a more
> generic error code (i.e., -EIO), thus I could imagine the other
> error code reported by the child device to be more appropriate, as
> it might contain more useful information.
> 
> But, well, on the other had, Quorum is designed for hiding errors
> reported by a minority of child devices; therefore, hiding such
> errors in this case as well is probably just consequent.

Yes the general idea of quorum is to hide errors as long as the majority is reached.
For the case where too many children errors occured to be able to do a vote
there is:
    ret = s->threshold <= acb->success_count ? 0 : quorum_get_first_error(acb); 
Kevin asked me do return the first error instead of -EIO.

> 
> I'll leave this up to you; I'm fine with it as it is and I'd be fine
> if (for whatever reason) you were to change it. :-)
> 
> >+
> >      acb->common.cb(acb->common.opaque, ret);
> >      if (acb->finished) {
> >          *acb->finished = true;
> >@@ -122,6 +166,11 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
> >      qemu_aio_release(acb);
> >  }
> >+static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
> >+{
> >+    return memcmp(a->h, b->h, HASH_LENGTH);
> >+}
> >+
> >  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >                                     BlockDriverState *bs,
> >                                     QEMUIOVector *qiov,
> >@@ -141,6 +190,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
> >      acb->count = 0;
> >      acb->success_count = 0;
> >      acb->finished = NULL;
> >+    acb->votes.compare = quorum_sha256_compare;
> >      acb->is_read = false;
> >      acb->vote_ret = 0;
> >@@ -170,9 +220,290 @@ static void quorum_aio_cb(void *opaque, int ret)
> >          return;
> >      }
> >+    /* Do the vote on read */
> >+    if (acb->is_read) {
> >+        quorum_vote(acb);
> >+    }
> >+
> >      quorum_aio_finalize(acb);
> >  }
> >+static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
> >+{
> >+    QObject *data;
> >+    data = qobject_from_jsonf("{ 'node-name': \"%s\""
> >+                              ", 'sector-num': %" PRId64
> >+                              ", 'sectors-count': %i }",
> >+                              node_name,
> >+                              acb->sector_num,
> >+                              acb->nb_sectors);
> >+    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
> >+    qobject_decref(data);
> >+}
> >+
> >+static void quorum_report_failure(QuorumAIOCB *acb)
> >+{
> >+    QObject *data;
> >+    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
> >+                              ", 'sectors-count': %i }",
> >+                              acb->sector_num,
> >+                              acb->nb_sectors);
> >+    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
> >+    qobject_decref(data);
> >+}
> >+
> >+static void quorum_report_bad_versions(BDRVQuorumState *s,
> >+                                       QuorumAIOCB *acb,
> >+                                       QuorumVoteValue *value)
> >+{
> >+    QuorumVoteVersion *version;
> >+    QuorumVoteItem *item;
> >+
> >+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
> >+        if (!acb->votes.compare(&version->value, value)) {
> >+            continue;
> >+        }
> >+        QLIST_FOREACH(item, &version->items, next) {
> >+            quorum_report_bad(acb, s->bs[item->index]->node_name);
> >+        }
> >+    }
> >+}
> >+
> >+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
> >+{
> >+    int i;
> >+    assert(dest->niov == source->niov);
> >+    assert(dest->size == source->size);
> >+    for (i = 0; i < source->niov; i++) {
> >+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
> >+        memcpy(dest->iov[i].iov_base,
> >+               source->iov[i].iov_base,
> >+               source->iov[i].iov_len);
> >+    }
> >+}
> >+
> >+static void quorum_count_vote(QuorumVotes *votes,
> >+                              QuorumVoteValue *value,
> >+                              int index)
> >+{
> >+    QuorumVoteVersion *v = NULL, *version = NULL;
> >+    QuorumVoteItem *item;
> >+
> >+    /* look if we have something with this hash */
> >+    QLIST_FOREACH(v, &votes->vote_list, next) {
> >+        if (!votes->compare(&v->value, value)) {
> >+            version = v;
> >+            break;
> >+        }
> >+    }
> >+
> >+    /* It's a version not yet in the list add it */
> >+    if (!version) {
> >+        version = g_new0(QuorumVoteVersion, 1);
> >+        QLIST_INIT(&version->items);
> >+        memcpy(&version->value, value, sizeof(version->value));
> >+        version->index = index;
> >+        version->vote_count = 0;
> >+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
> >+    }
> >+
> >+    version->vote_count++;
> >+
> >+    item = g_new0(QuorumVoteItem, 1);
> >+    item->index = index;
> >+    QLIST_INSERT_HEAD(&version->items, item, next);
> >+}
> >+
> >+static void quorum_free_vote_list(QuorumVotes *votes)
> >+{
> >+    QuorumVoteVersion *version, *next_version;
> >+    QuorumVoteItem *item, *next_item;
> >+
> >+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
> >+        QLIST_REMOVE(version, next);
> >+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
> >+            QLIST_REMOVE(item, next);
> >+            g_free(item);
> >+        }
> >+        g_free(version);
> >+    }
> >+}
> >+
> >+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
> >+{
> >+    int j, ret;
> >+    gnutls_hash_hd_t dig;
> >+    QEMUIOVector *qiov = &acb->aios[i].qiov;
> >+
> >+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
> >+
> >+    if (ret < 0) {
> >+        return ret;
> >+    }
> >+
> >+    for (j = 0; j < qiov->niov; j++) {
> >+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
> >+        if (ret < 0) {
> >+            break;
> >+        }
> >+    }
> >+
> >+    gnutls_hash_deinit(dig, (void *) hash);
> >+    return ret;
> >+}
> >+
> >+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
> >+{
> >+    int i = 0;
> >+    QuorumVoteVersion *candidate, *winner = NULL;
> >+
> >+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
> >+        if (candidate->vote_count > i) {
> >+            i = candidate->vote_count;
> >+            winner = candidate;
> >+        }
> >+    }
> >+
> >+    return winner;
> >+}
> >+
> >+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
> >+{
> >+    int i;
> >+    int result;
> >+
> >+    assert(a->niov == b->niov);
> >+    for (i = 0; i < a->niov; i++) {
> >+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
> >+        result = memcmp(a->iov[i].iov_base,
> >+                        b->iov[i].iov_base,
> >+                        a->iov[i].iov_len);
> >+        if (result) {
> >+            return false;
> >+        }
> >+    }
> >+
> >+    return true;
> >+}
> >+
> >+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
> >+                                          const char *fmt, ...)
> >+{
> >+    va_list ap;
> >+
> >+    va_start(ap, fmt);
> >+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
> >+            acb->sector_num, acb->nb_sectors);
> >+    vfprintf(stderr, fmt, ap);
> >+    fprintf(stderr, "\n");
> >+    va_end(ap);
> >+    exit(1);
> >+}
> >+
> >+static bool quorum_compare(QuorumAIOCB *acb,
> >+                           QEMUIOVector *a,
> >+                           QEMUIOVector *b)
> >+{
> >+    BDRVQuorumState *s = acb->bqs;
> >+    ssize_t offset;
> >+
> >+    /* This driver will replace blkverify in this particular case */
> >+    if (s->is_blkverify) {
> >+        offset = qemu_iovec_compare(a, b);
> >+        if (offset != -1) {
> >+            quorum_err(acb, "contents mismatch in sector %" PRId64,
> >+                       acb->sector_num +
> >+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
> >+        }
> >+        return true;
> >+    }
> >+
> >+    return quorum_iovec_compare(a, b);
> >+}
> >+
> >+static void quorum_vote(QuorumAIOCB *acb)
> >+{
> >+    bool quorum = false;
> >+    int i, j, ret;
> >+    QuorumVoteValue hash;
> >+    BDRVQuorumState *s = acb->bqs;
> >+    QuorumVoteVersion *winner;
> >+
> >+    QLIST_INIT(&acb->votes.vote_list);
> >+
> >+    /* if we don't get any successful read use the first error code */
> >+    if (!acb->success_count) {
> >+        acb->vote_ret = acb->aios[0].ret;
> >+        return;
> >+    }
> >+
> >+    /* get the index of the first successful read (we are sure to find one) */
> >+    for (i = 0; i < s->total; i++) {
> >+        if (!acb->aios[i].ret) {
> >+            break;
> >+        }
> >+    }
> >+
> >+    /* compare this read with all other successful read looking for quorum */
> >+    for (j = i + 1; j < s->total; j++) {
> >+        if (acb->aios[j].ret) {
> >+            continue;
> >+        }
> >+        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
> >+        if (!quorum) {
> >+            break;
> >+       }
> >+    }
> >+
> >+    /* Every successful read agrees and their count is higher or equal threshold
> >+     * -> Quorum
> >+     */
> >+    if (quorum && acb->success_count >= s->threshold) {
> >+        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
> >+        return;
> >+    }
> 
> Well, for quorum && acb->success_count < s->threshold, calculating
> and comparing hashes doesn't help at all (since the vote winner will
> only be able to get up to acb->success_count votes anyway). We
> should probably catch this case in the first if condition in this
> function (change it to “if (acb->success_count < s->threshold)”).
> 
> >+
> >+    /* compute hashs for each successful read, also store indexes */
> >+    for (i = 0; i < s->total; i++) {
> >+        if (acb->aios[i].ret) {
> >+            continue;
> >+        }
> >+        ret = quorum_compute_hash(acb, i, &hash);
> >+        /* if ever the hash computation failed */
> >+        if (ret < 0) {
> >+            acb->vote_ret = ret;
> >+            goto free_exit;
> >+        }
> >+        quorum_count_vote(&acb->votes, &hash, i);
> >+    }
> >+
> >+    /* vote to select the most represented version */
> >+    winner = quorum_get_vote_winner(&acb->votes);
> >+    /* every vote version are differents -> error */
> >+    if (!winner) {
> >+        quorum_report_failure(acb);
> >+        acb->vote_ret = -EIO;
> >+        goto free_exit;
> >+    }
> >+
> >+    /* if the winner count is smaller than threshold the read fails */
> >+    if (winner->vote_count < s->threshold) {
> >+        quorum_report_failure(acb);
> >+        acb->vote_ret = -EIO;
> >+        goto free_exit;
> >+    }
> >+
> >+    /* we have a winner: copy it */
> >+    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
> >+
> >+    /* some versions are bad print them */
> >+    quorum_report_bad_versions(s, acb, &winner->value);
> >+
> >+free_exit:
> >+    /* free lists */
> >+    quorum_free_vote_list(&acb->votes);
> >+}
> >+
> >  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >                                           int64_t sector_num,
> >                                           QEMUIOVector *qiov,
> >@@ -194,7 +525,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
> >      }
> >      for (i = 0; i < s->total; i++) {
> >-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
> >+        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
> >                         quorum_aio_cb, &acb->aios[i]);
> >      }
> >diff --git a/configure b/configure
> >index b472694..5a68738 100755
> >--- a/configure
> >+++ b/configure
> >@@ -263,6 +263,7 @@ gtkabi="2.0"
> >  tpm="no"
> >  libssh2=""
> >  vhdx=""
> >+quorum="no"
> >  # parse CC options first
> >  for opt do
> >@@ -1000,6 +1001,10 @@ for opt do
> >    ;;
> >    --disable-vhdx) vhdx="no"
> >    ;;
> >+  --disable-quorum) quorum="no"
> >+  ;;
> >+  --enable-quorum) quorum="yes"
> >+  ;;
> >    *) echo "ERROR: unknown option $opt"; show_help="yes"
> >    ;;
> >    esac
> >@@ -1254,6 +1259,8 @@ Advanced options (experts only):
> >    --enable-libssh2         enable ssh block device support
> >    --disable-vhdx           disables support for the Microsoft VHDX image format
> >    --enable-vhdx            enable support for the Microsoft VHDX image format
> >+  --disable-quorum         disable quorum block filter support
> >+  --enable-quorum          enable quorum block filter support
> >  NOTE: The object files are built at the place where configure is launched
> >  EOF
> >@@ -1923,6 +1930,30 @@ EOF
> >  fi
> >  ##########################################
> >+# Quorum probe (check for gnutls)
> >+if test "$quorum" != "no" ; then
> >+cat > $TMPC <<EOF
> >+#include <gnutls/gnutls.h>
> >+#include <gnutls/crypto.h>
> >+int main(void) {char data[4096], digest[32];
> >+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
> >+return 0;
> >+}
> >+EOF
> >+quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
> >+quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
> >+if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
> >+  qcow_tls=yes
> >+  libs_softmmu="$quorum_tls_libs $libs_softmmu"
> >+  libs_tools="$quorum_tls_libs $libs_softmmu"
> >+  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
> >+else
> >+  echo "gnutls > 2.10.0 required to compile Quorum"
> >+  exit 1
> >+fi
> >+fi
> >+
> >+##########################################
> >  # VNC SASL detection
> >  if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
> >    cat > $TMPC <<EOF
> >@@ -3843,6 +3874,7 @@ echo "libssh2 support   $libssh2"
> >  echo "TPM passthrough   $tpm_passthrough"
> >  echo "QOM debugging     $qom_cast_debug"
> >  echo "vhdx              $vhdx"
> >+echo "Quorum            $quorum"
> >  if test "$sdl_too_old" = "yes"; then
> >  echo "-> Your SDL version is too old - please upgrade to have SDL support"
> >@@ -4241,6 +4273,10 @@ if test "$libssh2" = "yes" ; then
> >    echo "CONFIG_LIBSSH2=y" >> $config_host_mak
> >  fi
> >+if test "$quorum" = "yes" ; then
> >+  echo "CONFIG_QUORUM=y" >> $config_host_mak
> >+fi
> >+
> >  if test "$virtio_blk_data_plane" = "yes" ; then
> >    echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
> >  fi
> >diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
> >index 6b87e97..1cbdde7 100644
> >--- a/docs/qmp/qmp-events.txt
> >+++ b/docs/qmp/qmp-events.txt
> >@@ -500,3 +500,36 @@ Example:
> >  Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
> >  followed respectively by the RESET, SHUTDOWN, or STOP events.
> >+
> >+QUORUM_FAILURE
> >+--------------
> >+
> >+Emitted by the Quorum block driver when it fail to establish a quorum.
> 
> Probably rather “if” than “when”; also, “fails”.
> 
> >+
> >+Data:
> >+
> >+- "sector-num":   Number of the first sector of the failed read operation.
> >+- "sector-count": Failed read operation sector count.
> >+
> >+Example:
> >+
> >+{ "event": "QUORUM_FAILURE",
> >+     "data": { "sector-num": 345435, "sector-count": 5 },
> >+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> >+
> >+QUORUM_REPORT_BAD
> >+-----------------
> >+
> >+Emitted to report a corruption of a Quorum file.
> >+
> >+Data:
> >+
> >+- "node-name":  The graph node name of the block driver state.
> 
> The description is not aligned with the others, this is probably due
> to the change from child-index to node-name…?
> 
> Max
> 
> >+- "sector-num":   Number of the first sector of the failed read operation.
> >+- "sector-count": Failed read operation sector count.
> >+
> >+Example:
> >+
> >+{ "event": "QUORUM_REPORT_BAD",
> >+     "data": { "node-name": "1.raw", "sector-num": 345435, "sector-count": 5 },
> >+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
> >diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
> >index 7e5f752..a49ea11 100644
> >--- a/include/monitor/monitor.h
> >+++ b/include/monitor/monitor.h
> >@@ -49,6 +49,8 @@ typedef enum MonitorEvent {
> >      QEVENT_SPICE_MIGRATE_COMPLETED,
> >      QEVENT_GUEST_PANICKED,
> >      QEVENT_BLOCK_IMAGE_CORRUPTED,
> >+    QEVENT_QUORUM_FAILURE,
> >+    QEVENT_QUORUM_REPORT_BAD,
> >      /* Add to 'monitor_event_names' array in monitor.c when
> >       * defining new events here */
> >diff --git a/monitor.c b/monitor.c
> >index 80456fb..f8f6aea 100644
> >--- a/monitor.c
> >+++ b/monitor.c
> >@@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
> >      [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
> >      [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
> >      [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
> >+    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
> >+    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
> >  };
> >  QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
> 
>
Max Reitz Feb. 3, 2014, 7:03 p.m. UTC | #3
On 03.02.2014 12:44, Benoît Canet wrote:
> Le Saturday 01 Feb 2014 à 00:04:01 (+0100), Max Reitz a écrit :
>> On 28.01.2014 17:52, Benoît Canet wrote:
>>> From: Benoît Canet <benoit@irqsave.net>
>>>
>>> Use gnutls's SHA-256 to compare versions.
>>>
>>> Signed-off-by: Benoit Canet <benoit@irqsave.net>
>>> ---
>>>   block/Makefile.objs       |   2 +-
>>>   block/quorum.c            | 333 +++++++++++++++++++++++++++++++++++++++++++++-
>>>   configure                 |  36 +++++
>>>   docs/qmp/qmp-events.txt   |  33 +++++
>>>   include/monitor/monitor.h |   2 +
>>>   monitor.c                 |   2 +
>>>   6 files changed, 406 insertions(+), 2 deletions(-)
>>>
>>> diff --git a/block/Makefile.objs b/block/Makefile.objs
>>> index a2650b9..4ca9d43 100644
>>> --- a/block/Makefile.objs
>>> +++ b/block/Makefile.objs
>>> @@ -3,7 +3,7 @@ block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
>>>   block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>>>   block-obj-y += qed-check.o
>>>   block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
>>> -block-obj-y += quorum.o
>>> +block-obj-$(CONFIG_QUORUM) += quorum.o
>>>   block-obj-y += parallels.o blkdebug.o blkverify.o
>>>   block-obj-y += snapshot.o qapi.o
>>>   block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
>>> diff --git a/block/quorum.c b/block/quorum.c
>>> index 5bf37b3..c319719 100644
>>> --- a/block/quorum.c
>>> +++ b/block/quorum.c
>>> @@ -13,7 +13,43 @@
>>>    * See the COPYING file in the top-level directory.
>>>    */
>>> +#include <gnutls/gnutls.h>
>>> +#include <gnutls/crypto.h>
>>>   #include "block/block_int.h"
>>> +#include "qapi/qmp/qjson.h"
>>> +
>>> +#define HASH_LENGTH 32
>>> +
>>> +/* This union holds a vote hash value */
>>> +typedef union QuorumVoteValue {
>>> +    char h[HASH_LENGTH];       /* SHA-256 hash */
>>> +    int64_t l;                 /* simpler 64 bits hash */
>>> +} QuorumVoteValue;
>>> +
>>> +/* A vote item */
>>> +typedef struct QuorumVoteItem {
>>> +    int index;
>>> +    QLIST_ENTRY(QuorumVoteItem) next;
>>> +} QuorumVoteItem;
>>> +
>>> +/* this structure is a vote version. A version is the set of votes sharing the
>>> + * same vote value.
>>> + * The set of votes will be tracked with the items field and its cardinality is
>>> + * vote_count.
>>> + */
>>> +typedef struct QuorumVoteVersion {
>>> +    QuorumVoteValue value;
>>> +    int index;
>>> +    int vote_count;
>>> +    QLIST_HEAD(, QuorumVoteItem) items;
>>> +    QLIST_ENTRY(QuorumVoteVersion) next;
>>> +} QuorumVoteVersion;
>>> +
>>> +/* this structure holds a group of vote versions together */
>>> +typedef struct QuorumVotes {
>>> +    QLIST_HEAD(, QuorumVoteVersion) vote_list;
>>> +    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
>>> +} QuorumVotes;
>>>   /* the following structure holds the state of one quorum instance */
>>>   typedef struct {
>>> @@ -60,10 +96,14 @@ struct QuorumAIOCB {
>>>       int success_count;          /* number of successfully completed AIOCB */
>>>       bool *finished;             /* completion signal for cancel */
>>> +    QuorumVotes votes;
>>> +
>>>       bool is_read;
>>>       int vote_ret;
>>>   };
>>> +static void quorum_vote(QuorumAIOCB *acb);
>>> +
>>>   static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
>>>   {
>>>       QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
>>> @@ -111,6 +151,10 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>>>           acb->aios[i].ret = 0;
>>>       }
>>> +    if (acb->vote_ret) {
>>> +        ret = acb->vote_ret;
>>> +    }
>> Hm, this makes the vote_ret take precedence over other errors
>> returned by the children. If they differ (and both are not 0), we
>> can choose between returning either (vote_ret or “real” errors
>> reported by the child devices). Both are errors, so I don't see any
>> natural precedence. However, generally, vote_ret will contain a more
>> generic error code (i.e., -EIO), thus I could imagine the other
>> error code reported by the child device to be more appropriate, as
>> it might contain more useful information.
>>
>> But, well, on the other had, Quorum is designed for hiding errors
>> reported by a minority of child devices; therefore, hiding such
>> errors in this case as well is probably just consequent.
> Yes the general idea of quorum is to hide errors as long as the majority is reached.
> For the case where too many children errors occured to be able to do a vote
> there is:
>      ret = s->threshold <= acb->success_count ? 0 : quorum_get_first_error(acb);
> Kevin asked me do return the first error instead of -EIO.

That's right, but due to this condition here, that first error will be 
overwritten with acb->vote_ret (which will be -EIO, most probably).

Max

>> I'll leave this up to you; I'm fine with it as it is and I'd be fine
>> if (for whatever reason) you were to change it. :-)
>>
>>> +
>>>       acb->common.cb(acb->common.opaque, ret);
>>>       if (acb->finished) {
>>>           *acb->finished = true;
>>> @@ -122,6 +166,11 @@ static void quorum_aio_finalize(QuorumAIOCB *acb)
>>>       qemu_aio_release(acb);
>>>   }
>>> +static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
>>> +{
>>> +    return memcmp(a->h, b->h, HASH_LENGTH);
>>> +}
>>> +
>>>   static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>>>                                      BlockDriverState *bs,
>>>                                      QEMUIOVector *qiov,
>>> @@ -141,6 +190,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>>>       acb->count = 0;
>>>       acb->success_count = 0;
>>>       acb->finished = NULL;
>>> +    acb->votes.compare = quorum_sha256_compare;
>>>       acb->is_read = false;
>>>       acb->vote_ret = 0;
>>> @@ -170,9 +220,290 @@ static void quorum_aio_cb(void *opaque, int ret)
>>>           return;
>>>       }
>>> +    /* Do the vote on read */
>>> +    if (acb->is_read) {
>>> +        quorum_vote(acb);
>>> +    }
>>> +
>>>       quorum_aio_finalize(acb);
>>>   }
>>> +static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
>>> +{
>>> +    QObject *data;
>>> +    data = qobject_from_jsonf("{ 'node-name': \"%s\""
>>> +                              ", 'sector-num': %" PRId64
>>> +                              ", 'sectors-count': %i }",
>>> +                              node_name,
>>> +                              acb->sector_num,
>>> +                              acb->nb_sectors);
>>> +    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
>>> +    qobject_decref(data);
>>> +}
>>> +
>>> +static void quorum_report_failure(QuorumAIOCB *acb)
>>> +{
>>> +    QObject *data;
>>> +    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
>>> +                              ", 'sectors-count': %i }",
>>> +                              acb->sector_num,
>>> +                              acb->nb_sectors);
>>> +    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
>>> +    qobject_decref(data);
>>> +}
>>> +
>>> +static void quorum_report_bad_versions(BDRVQuorumState *s,
>>> +                                       QuorumAIOCB *acb,
>>> +                                       QuorumVoteValue *value)
>>> +{
>>> +    QuorumVoteVersion *version;
>>> +    QuorumVoteItem *item;
>>> +
>>> +    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
>>> +        if (!acb->votes.compare(&version->value, value)) {
>>> +            continue;
>>> +        }
>>> +        QLIST_FOREACH(item, &version->items, next) {
>>> +            quorum_report_bad(acb, s->bs[item->index]->node_name);
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
>>> +{
>>> +    int i;
>>> +    assert(dest->niov == source->niov);
>>> +    assert(dest->size == source->size);
>>> +    for (i = 0; i < source->niov; i++) {
>>> +        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
>>> +        memcpy(dest->iov[i].iov_base,
>>> +               source->iov[i].iov_base,
>>> +               source->iov[i].iov_len);
>>> +    }
>>> +}
>>> +
>>> +static void quorum_count_vote(QuorumVotes *votes,
>>> +                              QuorumVoteValue *value,
>>> +                              int index)
>>> +{
>>> +    QuorumVoteVersion *v = NULL, *version = NULL;
>>> +    QuorumVoteItem *item;
>>> +
>>> +    /* look if we have something with this hash */
>>> +    QLIST_FOREACH(v, &votes->vote_list, next) {
>>> +        if (!votes->compare(&v->value, value)) {
>>> +            version = v;
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    /* It's a version not yet in the list add it */
>>> +    if (!version) {
>>> +        version = g_new0(QuorumVoteVersion, 1);
>>> +        QLIST_INIT(&version->items);
>>> +        memcpy(&version->value, value, sizeof(version->value));
>>> +        version->index = index;
>>> +        version->vote_count = 0;
>>> +        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
>>> +    }
>>> +
>>> +    version->vote_count++;
>>> +
>>> +    item = g_new0(QuorumVoteItem, 1);
>>> +    item->index = index;
>>> +    QLIST_INSERT_HEAD(&version->items, item, next);
>>> +}
>>> +
>>> +static void quorum_free_vote_list(QuorumVotes *votes)
>>> +{
>>> +    QuorumVoteVersion *version, *next_version;
>>> +    QuorumVoteItem *item, *next_item;
>>> +
>>> +    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
>>> +        QLIST_REMOVE(version, next);
>>> +        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
>>> +            QLIST_REMOVE(item, next);
>>> +            g_free(item);
>>> +        }
>>> +        g_free(version);
>>> +    }
>>> +}
>>> +
>>> +static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
>>> +{
>>> +    int j, ret;
>>> +    gnutls_hash_hd_t dig;
>>> +    QEMUIOVector *qiov = &acb->aios[i].qiov;
>>> +
>>> +    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
>>> +
>>> +    if (ret < 0) {
>>> +        return ret;
>>> +    }
>>> +
>>> +    for (j = 0; j < qiov->niov; j++) {
>>> +        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
>>> +        if (ret < 0) {
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    gnutls_hash_deinit(dig, (void *) hash);
>>> +    return ret;
>>> +}
>>> +
>>> +static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
>>> +{
>>> +    int i = 0;
>>> +    QuorumVoteVersion *candidate, *winner = NULL;
>>> +
>>> +    QLIST_FOREACH(candidate, &votes->vote_list, next) {
>>> +        if (candidate->vote_count > i) {
>>> +            i = candidate->vote_count;
>>> +            winner = candidate;
>>> +        }
>>> +    }
>>> +
>>> +    return winner;
>>> +}
>>> +
>>> +static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
>>> +{
>>> +    int i;
>>> +    int result;
>>> +
>>> +    assert(a->niov == b->niov);
>>> +    for (i = 0; i < a->niov; i++) {
>>> +        assert(a->iov[i].iov_len == b->iov[i].iov_len);
>>> +        result = memcmp(a->iov[i].iov_base,
>>> +                        b->iov[i].iov_base,
>>> +                        a->iov[i].iov_len);
>>> +        if (result) {
>>> +            return false;
>>> +        }
>>> +    }
>>> +
>>> +    return true;
>>> +}
>>> +
>>> +static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
>>> +                                          const char *fmt, ...)
>>> +{
>>> +    va_list ap;
>>> +
>>> +    va_start(ap, fmt);
>>> +    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
>>> +            acb->sector_num, acb->nb_sectors);
>>> +    vfprintf(stderr, fmt, ap);
>>> +    fprintf(stderr, "\n");
>>> +    va_end(ap);
>>> +    exit(1);
>>> +}
>>> +
>>> +static bool quorum_compare(QuorumAIOCB *acb,
>>> +                           QEMUIOVector *a,
>>> +                           QEMUIOVector *b)
>>> +{
>>> +    BDRVQuorumState *s = acb->bqs;
>>> +    ssize_t offset;
>>> +
>>> +    /* This driver will replace blkverify in this particular case */
>>> +    if (s->is_blkverify) {
>>> +        offset = qemu_iovec_compare(a, b);
>>> +        if (offset != -1) {
>>> +            quorum_err(acb, "contents mismatch in sector %" PRId64,
>>> +                       acb->sector_num +
>>> +                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
>>> +        }
>>> +        return true;
>>> +    }
>>> +
>>> +    return quorum_iovec_compare(a, b);
>>> +}
>>> +
>>> +static void quorum_vote(QuorumAIOCB *acb)
>>> +{
>>> +    bool quorum = false;
>>> +    int i, j, ret;
>>> +    QuorumVoteValue hash;
>>> +    BDRVQuorumState *s = acb->bqs;
>>> +    QuorumVoteVersion *winner;
>>> +
>>> +    QLIST_INIT(&acb->votes.vote_list);
>>> +
>>> +    /* if we don't get any successful read use the first error code */
>>> +    if (!acb->success_count) {
>>> +        acb->vote_ret = acb->aios[0].ret;
>>> +        return;
>>> +    }
>>> +
>>> +    /* get the index of the first successful read (we are sure to find one) */
>>> +    for (i = 0; i < s->total; i++) {
>>> +        if (!acb->aios[i].ret) {
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    /* compare this read with all other successful read looking for quorum */
>>> +    for (j = i + 1; j < s->total; j++) {
>>> +        if (acb->aios[j].ret) {
>>> +            continue;
>>> +        }
>>> +        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
>>> +        if (!quorum) {
>>> +            break;
>>> +       }
>>> +    }
>>> +
>>> +    /* Every successful read agrees and their count is higher or equal threshold
>>> +     * -> Quorum
>>> +     */
>>> +    if (quorum && acb->success_count >= s->threshold) {
>>> +        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
>>> +        return;
>>> +    }
>> Well, for quorum && acb->success_count < s->threshold, calculating
>> and comparing hashes doesn't help at all (since the vote winner will
>> only be able to get up to acb->success_count votes anyway). We
>> should probably catch this case in the first if condition in this
>> function (change it to “if (acb->success_count < s->threshold)”).
>>
>>> +
>>> +    /* compute hashs for each successful read, also store indexes */
>>> +    for (i = 0; i < s->total; i++) {
>>> +        if (acb->aios[i].ret) {
>>> +            continue;
>>> +        }
>>> +        ret = quorum_compute_hash(acb, i, &hash);
>>> +        /* if ever the hash computation failed */
>>> +        if (ret < 0) {
>>> +            acb->vote_ret = ret;
>>> +            goto free_exit;
>>> +        }
>>> +        quorum_count_vote(&acb->votes, &hash, i);
>>> +    }
>>> +
>>> +    /* vote to select the most represented version */
>>> +    winner = quorum_get_vote_winner(&acb->votes);
>>> +    /* every vote version are differents -> error */
>>> +    if (!winner) {
>>> +        quorum_report_failure(acb);
>>> +        acb->vote_ret = -EIO;
>>> +        goto free_exit;
>>> +    }
>>> +
>>> +    /* if the winner count is smaller than threshold the read fails */
>>> +    if (winner->vote_count < s->threshold) {
>>> +        quorum_report_failure(acb);
>>> +        acb->vote_ret = -EIO;
>>> +        goto free_exit;
>>> +    }
>>> +
>>> +    /* we have a winner: copy it */
>>> +    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
>>> +
>>> +    /* some versions are bad print them */
>>> +    quorum_report_bad_versions(s, acb, &winner->value);
>>> +
>>> +free_exit:
>>> +    /* free lists */
>>> +    quorum_free_vote_list(&acb->votes);
>>> +}
>>> +
>>>   static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>>>                                            int64_t sector_num,
>>>                                            QEMUIOVector *qiov,
>>> @@ -194,7 +525,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
>>>       }
>>>       for (i = 0; i < s->total; i++) {
>>> -        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
>>> +        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
>>>                          quorum_aio_cb, &acb->aios[i]);
>>>       }
>>> diff --git a/configure b/configure
>>> index b472694..5a68738 100755
>>> --- a/configure
>>> +++ b/configure
>>> @@ -263,6 +263,7 @@ gtkabi="2.0"
>>>   tpm="no"
>>>   libssh2=""
>>>   vhdx=""
>>> +quorum="no"
>>>   # parse CC options first
>>>   for opt do
>>> @@ -1000,6 +1001,10 @@ for opt do
>>>     ;;
>>>     --disable-vhdx) vhdx="no"
>>>     ;;
>>> +  --disable-quorum) quorum="no"
>>> +  ;;
>>> +  --enable-quorum) quorum="yes"
>>> +  ;;
>>>     *) echo "ERROR: unknown option $opt"; show_help="yes"
>>>     ;;
>>>     esac
>>> @@ -1254,6 +1259,8 @@ Advanced options (experts only):
>>>     --enable-libssh2         enable ssh block device support
>>>     --disable-vhdx           disables support for the Microsoft VHDX image format
>>>     --enable-vhdx            enable support for the Microsoft VHDX image format
>>> +  --disable-quorum         disable quorum block filter support
>>> +  --enable-quorum          enable quorum block filter support
>>>   NOTE: The object files are built at the place where configure is launched
>>>   EOF
>>> @@ -1923,6 +1930,30 @@ EOF
>>>   fi
>>>   ##########################################
>>> +# Quorum probe (check for gnutls)
>>> +if test "$quorum" != "no" ; then
>>> +cat > $TMPC <<EOF
>>> +#include <gnutls/gnutls.h>
>>> +#include <gnutls/crypto.h>
>>> +int main(void) {char data[4096], digest[32];
>>> +gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
>>> +return 0;
>>> +}
>>> +EOF
>>> +quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
>>> +quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
>>> +if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
>>> +  qcow_tls=yes
>>> +  libs_softmmu="$quorum_tls_libs $libs_softmmu"
>>> +  libs_tools="$quorum_tls_libs $libs_softmmu"
>>> +  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
>>> +else
>>> +  echo "gnutls > 2.10.0 required to compile Quorum"
>>> +  exit 1
>>> +fi
>>> +fi
>>> +
>>> +##########################################
>>>   # VNC SASL detection
>>>   if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
>>>     cat > $TMPC <<EOF
>>> @@ -3843,6 +3874,7 @@ echo "libssh2 support   $libssh2"
>>>   echo "TPM passthrough   $tpm_passthrough"
>>>   echo "QOM debugging     $qom_cast_debug"
>>>   echo "vhdx              $vhdx"
>>> +echo "Quorum            $quorum"
>>>   if test "$sdl_too_old" = "yes"; then
>>>   echo "-> Your SDL version is too old - please upgrade to have SDL support"
>>> @@ -4241,6 +4273,10 @@ if test "$libssh2" = "yes" ; then
>>>     echo "CONFIG_LIBSSH2=y" >> $config_host_mak
>>>   fi
>>> +if test "$quorum" = "yes" ; then
>>> +  echo "CONFIG_QUORUM=y" >> $config_host_mak
>>> +fi
>>> +
>>>   if test "$virtio_blk_data_plane" = "yes" ; then
>>>     echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
>>>   fi
>>> diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
>>> index 6b87e97..1cbdde7 100644
>>> --- a/docs/qmp/qmp-events.txt
>>> +++ b/docs/qmp/qmp-events.txt
>>> @@ -500,3 +500,36 @@ Example:
>>>   Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
>>>   followed respectively by the RESET, SHUTDOWN, or STOP events.
>>> +
>>> +QUORUM_FAILURE
>>> +--------------
>>> +
>>> +Emitted by the Quorum block driver when it fail to establish a quorum.
>> Probably rather “if” than “when”; also, “fails”.
>>
>>> +
>>> +Data:
>>> +
>>> +- "sector-num":   Number of the first sector of the failed read operation.
>>> +- "sector-count": Failed read operation sector count.
>>> +
>>> +Example:
>>> +
>>> +{ "event": "QUORUM_FAILURE",
>>> +     "data": { "sector-num": 345435, "sector-count": 5 },
>>> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
>>> +
>>> +QUORUM_REPORT_BAD
>>> +-----------------
>>> +
>>> +Emitted to report a corruption of a Quorum file.
>>> +
>>> +Data:
>>> +
>>> +- "node-name":  The graph node name of the block driver state.
>> The description is not aligned with the others, this is probably due
>> to the change from child-index to node-name…?
>>
>> Max
>>
>>> +- "sector-num":   Number of the first sector of the failed read operation.
>>> +- "sector-count": Failed read operation sector count.
>>> +
>>> +Example:
>>> +
>>> +{ "event": "QUORUM_REPORT_BAD",
>>> +     "data": { "node-name": "1.raw", "sector-num": 345435, "sector-count": 5 },
>>> +     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
>>> diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
>>> index 7e5f752..a49ea11 100644
>>> --- a/include/monitor/monitor.h
>>> +++ b/include/monitor/monitor.h
>>> @@ -49,6 +49,8 @@ typedef enum MonitorEvent {
>>>       QEVENT_SPICE_MIGRATE_COMPLETED,
>>>       QEVENT_GUEST_PANICKED,
>>>       QEVENT_BLOCK_IMAGE_CORRUPTED,
>>> +    QEVENT_QUORUM_FAILURE,
>>> +    QEVENT_QUORUM_REPORT_BAD,
>>>       /* Add to 'monitor_event_names' array in monitor.c when
>>>        * defining new events here */
>>> diff --git a/monitor.c b/monitor.c
>>> index 80456fb..f8f6aea 100644
>>> --- a/monitor.c
>>> +++ b/monitor.c
>>> @@ -507,6 +507,8 @@ static const char *monitor_event_names[] = {
>>>       [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
>>>       [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
>>>       [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
>>> +    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
>>> +    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
>>>   };
>>>   QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)
>>
diff mbox

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index a2650b9..4ca9d43 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -3,7 +3,7 @@  block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-c
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
 block-obj-$(CONFIG_VHDX) += vhdx.o vhdx-endian.o vhdx-log.o
-block-obj-y += quorum.o
+block-obj-$(CONFIG_QUORUM) += quorum.o
 block-obj-y += parallels.o blkdebug.o blkverify.o
 block-obj-y += snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
diff --git a/block/quorum.c b/block/quorum.c
index 5bf37b3..c319719 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -13,7 +13,43 @@ 
  * See the COPYING file in the top-level directory.
  */
 
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
 #include "block/block_int.h"
+#include "qapi/qmp/qjson.h"
+
+#define HASH_LENGTH 32
+
+/* This union holds a vote hash value */
+typedef union QuorumVoteValue {
+    char h[HASH_LENGTH];       /* SHA-256 hash */
+    int64_t l;                 /* simpler 64 bits hash */
+} QuorumVoteValue;
+
+/* A vote item */
+typedef struct QuorumVoteItem {
+    int index;
+    QLIST_ENTRY(QuorumVoteItem) next;
+} QuorumVoteItem;
+
+/* this structure is a vote version. A version is the set of votes sharing the
+ * same vote value.
+ * The set of votes will be tracked with the items field and its cardinality is
+ * vote_count.
+ */
+typedef struct QuorumVoteVersion {
+    QuorumVoteValue value;
+    int index;
+    int vote_count;
+    QLIST_HEAD(, QuorumVoteItem) items;
+    QLIST_ENTRY(QuorumVoteVersion) next;
+} QuorumVoteVersion;
+
+/* this structure holds a group of vote versions together */
+typedef struct QuorumVotes {
+    QLIST_HEAD(, QuorumVoteVersion) vote_list;
+    int (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
+} QuorumVotes;
 
 /* the following structure holds the state of one quorum instance */
 typedef struct {
@@ -60,10 +96,14 @@  struct QuorumAIOCB {
     int success_count;          /* number of successfully completed AIOCB */
     bool *finished;             /* completion signal for cancel */
 
+    QuorumVotes votes;
+
     bool is_read;
     int vote_ret;
 };
 
+static void quorum_vote(QuorumAIOCB *acb);
+
 static void quorum_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
@@ -111,6 +151,10 @@  static void quorum_aio_finalize(QuorumAIOCB *acb)
         acb->aios[i].ret = 0;
     }
 
+    if (acb->vote_ret) {
+        ret = acb->vote_ret;
+    }
+
     acb->common.cb(acb->common.opaque, ret);
     if (acb->finished) {
         *acb->finished = true;
@@ -122,6 +166,11 @@  static void quorum_aio_finalize(QuorumAIOCB *acb)
     qemu_aio_release(acb);
 }
 
+static int quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
+{
+    return memcmp(a->h, b->h, HASH_LENGTH);
+}
+
 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
                                    BlockDriverState *bs,
                                    QEMUIOVector *qiov,
@@ -141,6 +190,7 @@  static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     acb->count = 0;
     acb->success_count = 0;
     acb->finished = NULL;
+    acb->votes.compare = quorum_sha256_compare;
     acb->is_read = false;
     acb->vote_ret = 0;
 
@@ -170,9 +220,290 @@  static void quorum_aio_cb(void *opaque, int ret)
         return;
     }
 
+    /* Do the vote on read */
+    if (acb->is_read) {
+        quorum_vote(acb);
+    }
+
     quorum_aio_finalize(acb);
 }
 
+static void quorum_report_bad(QuorumAIOCB *acb, char *node_name)
+{
+    QObject *data;
+    data = qobject_from_jsonf("{ 'node-name': \"%s\""
+                              ", 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              node_name,
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_REPORT_BAD, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_failure(QuorumAIOCB *acb)
+{
+    QObject *data;
+    data = qobject_from_jsonf("{ 'sector-num': %" PRId64
+                              ", 'sectors-count': %i }",
+                              acb->sector_num,
+                              acb->nb_sectors);
+    monitor_protocol_event(QEVENT_QUORUM_FAILURE, data);
+    qobject_decref(data);
+}
+
+static void quorum_report_bad_versions(BDRVQuorumState *s,
+                                       QuorumAIOCB *acb,
+                                       QuorumVoteValue *value)
+{
+    QuorumVoteVersion *version;
+    QuorumVoteItem *item;
+
+    QLIST_FOREACH(version, &acb->votes.vote_list, next) {
+        if (!acb->votes.compare(&version->value, value)) {
+            continue;
+        }
+        QLIST_FOREACH(item, &version->items, next) {
+            quorum_report_bad(acb, s->bs[item->index]->node_name);
+        }
+    }
+}
+
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int i;
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+    for (i = 0; i < source->niov; i++) {
+        assert(dest->iov[i].iov_len == source->iov[i].iov_len);
+        memcpy(dest->iov[i].iov_base,
+               source->iov[i].iov_base,
+               source->iov[i].iov_len);
+    }
+}
+
+static void quorum_count_vote(QuorumVotes *votes,
+                              QuorumVoteValue *value,
+                              int index)
+{
+    QuorumVoteVersion *v = NULL, *version = NULL;
+    QuorumVoteItem *item;
+
+    /* look if we have something with this hash */
+    QLIST_FOREACH(v, &votes->vote_list, next) {
+        if (!votes->compare(&v->value, value)) {
+            version = v;
+            break;
+        }
+    }
+
+    /* It's a version not yet in the list add it */
+    if (!version) {
+        version = g_new0(QuorumVoteVersion, 1);
+        QLIST_INIT(&version->items);
+        memcpy(&version->value, value, sizeof(version->value));
+        version->index = index;
+        version->vote_count = 0;
+        QLIST_INSERT_HEAD(&votes->vote_list, version, next);
+    }
+
+    version->vote_count++;
+
+    item = g_new0(QuorumVoteItem, 1);
+    item->index = index;
+    QLIST_INSERT_HEAD(&version->items, item, next);
+}
+
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *next_version;
+    QuorumVoteItem *item, *next_item;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
+        QLIST_REMOVE(version, next);
+        QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
+            QLIST_REMOVE(item, next);
+            g_free(item);
+        }
+        g_free(version);
+    }
+}
+
+static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
+{
+    int j, ret;
+    gnutls_hash_hd_t dig;
+    QEMUIOVector *qiov = &acb->aios[i].qiov;
+
+    ret = gnutls_hash_init(&dig, GNUTLS_DIG_SHA256);
+
+    if (ret < 0) {
+        return ret;
+    }
+
+    for (j = 0; j < qiov->niov; j++) {
+        ret = gnutls_hash(dig, qiov->iov[j].iov_base, qiov->iov[j].iov_len);
+        if (ret < 0) {
+            break;
+        }
+    }
+
+    gnutls_hash_deinit(dig, (void *) hash);
+    return ret;
+}
+
+static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
+{
+    int i = 0;
+    QuorumVoteVersion *candidate, *winner = NULL;
+
+    QLIST_FOREACH(candidate, &votes->vote_list, next) {
+        if (candidate->vote_count > i) {
+            i = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    return winner;
+}
+
+static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
+{
+    int i;
+    int result;
+
+    assert(a->niov == b->niov);
+    for (i = 0; i < a->niov; i++) {
+        assert(a->iov[i].iov_len == b->iov[i].iov_len);
+        result = memcmp(a->iov[i].iov_base,
+                        b->iov[i].iov_base,
+                        a->iov[i].iov_len);
+        if (result) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
+                                          const char *fmt, ...)
+{
+    va_list ap;
+
+    va_start(ap, fmt);
+    fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
+            acb->sector_num, acb->nb_sectors);
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+    va_end(ap);
+    exit(1);
+}
+
+static bool quorum_compare(QuorumAIOCB *acb,
+                           QEMUIOVector *a,
+                           QEMUIOVector *b)
+{
+    BDRVQuorumState *s = acb->bqs;
+    ssize_t offset;
+
+    /* This driver will replace blkverify in this particular case */
+    if (s->is_blkverify) {
+        offset = qemu_iovec_compare(a, b);
+        if (offset != -1) {
+            quorum_err(acb, "contents mismatch in sector %" PRId64,
+                       acb->sector_num +
+                       (uint64_t)(offset / BDRV_SECTOR_SIZE));
+        }
+        return true;
+    }
+
+    return quorum_iovec_compare(a, b);
+}
+
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    bool quorum = false;
+    int i, j, ret;
+    QuorumVoteValue hash;
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *winner;
+
+    QLIST_INIT(&acb->votes.vote_list);
+
+    /* if we don't get any successful read use the first error code */
+    if (!acb->success_count) {
+        acb->vote_ret = acb->aios[0].ret;
+        return;
+    }
+
+    /* get the index of the first successful read (we are sure to find one) */
+    for (i = 0; i < s->total; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /* compare this read with all other successful read looking for quorum */
+    for (j = i + 1; j < s->total; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        quorum = quorum_compare(acb, &acb->aios[i].qiov, &acb->aios[j].qiov);
+        if (!quorum) {
+            break;
+       }
+    }
+
+    /* Every successful read agrees and their count is higher or equal threshold
+     * -> Quorum
+     */
+    if (quorum && acb->success_count >= s->threshold) {
+        quorum_copy_qiov(acb->qiov, &acb->aios[i].qiov);
+        return;
+    }
+
+    /* compute hashs for each successful read, also store indexes */
+    for (i = 0; i < s->total; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        ret = quorum_compute_hash(acb, i, &hash);
+        /* if ever the hash computation failed */
+        if (ret < 0) {
+            acb->vote_ret = ret;
+            goto free_exit;
+        }
+        quorum_count_vote(&acb->votes, &hash, i);
+    }
+
+    /* vote to select the most represented version */
+    winner = quorum_get_vote_winner(&acb->votes);
+    /* every vote version are differents -> error */
+    if (!winner) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* if the winner count is smaller than threshold the read fails */
+    if (winner->vote_count < s->threshold) {
+        quorum_report_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->aios[winner->index].qiov);
+
+    /* some versions are bad print them */
+    quorum_report_bad_versions(s, acb, &winner->value);
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
 static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
                                          int64_t sector_num,
                                          QEMUIOVector *qiov,
@@ -194,7 +525,7 @@  static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
     }
 
     for (i = 0; i < s->total; i++) {
-        bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+        bdrv_aio_readv(s->bs[i], sector_num, &acb->aios[i].qiov, nb_sectors,
                        quorum_aio_cb, &acb->aios[i]);
     }
 
diff --git a/configure b/configure
index b472694..5a68738 100755
--- a/configure
+++ b/configure
@@ -263,6 +263,7 @@  gtkabi="2.0"
 tpm="no"
 libssh2=""
 vhdx=""
+quorum="no"
 
 # parse CC options first
 for opt do
@@ -1000,6 +1001,10 @@  for opt do
   ;;
   --disable-vhdx) vhdx="no"
   ;;
+  --disable-quorum) quorum="no"
+  ;;
+  --enable-quorum) quorum="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -1254,6 +1259,8 @@  Advanced options (experts only):
   --enable-libssh2         enable ssh block device support
   --disable-vhdx           disables support for the Microsoft VHDX image format
   --enable-vhdx            enable support for the Microsoft VHDX image format
+  --disable-quorum         disable quorum block filter support
+  --enable-quorum          enable quorum block filter support
 
 NOTE: The object files are built at the place where configure is launched
 EOF
@@ -1923,6 +1930,30 @@  EOF
 fi
 
 ##########################################
+# Quorum probe (check for gnutls)
+if test "$quorum" != "no" ; then
+cat > $TMPC <<EOF
+#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
+int main(void) {char data[4096], digest[32];
+gnutls_hash_fast(GNUTLS_DIG_SHA256, data, 4096, digest);
+return 0;
+}
+EOF
+quorum_tls_cflags=`$pkg_config --cflags gnutls 2> /dev/null`
+quorum_tls_libs=`$pkg_config --libs gnutls 2> /dev/null`
+if compile_prog "$quorum_tls_cflags" "$quorum_tls_libs" ; then
+  qcow_tls=yes
+  libs_softmmu="$quorum_tls_libs $libs_softmmu"
+  libs_tools="$quorum_tls_libs $libs_softmmu"
+  QEMU_CFLAGS="$QEMU_CFLAGS $quorum_tls_cflags"
+else
+  echo "gnutls > 2.10.0 required to compile Quorum"
+  exit 1
+fi
+fi
+
+##########################################
 # VNC SASL detection
 if test "$vnc" = "yes" -a "$vnc_sasl" != "no" ; then
   cat > $TMPC <<EOF
@@ -3843,6 +3874,7 @@  echo "libssh2 support   $libssh2"
 echo "TPM passthrough   $tpm_passthrough"
 echo "QOM debugging     $qom_cast_debug"
 echo "vhdx              $vhdx"
+echo "Quorum            $quorum"
 
 if test "$sdl_too_old" = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -4241,6 +4273,10 @@  if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=y" >> $config_host_mak
 fi
 
+if test "$quorum" = "yes" ; then
+  echo "CONFIG_QUORUM=y" >> $config_host_mak
+fi
+
 if test "$virtio_blk_data_plane" = "yes" ; then
   echo 'CONFIG_VIRTIO_BLK_DATA_PLANE=$(CONFIG_VIRTIO)' >> $config_host_mak
 fi
diff --git a/docs/qmp/qmp-events.txt b/docs/qmp/qmp-events.txt
index 6b87e97..1cbdde7 100644
--- a/docs/qmp/qmp-events.txt
+++ b/docs/qmp/qmp-events.txt
@@ -500,3 +500,36 @@  Example:
 
 Note: If action is "reset", "shutdown", or "pause" the WATCHDOG event is
 followed respectively by the RESET, SHUTDOWN, or STOP events.
+
+QUORUM_FAILURE
+--------------
+
+Emitted by the Quorum block driver when it fail to establish a quorum.
+
+Data:
+
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_FAILURE",
+     "data": { "sector-num": 345435, "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
+
+QUORUM_REPORT_BAD
+-----------------
+
+Emitted to report a corruption of a Quorum file.
+
+Data:
+
+- "node-name":  The graph node name of the block driver state.
+- "sector-num":   Number of the first sector of the failed read operation.
+- "sector-count": Failed read operation sector count.
+
+Example:
+
+{ "event": "QUORUM_REPORT_BAD",
+     "data": { "node-name": "1.raw", "sector-num": 345435, "sector-count": 5 },
+     "timestamp": { "seconds": 1344522075, "microseconds": 745528 } }
diff --git a/include/monitor/monitor.h b/include/monitor/monitor.h
index 7e5f752..a49ea11 100644
--- a/include/monitor/monitor.h
+++ b/include/monitor/monitor.h
@@ -49,6 +49,8 @@  typedef enum MonitorEvent {
     QEVENT_SPICE_MIGRATE_COMPLETED,
     QEVENT_GUEST_PANICKED,
     QEVENT_BLOCK_IMAGE_CORRUPTED,
+    QEVENT_QUORUM_FAILURE,
+    QEVENT_QUORUM_REPORT_BAD,
 
     /* Add to 'monitor_event_names' array in monitor.c when
      * defining new events here */
diff --git a/monitor.c b/monitor.c
index 80456fb..f8f6aea 100644
--- a/monitor.c
+++ b/monitor.c
@@ -507,6 +507,8 @@  static const char *monitor_event_names[] = {
     [QEVENT_SPICE_MIGRATE_COMPLETED] = "SPICE_MIGRATE_COMPLETED",
     [QEVENT_GUEST_PANICKED] = "GUEST_PANICKED",
     [QEVENT_BLOCK_IMAGE_CORRUPTED] = "BLOCK_IMAGE_CORRUPTED",
+    [QEVENT_QUORUM_FAILURE] = "QUORUM_FAILURE",
+    [QEVENT_QUORUM_REPORT_BAD] = "QUORUM_REPORT_BAD",
 };
 QEMU_BUILD_BUG_ON(ARRAY_SIZE(monitor_event_names) != QEVENT_MAX)