diff mbox

[v3,RFC] block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support

Message ID 1471149312-28148-1-git-send-email-ashish.mittal@veritas.com
State New
Headers show

Commit Message

Ashish Mittal Aug. 14, 2016, 4:35 a.m. UTC
This patch adds support for a new block device type called "vxhs".
Source code for the library that this code loads can be downloaded from:
https://github.com/MittalAshish/libqnio.git

Sample command line with a vxhs block device specification:
./qemu-system-x86_64 -name instance-00000008 -S -vnc 0.0.0.0:0 -k en-us -vga cirrus -device virtio-balloon-pci,id=balloon0,bus=pci.0,addr=0x5 -msg timestamp=on 'json:{"driver":"vxhs","vdisk_id":"{c3e9095a-a5ee-4dce-afeb-2a59fb387410}","server":[{"host":"172.172.17.4","port":"9999"},{"host":"172.172.17.2","port":"9999"}]}'

Signed-off-by: Ashish Mittal <ashish.mittal@veritas.com>
---
v3 changelog:

Comments

no-reply@patchew.org Aug. 14, 2016, 4:41 a.m. UTC | #1
Hi,

Your series seems to have some coding style problems. See output below for
more information:

Message-id: 1471149312-28148-1-git-send-email-ashish.mittal@veritas.com
Subject: [Qemu-devel] [PATCH v3 RFC] block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support
Type: series

=== TEST SCRIPT BEGIN ===
#!/bin/bash

BASE=base
n=1
total=$(git log --oneline $BASE.. | wc -l)
failed=0

commits="$(git log --format=%H --reverse $BASE..)"
for c in $commits; do
    echo "Checking PATCH $n/$total: $(git show --no-patch --format=%s $c)..."
    if ! git show $c --format=email | ./scripts/checkpatch.pl --mailback -; then
        failed=1
        echo
    fi
    n=$((n+1))
done

exit $failed
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
From https://github.com/patchew-project/qemu
 * [new tag]         patchew/1471149312-28148-1-git-send-email-ashish.mittal@veritas.com -> patchew/1471149312-28148-1-git-send-email-ashish.mittal@veritas.com
Switched to a new branch 'test'
7b11227 block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support

=== OUTPUT BEGIN ===
Checking PATCH 1/1: block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support...
WARNING: line over 80 characters
#305: FILE: block/vxhs.c:212:
+            trace_vxhs_iio_callback_fail(reason, acb, acb->segments, acb->size, error);

WARNING: line over 80 characters
#341: FILE: block/vxhs.c:248:
+        trace_vxhs_iio_callback_ready(((BDRVVXHSState *)ctx)->vdisk_guid, error);

ERROR: spaces required around that '+' (ctx:VxV)
#608: FILE: block/vxhs.c:515:
+            trace_vxhs_parse_json_hostinfo(i+1, vxhsconf->host, vxhsconf->port);
                                             ^

ERROR: spaces required around that '==' (ctx:VxV)
#669: FILE: block/vxhs.c:576:
+        if (i==0 && (strstr(uri->path, "vxhs") == NULL)) {
              ^

ERROR: spaces required around that '+' (ctx:VxV)
#670: FILE: block/vxhs.c:577:
+            conf->vdisk_id = g_new0(char, strlen(uri->path)+3);
                                                            ^

ERROR: spaces required around that '+' (ctx:VxV)
#675: FILE: block/vxhs.c:582:
+        trace_vxhs_parse_uri_hostinfo(i+1, vxhsconf->host, vxhsconf->port);
                                        ^

ERROR: suspect code indent for conditional statements (4, 4)
#1265: FILE: block/vxhs.c:1172:
+    if (res != 0) {
+    trace_vxhs_switch_storage_agent_failed(

ERROR: g_free(NULL) is safe this check is probably not required
#1379: FILE: block/vxhs.c:1286:
+    if (of_vsa_addr) {
+        g_free(of_vsa_addr);

ERROR: g_free(NULL) is safe this check is probably not required
#1382: FILE: block/vxhs.c:1289:
+    if (file_name) {
+        g_free(file_name);

total: 7 errors, 2 warnings, 1902 lines checked

Your patch has style problems, please review.  If any of these errors
are false positives report them to the maintainer, see
CHECKPATCH in MAINTAINERS.

=== OUTPUT END ===

Test command exited with code: 1


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@freelists.org
no-reply@patchew.org Aug. 14, 2016, 4:43 a.m. UTC | #2
Hi,

Your series failed automatic build test. Please find the testing commands and
their output below. If you have docker installed, you can probably reproduce it
locally.

Message-id: 1471149312-28148-1-git-send-email-ashish.mittal@veritas.com
Subject: [Qemu-devel] [PATCH v3 RFC] block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support
Type: series

=== TEST SCRIPT BEGIN ===
#!/bin/bash
set -e
git submodule update --init dtc
make J=8 docker-test-quick@centos6

# we need CURL DPRINTF patch
# http://patchew.org/QEMU/1470027888-24381-1-git-send-email-famz%40redhat.com/
#make J=8 docker-test-mingw@fedora
=== TEST SCRIPT END ===

Updating 3c8cf5a9c21ff8782164d1def7f44bd888713384
Switched to a new branch 'test'
7b11227 block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support

=== OUTPUT BEGIN ===
Submodule 'dtc' (git://git.qemu-project.org/dtc.git) registered for path 'dtc'
Cloning into 'dtc'...
Submodule path 'dtc': checked out '65cc4d2748a2c2e6f27f1cf39e07a5dbabd80ebf'
  BUILD centos6
  ARCHIVE qemu.tgz
  ARCHIVE dtc.tgz
  COPY RUNNER
  RUN test-quick in centos6
No C++ compiler available; disabling C++ specific optional code

ERROR: User requested feature vxhs block device
       configure was not able to find it.
       Install libqnio. See github

tests/docker/Makefile.include:104: recipe for target 'docker-run-test-quick@centos6' failed
make: *** [docker-run-test-quick@centos6] Error 1
=== OUTPUT END ===

Test command exited with code: 2


---
Email generated automatically by Patchew [http://patchew.org/].
Please send your feedback to patchew-devel@freelists.org
Daniel P. Berrangé Aug. 15, 2016, 10:20 a.m. UTC | #3
On Sat, Aug 13, 2016 at 09:35:12PM -0700, Ashish Mittal wrote:
> This patch adds support for a new block device type called "vxhs".
> Source code for the library that this code loads can be downloaded from:
> https://github.com/MittalAshish/libqnio.git
> 
> Sample command line with a vxhs block device specification:
> ./qemu-system-x86_64 -name instance-00000008 -S -vnc 0.0.0.0:0 -k en-us -vga cirrus -device virtio-balloon-pci,id=balloon0,bus=pci.0,addr=0x5 -msg timestamp=on 'json:{"driver":"vxhs","vdisk_id":"{c3e9095a-a5ee-4dce-afeb-2a59fb387410}","server":[{"host":"172.172.17.4","port":"9999"},{"host":"172.172.17.2","port":"9999"}]}'
> 
> Signed-off-by: Ashish Mittal <ashish.mittal@veritas.com>
> ---
> v3 changelog:
> =============
> (1) Implemented QAPI interface for passing VxHS block device parameters.
> 
> Sample trace o/p after filtering out libqnio debug messages:
> 
> ./qemu-system-x86_64 -trace enable=vxhs* -name instance-00000008 -S -vnc 0.0.0.0:0 -k en-us -vga cirrus -device virtio-balloon-pci,id=balloon0,bus=pci.0,addr=0x5 -msg timestamp=on 'json:{"driver":"vxhs","vdisk_id":"/{c3e9095a-a5ee-4dce-afeb-2a59fb387410}","server":[{"host":"172.172.17.4","port":"9999"},{"host":"172.172.17.2","port":"9999"}]}'
> 2149@1471122960.293340:vxhs_parse_json vdisk_id from json /{c3e9095a-a5ee-4dce-afeb-2a59fb387410}
> 2149@1471122960.293359:vxhs_parse_json_numservers Number of servers passed = 2
> 2149@1471122960.293369:vxhs_parse_json_hostinfo Host 1: IP 172.172.17.4, Port 9999
> 2149@1471122960.293379:vxhs_parse_json_hostinfo Host 2: IP 172.172.17.2, Port 9999
> 2149@1471122960.293382:vxhs_open vxhs_vxhs_open: came in to open file = (null)
> 2149@1471122960.301444:vxhs_setup_qnio Context to HyperScale IO manager = 0x7f0996fd3920
> 2149@1471122960.301499:vxhs_open_device Adding host 172.172.17.4:9999 to BDRVVXHSState
> 2149@1471122960.301512:vxhs_open_device Adding host 172.172.17.2:9999 to BDRVVXHSState
> 2149@1471122960.305062:vxhs_get_vdisk_stat vDisk /{c3e9095a-a5ee-4dce-afeb-2a59fb387410} stat ioctl returned size 429
> ...
> 
> TODO: Fixes to issues raised by review comments from Stefan and Kevin.
> 
>  block/Makefile.objs  |    2 +
>  block/trace-events   |   46 ++
>  block/vxhs.c         | 1424 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/vxhs.h         |  293 +++++++++++
>  configure            |   50 ++
>  qapi/block-core.json |   22 +-
>  6 files changed, 1835 insertions(+), 2 deletions(-)
>  create mode 100644 block/vxhs.c
>  create mode 100644 block/vxhs.h



> diff --git a/block/vxhs.c b/block/vxhs.c
> new file mode 100644
> index 0000000..3960ae5
> --- /dev/null
> +++ b/block/vxhs.c


> +
> +static QemuOptsList runtime_opts = {
> +    .name = "vxhs",
> +    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = VXHS_OPT_FILENAME,
> +            .type = QEMU_OPT_STRING,
> +            .help = "URI to the Veritas HyperScale image",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static QemuOptsList runtime_tcp_opts = {
> +    .name = "vxhs_tcp",
> +    .head = QTAILQ_HEAD_INITIALIZER(runtime_tcp_opts.head),
> +    .desc = {
> +        {
> +            .name = VXHS_OPT_HOST,
> +            .type = QEMU_OPT_STRING,
> +            .help = "host address (ipv4 addresses)",
> +        },
> +        {
> +            .name = VXHS_OPT_PORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "port number on which VxHSD is listening (default 9999)",
> +        },
> +        {
> +            .name = "to",
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "max port number, not supported by VxHS",
> +        },
> +        {
> +            .name = "ipv4",
> +            .type = QEMU_OPT_BOOL,
> +            .help = "ipv4 bool value, not supported by VxHS",
> +        },
> +        {
> +            .name = "ipv6",
> +            .type = QEMU_OPT_BOOL,
> +            .help = "ipv6 bool value, not supported by VxHS",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static QemuOptsList runtime_json_opts = {
> +    .name = "vxhs_json",
> +    .head = QTAILQ_HEAD_INITIALIZER(runtime_json_opts.head),
> +    .desc = {
> +        {
> +            .name = VXHS_OPT_VDISK_ID,
> +            .type = QEMU_OPT_STRING,
> +            .help = "UUID of the VxHS vdisk",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +
> +/*
> + * Convert the json formatted command line into qapi.
> +*/
> +
> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
> +                                  QDict *options, Error **errp)
> +{
> +    QemuOpts *opts;
> +    InetSocketAddress *vxhsconf;
> +    InetSocketAddressList *curr = NULL;
> +    QDict *backing_options = NULL;
> +    Error *local_err = NULL;
> +    char *str = NULL;
> +    const char *ptr = NULL;
> +    size_t num_servers;
> +    int i;
> +
> +    /* create opts info from runtime_json_opts list */
> +    opts = qemu_opts_create(&runtime_json_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        goto out;
> +    }
> +
> +    ptr = qemu_opt_get(opts, VXHS_OPT_VDISK_ID);
> +    if (!ptr) {
> +        error_setg(&local_err, QERR_MISSING_PARAMETER, VXHS_OPT_VDISK_ID);
> +        goto out;
> +    }
> +    conf->vdisk_id = g_strdup(ptr);
> +    trace_vxhs_parse_json(ptr);
> +
> +    num_servers = qdict_array_entries(options, VXHS_OPT_SERVER);
> +    if (num_servers < 1) {
> +        error_setg(&local_err, QERR_MISSING_PARAMETER, "server");
> +        goto out;
> +    }
> +    trace_vxhs_parse_json_numservers(num_servers);
> +    qemu_opts_del(opts);
> +
> +    for (i = 0; i < num_servers; i++) {
> +            str = g_strdup_printf(VXHS_OPT_SERVER_PATTERN"%d.", i);
> +            qdict_extract_subqdict(options, &backing_options, str);
> +
> +            /* create opts info from runtime_tcp_opts list */
> +            opts = qemu_opts_create(&runtime_tcp_opts, NULL, 0, &error_abort);
> +            qemu_opts_absorb_qdict(opts, backing_options, &local_err);
> +            if (local_err) {
> +                goto out;
> +            }
> +
> +            vxhsconf = g_new0(InetSocketAddress, 1);
> +            ptr = qemu_opt_get(opts, VXHS_OPT_HOST);
> +            if (!ptr) {
> +                error_setg(&local_err, QERR_MISSING_PARAMETER,
> +                           VXHS_OPT_HOST);
> +                error_append_hint(&local_err, GERR_INDEX_HINT, i);
> +                goto out;
> +            }
> +            vxhsconf->host = g_strdup(ptr);
> +
> +            ptr = qemu_opt_get(opts, VXHS_OPT_PORT);
> +            if (!ptr) {
> +                error_setg(&local_err, QERR_MISSING_PARAMETER,
> +                           VXHS_OPT_PORT);
> +                error_append_hint(&local_err, GERR_INDEX_HINT, i);
> +                goto out;
> +            }
> +            vxhsconf->port = g_strdup(ptr);
> +
> +            /* defend for unsupported fields in InetSocketAddress,
> +             * i.e. @ipv4, @ipv6  and @to
> +             */
> +            ptr = qemu_opt_get(opts, VXHS_OPT_TO);
> +            if (ptr) {
> +                vxhsconf->has_to = true;
> +            }
> +            ptr = qemu_opt_get(opts, VXHS_OPT_IPV4);
> +            if (ptr) {
> +                vxhsconf->has_ipv4 = true;
> +            }
> +            ptr = qemu_opt_get(opts, VXHS_OPT_IPV6);
> +            if (ptr) {
> +                vxhsconf->has_ipv6 = true;
> +            }
> +            if (vxhsconf->has_to) {
> +                error_setg(&local_err, "Parameter 'to' not supported");
> +                goto out;
> +            }
> +            if (vxhsconf->has_ipv4 || vxhsconf->has_ipv6) {
> +                error_setg(&local_err, "Parameters 'ipv4/ipv6' not supported");
> +                goto out;
> +            }
> +            trace_vxhs_parse_json_hostinfo(i+1, vxhsconf->host, vxhsconf->port);
> +
> +            if (conf->server == NULL) {
> +                conf->server = g_new0(InetSocketAddressList, 1);
> +                conf->server->value = vxhsconf;
> +                curr = conf->server;
> +            } else {
> +                curr->next = g_new0(InetSocketAddressList, 1);
> +                curr->next->value = vxhsconf;
> +                curr = curr->next;
> +            }
> +
> +            qdict_del(backing_options, str);
> +            qemu_opts_del(opts);
> +            g_free(str);
> +            str = NULL;
> +        }
> +
> +    return 0;
> +
> +out:
> +    error_propagate(errp, local_err);
> +    qemu_opts_del(opts);
> +    if (str) {
> +        qdict_del(backing_options, str);
> +        g_free(str);
> +    }
> +    errno = EINVAL;
> +    return -errno;
> +}

Ewww this is really horrible code. It is open-coding a special purpose
conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
my qdict_crumple() API impl as a pre-requisite of this, so you can then
use qdict_crumple + qmp_input_visitor to do this conversion in a generic
manner removing all this code.

  https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html

> +/*
> + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
> + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
> + * agent and the rest, reflection target storage agents. The local storage
> + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
> + * The local storage agent address is stored at index 0. The reflection target
> + * ips, are the E-W data network addresses of the reflection node agents, also
> + * extracted from the uri.
> + */
> +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
> +                               const char *filename)

Delete this method entirely. We should not be adding URI syntax for any new
block driver. The QAPI schema syntax is all we need.


> +
> +static void *qemu_vxhs_init(BlockdevOptionsVxHS *conf,
> +                            const char *filename,
> +                            QDict *options, Error **errp)
> +{
> +    int ret;
> +
> +    if (filename) {
> +        ret = vxhs_parse_uri(conf, filename);
> +        if (ret < 0) {
> +            error_setg(errp, "invalid URI");
> +            error_append_hint(errp, "Usage: file=vxhs://"
> +                                    "[host[:port]]/{VDISK_UUID}\n");
> +            errno = -ret;
> +            return NULL;
> +        }
> +    } else {
> +        ret = vxhs_parse_json(conf, options, errp);
> +        if (ret < 0) {
> +            error_append_hint(errp, "Usage: "
> +                             "json:{\"driver\":\"vxhs\","
> +                             "\"vdisk_id\":\"{VDISK_UUID}\","
> +                             "\"server\":[{\"host\":\"1.2.3.4\","
> +                             "\"port\":\"9999\"}"
> +                             ",{\"host\":\"4.5.6.7\",\"port\":\"9999\"}]}"
> +                             "\n");
> +            errno = -ret;
> +            return NULL;
> +        }
> +
> +    }
> +
> +            return NULL;
> +}
> +
> +int vxhs_open_device(BlockdevOptionsVxHS *conf, int *cfd, int *rfd,
> +                       BDRVVXHSState *s)
> +{
> +    InetSocketAddressList *curr = NULL;
> +    char *file_name;
> +    char *of_vsa_addr;
> +    int ret = 0;
> +    int i = 0;
> +
> +    pthread_mutex_lock(&of_global_ctx_lock);
> +    if (global_qnio_ctx == NULL) {
> +        global_qnio_ctx = vxhs_setup_qnio();
> +        if (global_qnio_ctx == NULL) {
> +            pthread_mutex_unlock(&of_global_ctx_lock);
> +            return -1;
> +        }
> +    }
> +    pthread_mutex_unlock(&of_global_ctx_lock);
> +
> +    curr = conf->server;
> +    s->vdisk_guid = g_strdup(conf->vdisk_id);
> +
> +    for (i = 0; curr != NULL; i++) {
> +        s->vdisk_hostinfo[i].hostip = g_strdup(curr->value->host);
> +        s->vdisk_hostinfo[i].port = g_ascii_strtoll(curr->value->port, NULL, 0);
> +
> +        s->vdisk_hostinfo[i].qnio_cfd = -1;
> +        s->vdisk_hostinfo[i].vdisk_rfd = -1;
> +        trace_vxhs_open_device(curr->value->host, curr->value->port);
> +        curr = curr->next;
> +    }
> +    s->vdisk_nhosts = i;
> +    s->vdisk_cur_host_idx = 0;
> +
> +
> +    *cfd = -1;
> +    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
> +    file_name = g_new0(char, OF_MAX_FILE_LEN);
> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].port);

*Never* use  g_new + snprintf, particularly not with a fixed length
buffer. g_strdup_printf() is the right way.

> +
> +    *cfd = qemu_open_iio_conn(global_qnio_ctx, of_vsa_addr, 0);
> +    if (*cfd < 0) {
> +        trace_vxhs_open_device_qnio(of_vsa_addr);
> +        ret = -EIO;
> +        goto out;
> +    }
> +    *rfd = qemu_iio_devopen(global_qnio_ctx, *cfd, file_name, 0);
> +    s->aio_context = qemu_get_aio_context();
> +
> +out:
> +    if (file_name != NULL) {
> +        g_free(file_name);
> +    }
> +    if (of_vsa_addr != NULL) {
> +        g_free(of_vsa_addr);
> +    }

Useless if-before-free - g_free happily accepts NULL so don't check
for it yourself.

> +
> +    return ret;
> +}
> +
> +int vxhs_create(const char *filename,
> +        QemuOpts *options, Error **errp)
> +{
> +    int ret = 0;
> +    int qemu_cfd = 0;
> +    int qemu_rfd = 0;
> +    BDRVVXHSState s;
> +    BlockdevOptionsVxHS *conf = NULL;
> +
> +    conf = g_new0(BlockdevOptionsVxHS, 1);
> +    trace_vxhs_create(filename);
> +    qemu_vxhs_init(conf, filename, NULL, errp);
> +    ret = vxhs_open_device(conf, &qemu_cfd, &qemu_rfd, &s);
> +
> +    qapi_free_BlockdevOptionsVxHS(conf);
> +    return ret;
> +}
> +
> +int vxhs_open(BlockDriverState *bs, QDict *options,
> +              int bdrv_flags, Error **errp)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    int ret = 0;
> +    int qemu_qnio_cfd = 0;
> +    int qemu_rfd = 0;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    const char *vxhs_uri;
> +    BlockdevOptionsVxHS *conf = NULL;
> +
> +    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +    vxhs_uri = qemu_opt_get(opts, VXHS_OPT_FILENAME);
> +
> +    conf = g_new0(BlockdevOptionsVxHS, 1);
> +
> +    qemu_vxhs_init(conf, vxhs_uri, options, errp);
> +    memset(s, 0, sizeof(*s));
> +    trace_vxhs_open(vxhs_uri);
> +    ret = vxhs_open_device(conf, &qemu_qnio_cfd, &qemu_rfd, s);
> +    if (ret != 0) {
> +        trace_vxhs_open_fail(ret);
> +        return ret;
> +    }
> +    s->qnio_ctx = global_qnio_ctx;
> +    s->vdisk_hostinfo[0].qnio_cfd = qemu_qnio_cfd;
> +    s->vdisk_hostinfo[0].vdisk_rfd = qemu_rfd;
> +    s->vdisk_size = 0;
> +    QSIMPLEQ_INIT(&s->vdisk_aio_retryq);
> +
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        trace_vxhs_open_epipe('.');
> +        ret = -errno;
> +        goto out;
> +    }
> +    fcntl(s->fds[VDISK_FD_READ], F_SETFL, O_NONBLOCK);
> +
> +    aio_set_fd_handler(s->aio_context, s->fds[VDISK_FD_READ],
> +                       false, vxhs_aio_event_reader, NULL, s);
> +
> +    /*
> +     * Allocate/Initialize the spin-locks.
> +     *
> +     * NOTE:
> +     *      Since spin lock is being allocated
> +     *      dynamically hence moving acb struct
> +     *      specific lock to BDRVVXHSState
> +     *      struct. The reason being,
> +     *      we don't want the overhead of spin
> +     *      lock being dynamically allocated and
> +     *      freed for every AIO.
> +     */
> +    s->vdisk_lock = VXHS_SPIN_LOCK_ALLOC;
> +    s->vdisk_acb_lock = VXHS_SPIN_LOCK_ALLOC;
> +
> +    qapi_free_BlockdevOptionsVxHS(conf);
> +    return 0;
> +
> +out:
> +    if (s->vdisk_hostinfo[0].vdisk_rfd >= 0) {
> +        qemu_iio_devclose(s->qnio_ctx, 0,
> +                                 s->vdisk_hostinfo[0].vdisk_rfd);
> +    }
> +    /* never close qnio_cfd */
> +    trace_vxhs_open_fail(ret);
> +    qapi_free_BlockdevOptionsVxHS(conf);
> +    return ret;
> +}
> +


> +
> +/*
> + * This is called by QEMU when a flush gets triggered from within
> + * a guest at the block layer, either for IDE or SCSI disks.
> + */
> +int vxhs_co_flush(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    uint64_t size = 0;
> +    int ret = 0;
> +    uint32_t iocount = 0;
> +
> +    ret = qemu_iio_ioctl(s->qnio_ctx,
> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +            VDISK_AIO_FLUSH, &size, NULL, IIO_FLAG_SYNC);
> +
> +    if (ret < 0) {
> +        /*
> +         * Currently not handling the flush ioctl
> +         * failure because of network connection
> +         * disconnect. Since all the writes are
> +         * commited into persistent storage hence
> +         * this flush call is noop and we can safely
> +         * return success status to the caller.
> +         *
> +         * If any write failure occurs for inflight
> +         * write AIO because of network disconnect
> +         * then anyway IO failover will be triggered.
> +         */
> +        trace_vxhs_co_flush(s->vdisk_guid, ret, errno);
> +        ret = 0;
> +    }
> +
> +    iocount = vxhs_get_vdisk_iocount(s);
> +    if (iocount > 0) {
> +        trace_vxhs_co_flush_iocnt(iocount);
> +    }

You're not doing anything with the iocount value here. Either
delete this, or do something useful with it.

> +
> +    return ret;
> +}
> +
> +unsigned long vxhs_get_vdisk_stat(BDRVVXHSState *s)
> +{
> +    void *ctx = NULL;
> +    int flags = 0;
> +    unsigned long vdisk_size = 0;

Is this meansured in bytes ? If so 'unsigned long' is a rather
limited choice for 32-bit builds. long long  / int64_t
would be better.

> +    int ret = 0;
> +
> +    ret = qemu_iio_ioctl(s->qnio_ctx,
> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +            VDISK_STAT, &vdisk_size, ctx, flags);
> +
> +    if (ret < 0) {
> +        trace_vxhs_get_vdisk_stat_err(s->vdisk_guid, ret, errno);
> +    }
> +
> +    trace_vxhs_get_vdisk_stat(s->vdisk_guid, vdisk_size);
> +    return vdisk_size;

Presumably vdisk_size is garbage when ret < 0, so you really
need to propagate the error better.

> +}
> +
> +/*
> + * Returns the size of vDisk in bytes. This is required
> + * by QEMU block upper block layer so that it is visible
> + * to guest.
> + */
> +int64_t vxhs_getlength(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    unsigned long vdisk_size = 0;
> +
> +    if (s->vdisk_size > 0) {
> +        vdisk_size = s->vdisk_size;
> +    } else {
> +        /*
> +         * Fetch the vDisk size using stat ioctl
> +         */
> +        vdisk_size = vxhs_get_vdisk_stat(s);
> +        if (vdisk_size > 0) {
> +            s->vdisk_size = vdisk_size;
> +        }
> +    }
> +
> +    if (vdisk_size > 0) {
> +        return (int64_t)vdisk_size; /* return size in bytes */
> +    } else {
> +        return -EIO;
> +    }
> +}
> +
> +/*
> + * Returns actual blocks allocated for the vDisk.
> + * This is required by qemu-img utility.
> + */
> +int64_t vxhs_get_allocated_blocks(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    unsigned long vdisk_size = 0;
> +
> +    if (s->vdisk_size > 0) {
> +        vdisk_size = s->vdisk_size;
> +    } else {
> +        /*
> +         * TODO:
> +         * Once HyperScale storage-virtualizer provides
> +         * actual physical allocation of blocks then
> +         * fetch that information and return back to the
> +         * caller but for now just get the full size.
> +         */
> +        vdisk_size = vxhs_get_vdisk_stat(s);
> +        if (vdisk_size > 0) {
> +            s->vdisk_size = vdisk_size;
> +        }
> +    }
> +
> +    if (vdisk_size > 0) {
> +        return (int64_t)vdisk_size; /* return size in bytes */
> +    } else {
> +        return -EIO;
> +    }
> +}


> +/*
> + * Try to reopen the vDisk on one of the available hosts
> + * If vDisk reopen is successful on any of the host then
> + * check if that node is ready to accept I/O.
> + */
> +int vxhs_reopen_vdisk(BDRVVXHSState *s, int index)
> +{
> +    char *of_vsa_addr = NULL;
> +    char *file_name = NULL;
> +    int  res = 0;
> +
> +    /*
> +     * Don't close the channel if it was opened
> +     * before successfully. It will be handled
> +     * within iio* api if the same channel open
> +     * fd is reused.
> +     *
> +     * close stale vdisk device remote fd since
> +     * it is invalid after channel disconnect.
> +     */
> +    if (s->vdisk_hostinfo[index].vdisk_rfd >= 0) {
> +        qemu_iio_devclose(s->qnio_ctx, 0,
> +                                 s->vdisk_hostinfo[index].vdisk_rfd);
> +        s->vdisk_hostinfo[index].vdisk_rfd = -1;
> +    }
> +    /*
> +     * build storage agent address and vdisk device name strings
> +     */
> +    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
> +    file_name = g_new0(char, OF_MAX_FILE_LEN);
> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
> +             s->vdisk_hostinfo[index].hostip, s->vdisk_hostinfo[index].port);

Again no snprintf please.

> +    /*
> +     * open qnio channel to storage agent if not opened before.
> +     */
> +    if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
> +        s->vdisk_hostinfo[index].qnio_cfd =
> +                qemu_open_iio_conn(global_qnio_ctx, of_vsa_addr, 0);
> +        if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
> +            trace_vxhs_reopen_vdisk(s->vdisk_hostinfo[index].hostip);
> +            res = ENODEV;
> +            goto out;
> +        }
> +    }
> +    /*
> +     * open vdisk device
> +     */
> +    s->vdisk_hostinfo[index].vdisk_rfd =
> +            qemu_iio_devopen(global_qnio_ctx,
> +             s->vdisk_hostinfo[index].qnio_cfd, file_name, 0);
> +    if (s->vdisk_hostinfo[index].vdisk_rfd < 0) {
> +        trace_vxhs_reopen_vdisk_openfail(file_name);
> +        res = EIO;
> +        goto out;
> +    }
> +out:
> +    if (of_vsa_addr) {
> +        g_free(of_vsa_addr);
> +    }
> +    if (file_name) {
> +        g_free(file_name);
> +    }

More useless-if-before-free

> +    return res;
> +}



> +void bdrv_vxhs_init(void)
> +{
> +    trace_vxhs_bdrv_init(vxhs_drv_version);
> +    bdrv_register(&bdrv_vxhs);
> +}
> +
> +/*
> + * The line below is how our drivier is initialized.
> + * DO NOT TOUCH IT
> + */

This is a rather pointless comment - the function name itself makes
it obvious what this is doing.

> +block_init(bdrv_vxhs_init);
> diff --git a/block/vxhs.h b/block/vxhs.h
> new file mode 100644
> index 0000000..1b678e5
> --- /dev/null
> +++ b/block/vxhs.h


> +#define vxhsErr(fmt, ...) { \
> +    time_t t = time(0); \
> +    char buf[9] = {0}; \
> +    strftime(buf, 9, "%H:%M:%S", localtime(&t)); \
> +    fprintf(stderr, "[%s: %lu] %d: %s():\t", buf, pthread_self(), \
> +                __LINE__, __func__); \
> +    fprintf(stderr, fmt, ## __VA_ARGS__); \
> +}

This appears unused now, please delete it.

> diff --git a/configure b/configure
> index 8d84919..fc09dc6 100755
> --- a/configure
> +++ b/configure
> @@ -320,6 +320,7 @@ vhdx=""
>  numa=""
>  tcmalloc="no"
>  jemalloc="no"
> +vxhs="yes"

You should set this to "", to indicate that configure should
auto-probe for support. Setting it to 'yes' indicates a mandatory
requirement which we don't want for an optional library like yours.

This would fix the automated build failure report this patch got.

> diff --git a/qapi/block-core.json b/qapi/block-core.json
> index 5e2d7d7..064d620 100644
> --- a/qapi/block-core.json
> +++ b/qapi/block-core.json
> @@ -1693,12 +1693,13 @@
>  #
>  # @host_device, @host_cdrom: Since 2.1
>  # @gluster: Since 2.7
> +# @vxhs: Since 2.7
>  #
>  # Since: 2.0
>  ##
>  { 'enum': 'BlockdevDriver',
>    'data': [ 'archipelago', 'blkdebug', 'blkverify', 'bochs', 'cloop',
> -            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom',
> +            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom', 'vxhs',
>              'host_device', 'http', 'https', 'luks', 'null-aio', 'null-co',
>              'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw', 'tftp',
>              'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] }
> @@ -2150,6 +2151,22 @@
>              'server': ['GlusterServer'],
>              '*debug-level': 'int' } }
>  
> +# @BlockdevOptionsVxHS
> +#
> +# Driver specific block device options for VxHS
> +#
> +# @vdisk_id:    UUID of VxHS volume
> +#
> +# @server:      list of vxhs server IP, port
> +#
> +# Since: 2.7
> +##
> +{ 'struct': 'BlockdevOptionsVxHS',
> +  'data': { 'vdisk_id': 'str',
> +            'server': ['InetSocketAddress'] } }

Is there any chance that you are going to want to support UNIX domain socket
connections in the future ? If so, perhaps we could use SocketAddress instead
of InetSocketAddress to allow more flexibility in future.


Regards,
Daniel
Kevin Wolf Aug. 15, 2016, 10:47 a.m. UTC | #4
Am 15.08.2016 um 12:20 hat Daniel P. Berrange geschrieben:
> On Sat, Aug 13, 2016 at 09:35:12PM -0700, Ashish Mittal wrote:
> > +/*
> > + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
> > + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
> > + * agent and the rest, reflection target storage agents. The local storage
> > + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
> > + * The local storage agent address is stored at index 0. The reflection target
> > + * ips, are the E-W data network addresses of the reflection node agents, also
> > + * extracted from the uri.
> > + */
> > +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
> > +                               const char *filename)
> 
> Delete this method entirely. We should not be adding URI syntax for any new
> block driver. The QAPI schema syntax is all we need.

I disagree. URI syntax is nice for human users.

However, you should use the proper interfaces to implement this, which
is .bdrv_parse_filename(). This is a function that gets a string and
converts it into a QDict, which is then passed to .bdrv_open(). So it
uses exactly the same code path in .bdrv_open() as if used directly with
QAPI.

Kevin
Daniel P. Berrangé Aug. 15, 2016, 10:54 a.m. UTC | #5
On Mon, Aug 15, 2016 at 12:47:52PM +0200, Kevin Wolf wrote:
> Am 15.08.2016 um 12:20 hat Daniel P. Berrange geschrieben:
> > On Sat, Aug 13, 2016 at 09:35:12PM -0700, Ashish Mittal wrote:
> > > +/*
> > > + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
> > > + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
> > > + * agent and the rest, reflection target storage agents. The local storage
> > > + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
> > > + * The local storage agent address is stored at index 0. The reflection target
> > > + * ips, are the E-W data network addresses of the reflection node agents, also
> > > + * extracted from the uri.
> > > + */
> > > +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
> > > +                               const char *filename)
> > 
> > Delete this method entirely. We should not be adding URI syntax for any new
> > block driver. The QAPI schema syntax is all we need.
> 
> I disagree. URI syntax is nice for human users.

I think the dot-separated qapi syntax is actually more user friendly as
looking at it, it is obvious what each component does. The URI syntax is
inherantly limited, so if you start with that syntax and then want to
support > 1 server, you have to completely change to the qapi syntax anyway.
So from that POV I think it is a disservice to users to provide & encourage
them to use a syntax that is known to be insufficiently flexible to work.

Regards,
Daniel
Ashish Mittal Aug. 15, 2016, 4:29 p.m. UTC | #6
>> +/*
>> + * Convert the json formatted command line into qapi.
>> +*/
>> +
>> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
>> +                                  QDict *options, Error **errp)
>> +{
...
>> +    errno = EINVAL;
>> +    return -errno;
>> +}
>
> Ewww this is really horrible code. It is open-coding a special purpose
> conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
> my qdict_crumple() API impl as a pre-requisite of this, so you can then
> use qdict_crumple + qmp_input_visitor to do this conversion in a generic
> manner removing all this code.
>
>   https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html
>

Thanks for the suggestions. I will try to incorporate them in the next
version. Actually, I used block/gluster.c for reference (as advised).
This is exactly how it parses json CLI options. It also implements the
*parse_uri() in a similar way.


>> +/*
>> + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
>> + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
>> + * agent and the rest, reflection target storage agents. The local storage
>> + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
>> + * The local storage agent address is stored at index 0. The reflection target
>> + * ips, are the E-W data network addresses of the reflection node agents, also
>> + * extracted from the uri.
>> + */
>> +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
>> +                               const char *filename)
>
> Delete this method entirely. We should not be adding URI syntax for any new
> block driver. The QAPI schema syntax is all we need.
>

Regards,
Ashish
Paolo Bonzini Aug. 17, 2016, 11:20 a.m. UTC | #7
On 15/08/2016 12:54, Daniel P. Berrange wrote:
> I think the dot-separated qapi syntax is actually more user friendly as
> looking at it, it is obvious what each component does. The URI syntax is
> inherantly limited, so if you start with that syntax and then want to
> support > 1 server, you have to completely change to the qapi syntax anyway.

If I understand correctly, this already supports >1 data server, the URI
is to some kind of metadata server that is used only at startup.

Paolo
Paolo Bonzini Aug. 17, 2016, 11:22 a.m. UTC | #8
On 15/08/2016 18:29, ashish mittal wrote:
>>> +/*
>>> + * Convert the json formatted command line into qapi.
>>> +*/
>>> +
>>> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
>>> +                                  QDict *options, Error **errp)
>>> +{
> ...
>>> +    errno = EINVAL;
>>> +    return -errno;
>>> +}
>>
>> Ewww this is really horrible code. It is open-coding a special purpose
>> conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
>> my qdict_crumple() API impl as a pre-requisite of this, so you can then
>> use qdict_crumple + qmp_input_visitor to do this conversion in a generic
>> manner removing all this code.
>>
>>   https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html
>>
> 
> Thanks for the suggestions. I will try to incorporate them in the next
> version. Actually, I used block/gluster.c for reference (as advised).
> This is exactly how it parses json CLI options. It also implements the
> *parse_uri() in a similar way.

I think I pointed you to block/nbd.c instead, which works as Kevin
suggested.

Paolo
Ashish Mittal Aug. 17, 2016, 9:58 p.m. UTC | #9
Thanks Paolo, and everyone else for the corrections :)

I will try to fix it in a patch this week or the next. I referred to
gluster.c implementation as it was closer to what we wanted to achieve
i.e. passing multiple servers for the block device. I picked up the
idea of referring to gluster.c from the following email that I
received on libvirt mailing list. I did test my implementation to make
sure that it worked fine, but did not realize that it was not clean. I
will fix it.

/====================/
Hmm, unless I'm mis-reading this, there is no separator between
the two URIs you're providing - is there a missing ',' or something
similar.

Slightly off topic for the libvirt list, but I would be pretty surprised
if the QEMU developers accepted patches using this URI syntax for providing
multiple servers.

A similar approach was originally proposed for the glusterfs driver and
the submitter was told to write it a different way, not using the legacy
URI syntax at all, but instead a QAPI based syntax:

  https://lists.gnu.org/archive/html/qemu-devel/2016-06/msg04126.html

So the sooner you can submit your QEMU patches to the qemu-devel mailing
list the better, as we'll need to get clarity on the accepted QEMU syntax
in order to proceed further with libvirt patch review.

Broadly speaking I think the patch you've proposed looks pretty much
fine now. I'd have a slight preference for it to be done as two patches.
One patch adds the XML schema, parser changes, etc. The second patch
just does the QEMU driver integration & unit tests.
...
/====================/

Regards,
Ashish


On Wed, Aug 17, 2016 at 4:22 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
>
> On 15/08/2016 18:29, ashish mittal wrote:
>>>> +/*
>>>> + * Convert the json formatted command line into qapi.
>>>> +*/
>>>> +
>>>> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
>>>> +                                  QDict *options, Error **errp)
>>>> +{
>> ...
>>>> +    errno = EINVAL;
>>>> +    return -errno;
>>>> +}
>>>
>>> Ewww this is really horrible code. It is open-coding a special purpose
>>> conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
>>> my qdict_crumple() API impl as a pre-requisite of this, so you can then
>>> use qdict_crumple + qmp_input_visitor to do this conversion in a generic
>>> manner removing all this code.
>>>
>>>   https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html
>>>
>>
>> Thanks for the suggestions. I will try to incorporate them in the next
>> version. Actually, I used block/gluster.c for reference (as advised).
>> This is exactly how it parses json CLI options. It also implements the
>> *parse_uri() in a similar way.
>
> I think I pointed you to block/nbd.c instead, which works as Kevin
> suggested.
>
> Paolo
Ashish Mittal Aug. 20, 2016, 6:42 p.m. UTC | #10
Need some help!

I'm trying to understand how I can parse a json command line having
multiple server list (InetSocketAddressList) without the QemuOpts ->
QDict -> QAPI conversion that I am currently doing.

block/nbd.c has been suggested, but it only parses a single host
entry, which is pretty straightforward, but not very helpful.

Could somebody please suggest a sample implementation (other than
gluster.c) that parses a list of server like options in the json
format?

Thanks,
Ashish


On Wed, Aug 17, 2016 at 2:58 PM, ashish mittal <ashmit602@gmail.com> wrote:
> Thanks Paolo, and everyone else for the corrections :)
>
> I will try to fix it in a patch this week or the next. I referred to
> gluster.c implementation as it was closer to what we wanted to achieve
> i.e. passing multiple servers for the block device. I picked up the
> idea of referring to gluster.c from the following email that I
> received on libvirt mailing list. I did test my implementation to make
> sure that it worked fine, but did not realize that it was not clean. I
> will fix it.
>
> /====================/
> Hmm, unless I'm mis-reading this, there is no separator between
> the two URIs you're providing - is there a missing ',' or something
> similar.
>
> Slightly off topic for the libvirt list, but I would be pretty surprised
> if the QEMU developers accepted patches using this URI syntax for providing
> multiple servers.
>
> A similar approach was originally proposed for the glusterfs driver and
> the submitter was told to write it a different way, not using the legacy
> URI syntax at all, but instead a QAPI based syntax:
>
>   https://lists.gnu.org/archive/html/qemu-devel/2016-06/msg04126.html
>
> So the sooner you can submit your QEMU patches to the qemu-devel mailing
> list the better, as we'll need to get clarity on the accepted QEMU syntax
> in order to proceed further with libvirt patch review.
>
> Broadly speaking I think the patch you've proposed looks pretty much
> fine now. I'd have a slight preference for it to be done as two patches.
> One patch adds the XML schema, parser changes, etc. The second patch
> just does the QEMU driver integration & unit tests.
> ...
> /====================/
>
> Regards,
> Ashish
>
>
> On Wed, Aug 17, 2016 at 4:22 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>>
>>
>> On 15/08/2016 18:29, ashish mittal wrote:
>>>>> +/*
>>>>> + * Convert the json formatted command line into qapi.
>>>>> +*/
>>>>> +
>>>>> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
>>>>> +                                  QDict *options, Error **errp)
>>>>> +{
>>> ...
>>>>> +    errno = EINVAL;
>>>>> +    return -errno;
>>>>> +}
>>>>
>>>> Ewww this is really horrible code. It is open-coding a special purpose
>>>> conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
>>>> my qdict_crumple() API impl as a pre-requisite of this, so you can then
>>>> use qdict_crumple + qmp_input_visitor to do this conversion in a generic
>>>> manner removing all this code.
>>>>
>>>>   https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html
>>>>
>>>
>>> Thanks for the suggestions. I will try to incorporate them in the next
>>> version. Actually, I used block/gluster.c for reference (as advised).
>>> This is exactly how it parses json CLI options. It also implements the
>>> *parse_uri() in a similar way.
>>
>> I think I pointed you to block/nbd.c instead, which works as Kevin
>> suggested.
>>
>> Paolo
Stefan Hajnoczi Aug. 23, 2016, 9:58 p.m. UTC | #11
On Sat, Aug 20, 2016 at 11:42:22AM -0700, ashish mittal wrote:
> I'm trying to understand how I can parse a json command line having
> multiple server list (InetSocketAddressList) without the QemuOpts ->
> QDict -> QAPI conversion that I am currently doing.
> 
> block/nbd.c has been suggested, but it only parses a single host
> entry, which is pretty straightforward, but not very helpful.
> 
> Could somebody please suggest a sample implementation (other than
> gluster.c) that parses a list of server like options in the json
> format?

Another block driver that takes an array of objects is block/quorum.c.
See quorum_open() and how it picks out "children." objects.

I haven't read the full discussion in this thread but maybe that's what
you're looking for.

Stefan
Ashish Mittal Aug. 23, 2016, 10:22 p.m. UTC | #12
Thanks Stefan, I will look at block/quorum.c.

I have sent V4 of the patch with a reworked parsing logic for both
JSON and URI. Both of them are quite compact now.

URI parsing now follows the suggestion given by Kevin.
/================/
However, you should use the proper interfaces to implement this, which
is .bdrv_parse_filename(). This is a function that gets a string and
converts it into a QDict, which is then passed to .bdrv_open(). So it
uses exactly the same code path in .bdrv_open() as if used directly with
QAPI.
/================/

Additionally, I have fixed all the issues pointed out by you on V1 of
the patch. The only change I haven't done is to replace pipes with
QEMUBH. I am hoping this will not hold up the patch from being
accepted, and I can make this transition later with proper dev and
testing.

Regards,
Ashish


On Tue, Aug 23, 2016 at 2:58 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Sat, Aug 20, 2016 at 11:42:22AM -0700, ashish mittal wrote:
>> I'm trying to understand how I can parse a json command line having
>> multiple server list (InetSocketAddressList) without the QemuOpts ->
>> QDict -> QAPI conversion that I am currently doing.
>>
>> block/nbd.c has been suggested, but it only parses a single host
>> entry, which is pretty straightforward, but not very helpful.
>>
>> Could somebody please suggest a sample implementation (other than
>> gluster.c) that parses a list of server like options in the json
>> format?
>
> Another block driver that takes an array of objects is block/quorum.c.
> See quorum_open() and how it picks out "children." objects.
>
> I haven't read the full discussion in this thread but maybe that's what
> you're looking for.
>
> Stefan
Ashish Mittal Aug. 23, 2016, 10:57 p.m. UTC | #13
Hi Daniel,

In V4 of the patch:

On Mon, Aug 15, 2016 at 3:20 AM, Daniel P. Berrange <berrange@redhat.com> wrote:

>> + * Convert the json formatted command line into qapi.
>> +*/
>> +
>> +static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
>> +                                  QDict *options, Error **errp)
>> +{

>> +}
>
> Ewww this is really horrible code. It is open-coding a special purpose
> conversion of QemuOpts -> QDict -> QAPI scheme. We should really put
> my qdict_crumple() API impl as a pre-requisite of this, so you can then
> use qdict_crumple + qmp_input_visitor to do this conversion in a generic
> manner removing all this code.
>
>   https://lists.gnu.org/archive/html/qemu-devel/2016-07/msg03118.html

Changed to not do the manual conversion anymore.

>
>> +/*
>> + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
>> + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
>> + * agent and the rest, reflection target storage agents. The local storage
>> + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
>> + * The local storage agent address is stored at index 0. The reflection target
>> + * ips, are the E-W data network addresses of the reflection node agents, also
>> + * extracted from the uri.
>> + */
>> +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
>> +                               const char *filename)
>
> Delete this method entirely. We should not be adding URI syntax for any new
> block driver. The QAPI schema syntax is all we need.
>

Changed per suggestion from Kevin.


>> +    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
>> +    file_name = g_new0(char, OF_MAX_FILE_LEN);
>> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
>> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
>> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
>> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].port);
>
> *Never* use  g_new + snprintf, particularly not with a fixed length
> buffer. g_strdup_printf() is the right way.
>

Fixed all such places.


>> +out:
>> +    if (file_name != NULL) {
>> +        g_free(file_name);
>> +    }
>> +    if (of_vsa_addr != NULL) {
>> +        g_free(of_vsa_addr);
>> +    }
>
> Useless if-before-free - g_free happily accepts NULL so don't check
> for it yourself.
>

Removed all if-before-free - g_free.

>
>> +
>> +/*
>> + * This is called by QEMU when a flush gets triggered from within
>> + * a guest at the block layer, either for IDE or SCSI disks.
>> + */
>> +int vxhs_co_flush(BlockDriverState *bs)
>> +{
>> +    BDRVVXHSState *s = bs->opaque;
>> +    uint64_t size = 0;
>> +    int ret = 0;
>> +    uint32_t iocount = 0;
>> +
>> +    ret = qemu_iio_ioctl(s->qnio_ctx,
>> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
>> +            VDISK_AIO_FLUSH, &size, NULL, IIO_FLAG_SYNC);
>> +
>> +    if (ret < 0) {
>> +        /*
>> +         * Currently not handling the flush ioctl
>> +         * failure because of network connection
>> +         * disconnect. Since all the writes are
>> +         * commited into persistent storage hence
>> +         * this flush call is noop and we can safely
>> +         * return success status to the caller.
>> +         *
>> +         * If any write failure occurs for inflight
>> +         * write AIO because of network disconnect
>> +         * then anyway IO failover will be triggered.
>> +         */
>> +        trace_vxhs_co_flush(s->vdisk_guid, ret, errno);
>> +        ret = 0;
>> +    }
>> +
>> +    iocount = vxhs_get_vdisk_iocount(s);
>> +    if (iocount > 0) {
>> +        trace_vxhs_co_flush_iocnt(iocount);
>> +    }
>
> You're not doing anything with the iocount value here. Either
> delete this, or do something useful with it.
>

Deleted.


>> +unsigned long vxhs_get_vdisk_stat(BDRVVXHSState *s)
>> +{
>> +    void *ctx = NULL;
>> +    int flags = 0;
>> +    unsigned long vdisk_size = 0;
>
> Is this meansured in bytes ? If so 'unsigned long' is a rather
> limited choice for 32-bit builds. long long  / int64_t
> would be better.
>

You are right. This will need change in the qnio library as well. Hope
I can fix this later!

>> +    int ret = 0;
>> +
>> +    ret = qemu_iio_ioctl(s->qnio_ctx,
>> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
>> +            VDISK_STAT, &vdisk_size, ctx, flags);
>> +
>> +    if (ret < 0) {
>> +        trace_vxhs_get_vdisk_stat_err(s->vdisk_guid, ret, errno);
>> +    }
>> +
>> +    trace_vxhs_get_vdisk_stat(s->vdisk_guid, vdisk_size);
>> +    return vdisk_size;
>
> Presumably vdisk_size is garbage when ret < 0, so you really
> need to propagate the error better.
>

Fixed.


>> +    /*
>> +     * build storage agent address and vdisk device name strings
>> +     */
>> +    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
>> +    file_name = g_new0(char, OF_MAX_FILE_LEN);
>> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
>> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
>> +             s->vdisk_hostinfo[index].hostip, s->vdisk_hostinfo[index].port);
>
> Again no snprintf please.

Fixed.


>> +out:
>> +    if (of_vsa_addr) {
>> +        g_free(of_vsa_addr);
>> +    }
>> +    if (file_name) {
>> +        g_free(file_name);
>> +    }
>
> More useless-if-before-free
>

Fixed.


>> +
>> +/*
>> + * The line below is how our drivier is initialized.
>> + * DO NOT TOUCH IT
>> + */
>
> This is a rather pointless comment - the function name itself makes
> it obvious what this is doing.
>

Removed.


>
>> +#define vxhsErr(fmt, ...) { \
>> +    time_t t = time(0); \
>> +    char buf[9] = {0}; \
>> +    strftime(buf, 9, "%H:%M:%S", localtime(&t)); \
>> +    fprintf(stderr, "[%s: %lu] %d: %s():\t", buf, pthread_self(), \
>> +                __LINE__, __func__); \
>> +    fprintf(stderr, fmt, ## __VA_ARGS__); \
>> +}
>
> This appears unused now, please delete it.
>

Removed.

>> diff --git a/configure b/configure
>> index 8d84919..fc09dc6 100755
>> --- a/configure
>> +++ b/configure
>> @@ -320,6 +320,7 @@ vhdx=""
>>  numa=""
>>  tcmalloc="no"
>>  jemalloc="no"
>> +vxhs="yes"
>
> You should set this to "", to indicate that configure should
> auto-probe for support. Setting it to 'yes' indicates a mandatory
> requirement which we don't want for an optional library like yours.
>
> This would fix the automated build failure report this patch got.
>

Fixed.

>> diff --git a/qapi/block-core.json b/qapi/block-core.json
>> index 5e2d7d7..064d620 100644
>> --- a/qapi/block-core.json
>> +++ b/qapi/block-core.json
>> @@ -1693,12 +1693,13 @@
>>  #
>>  # @host_device, @host_cdrom: Since 2.1
>>  # @gluster: Since 2.7
>> +# @vxhs: Since 2.7
>>  #
>>  # Since: 2.0
>>  ##
>>  { 'enum': 'BlockdevDriver',
>>    'data': [ 'archipelago', 'blkdebug', 'blkverify', 'bochs', 'cloop',
>> -            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom',
>> +            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom', 'vxhs',
>>              'host_device', 'http', 'https', 'luks', 'null-aio', 'null-co',
>>              'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw', 'tftp',
>>              'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] }
>> @@ -2150,6 +2151,22 @@
>>              'server': ['GlusterServer'],
>>              '*debug-level': 'int' } }
>>
>> +# @BlockdevOptionsVxHS
>> +#
>> +# Driver specific block device options for VxHS
>> +#
>> +# @vdisk_id:    UUID of VxHS volume
>> +#
>> +# @server:      list of vxhs server IP, port
>> +#
>> +# Since: 2.7
>> +##
>> +{ 'struct': 'BlockdevOptionsVxHS',
>> +  'data': { 'vdisk_id': 'str',
>> +            'server': ['InetSocketAddress'] } }
>
> Is there any chance that you are going to want to support UNIX domain socket
> connections in the future ? If so, perhaps we could use SocketAddress instead
> of InetSocketAddress to allow more flexibility in future.
>

We don't see the need for UNIX sockets at present.

Regards,
Ashish
Ashish Mittal Aug. 23, 2016, 11:28 p.m. UTC | #14
Hi Kevin,

Changed per your suggestion in V4 patch. Please review when you get some time.

Thanks,
Ashish

On Mon, Aug 15, 2016 at 3:47 AM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 15.08.2016 um 12:20 hat Daniel P. Berrange geschrieben:
>> On Sat, Aug 13, 2016 at 09:35:12PM -0700, Ashish Mittal wrote:
>> > +/*
>> > + * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
>> > + * vdisk_id, and all the host(s) information. Host at index 0 is local storage
>> > + * agent and the rest, reflection target storage agents. The local storage
>> > + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
>> > + * The local storage agent address is stored at index 0. The reflection target
>> > + * ips, are the E-W data network addresses of the reflection node agents, also
>> > + * extracted from the uri.
>> > + */
>> > +static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
>> > +                               const char *filename)
>>
>> Delete this method entirely. We should not be adding URI syntax for any new
>> block driver. The QAPI schema syntax is all we need.
>
> I disagree. URI syntax is nice for human users.
>
> However, you should use the proper interfaces to implement this, which
> is .bdrv_parse_filename(). This is a function that gets a string and
> converts it into a QDict, which is then passed to .bdrv_open(). So it
> uses exactly the same code path in .bdrv_open() as if used directly with
> QAPI.
>
> Kevin
Ashish Mittal Nov. 15, 2016, 7:02 p.m. UTC | #15
Hi Stefan,

I had replied to the QEMUBH suggestion in the email below.

Regards,
Ashish

On Tue, Aug 23, 2016 at 3:22 PM, ashish mittal <ashmit602@gmail.com> wrote:
> Thanks Stefan, I will look at block/quorum.c.
>
> I have sent V4 of the patch with a reworked parsing logic for both
> JSON and URI. Both of them are quite compact now.
>
> URI parsing now follows the suggestion given by Kevin.
> /================/
> However, you should use the proper interfaces to implement this, which
> is .bdrv_parse_filename(). This is a function that gets a string and
> converts it into a QDict, which is then passed to .bdrv_open(). So it
> uses exactly the same code path in .bdrv_open() as if used directly with
> QAPI.
> /================/
>
> Additionally, I have fixed all the issues pointed out by you on V1 of
> the patch. The only change I haven't done is to replace pipes with
> QEMUBH. I am hoping this will not hold up the patch from being
> accepted, and I can make this transition later with proper dev and
> testing.
>
> Regards,
> Ashish
>
>
> On Tue, Aug 23, 2016 at 2:58 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> On Sat, Aug 20, 2016 at 11:42:22AM -0700, ashish mittal wrote:
>>> I'm trying to understand how I can parse a json command line having
>>> multiple server list (InetSocketAddressList) without the QemuOpts ->
>>> QDict -> QAPI conversion that I am currently doing.
>>>
>>> block/nbd.c has been suggested, but it only parses a single host
>>> entry, which is pretty straightforward, but not very helpful.
>>>
>>> Could somebody please suggest a sample implementation (other than
>>> gluster.c) that parses a list of server like options in the json
>>> format?
>>
>> Another block driver that takes an array of objects is block/quorum.c.
>> See quorum_open() and how it picks out "children." objects.
>>
>> I haven't read the full discussion in this thread but maybe that's what
>> you're looking for.
>>
>> Stefan
diff mbox

Patch

=============
(1) Implemented QAPI interface for passing VxHS block device parameters.

Sample trace o/p after filtering out libqnio debug messages:

./qemu-system-x86_64 -trace enable=vxhs* -name instance-00000008 -S -vnc 0.0.0.0:0 -k en-us -vga cirrus -device virtio-balloon-pci,id=balloon0,bus=pci.0,addr=0x5 -msg timestamp=on 'json:{"driver":"vxhs","vdisk_id":"/{c3e9095a-a5ee-4dce-afeb-2a59fb387410}","server":[{"host":"172.172.17.4","port":"9999"},{"host":"172.172.17.2","port":"9999"}]}'
2149@1471122960.293340:vxhs_parse_json vdisk_id from json /{c3e9095a-a5ee-4dce-afeb-2a59fb387410}
2149@1471122960.293359:vxhs_parse_json_numservers Number of servers passed = 2
2149@1471122960.293369:vxhs_parse_json_hostinfo Host 1: IP 172.172.17.4, Port 9999
2149@1471122960.293379:vxhs_parse_json_hostinfo Host 2: IP 172.172.17.2, Port 9999
2149@1471122960.293382:vxhs_open vxhs_vxhs_open: came in to open file = (null)
2149@1471122960.301444:vxhs_setup_qnio Context to HyperScale IO manager = 0x7f0996fd3920
2149@1471122960.301499:vxhs_open_device Adding host 172.172.17.4:9999 to BDRVVXHSState
2149@1471122960.301512:vxhs_open_device Adding host 172.172.17.2:9999 to BDRVVXHSState
2149@1471122960.305062:vxhs_get_vdisk_stat vDisk /{c3e9095a-a5ee-4dce-afeb-2a59fb387410} stat ioctl returned size 429
...

TODO: Fixes to issues raised by review comments from Stefan and Kevin.

 block/Makefile.objs  |    2 +
 block/trace-events   |   46 ++
 block/vxhs.c         | 1424 ++++++++++++++++++++++++++++++++++++++++++++++++++
 block/vxhs.h         |  293 +++++++++++
 configure            |   50 ++
 qapi/block-core.json |   22 +-
 6 files changed, 1835 insertions(+), 2 deletions(-)
 create mode 100644 block/vxhs.c
 create mode 100644 block/vxhs.h

diff --git a/block/Makefile.objs b/block/Makefile.objs
index 2593a2f..5f83305 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -20,6 +20,7 @@  block-obj-$(CONFIG_RBD) += rbd.o
 block-obj-$(CONFIG_GLUSTERFS) += gluster.o
 block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
 block-obj-$(CONFIG_LIBSSH2) += ssh.o
+block-obj-$(CONFIG_VXHS) += vxhs.o
 block-obj-y += accounting.o dirty-bitmap.o
 block-obj-y += write-threshold.o
 
@@ -43,3 +44,4 @@  block-obj-m        += dmg.o
 dmg.o-libs         := $(BZIP2_LIBS)
 qcow.o-libs        := -lz
 linux-aio.o-libs   := -laio
+vxhs.o-libs        := $(VXHS_LIBS)
diff --git a/block/trace-events b/block/trace-events
index 978ef4f..d3decb0 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -114,3 +114,49 @@  qed_aio_write_data(void *s, void *acb, int ret, uint64_t offset, size_t len) "s
 qed_aio_write_prefill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64
 qed_aio_write_postfill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64
 qed_aio_write_main(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu"
+
+# block/vxhs.c
+vxhs_bdrv_init(int version) "Registering VxHS %d AIO driver"
+vxhs_iio_callback(int error, int reason) "ctx is NULL: error %d, reason %d"
+vxhs_setup_qnio(void *s) "Context to HyperScale IO manager = %p"
+vxhs_setup_qnio_nwerror(char c) "Could not initialize the network channel. Bailing out%c"
+vxhs_iio_callback_iofail(int err, int reason, void *acb, int seg) "Read/Write failed: error %d, reason %d, acb %p, segment %d"
+vxhs_iio_callback_retry(char *guid, void *acb) "vDisk %s, added acb %p to retry queue (5)"
+vxhs_iio_callback_chnlfail(int error) "QNIO channel failed, no i/o (%d)"
+vxhs_iio_callback_fail(int r, void *acb, int seg, uint64_t size, int err) " ALERT: reason = %d , acb = %p, acb->segments = %d, acb->size = %lu Error = %d"
+vxhs_fail_aio(char * guid, void *acb) "vDisk %s, failing acb %p"
+vxhs_iio_callback_ready(char *vd, int err) "async vxhs_iio_callback: IRP_VDISK_CHECK_IO_FAILOVER_READY completed for vdisk %s with error %d"
+vxhs_iio_callback_chnfail(int err, int error) "QNIO channel failed, no i/o %d, %d"
+vxhs_iio_callback_unknwn(int opcode, int err) "unexpected opcode %d, errno %d"
+vxhs_open_device(char *of_vsa_addr, char *port) "Adding host %s:%s to BDRVVXHSState"
+vxhs_open_device_qnio(char *of_vsa_addr) "Could not open an QNIO connection to: %s"
+vxhs_create(const char *filename) "vxhs_create: came in to open file = %s"
+vxhs_open(const char *filename) "vxhs_vxhs_open: came in to open file = %s"
+vxhs_open_fail(int ret) "Could not open the device. Error = %d"
+vxhs_open_epipe(char c) "Could not create a pipe for device. bailing out%c"
+vxhs_aio_rw(char *guid, int iodir, uint64_t size, uint64_t offset) "vDisk %s, vDisk device is in failed state iodir = %d size = %lu offset = %lu"
+vxhs_aio_rw_retry(char *guid, void *acb, int queue) "vDisk %s, added acb %p to retry queue(%d)"
+vxhs_aio_rw_invalid(int req) "Invalid I/O request iodir %d"
+vxhs_aio_rw_ioerr(char *guid, int iodir, uint64_t size, uint64_t off, void *acb, int seg, int ret, int err) "IO ERROR (vDisk %s) FOR : Read/Write = %d size = %lu offset = %lu ACB = %p Segments = %d. Error = %d, errno = %d"
+vxhs_co_flush(char *guid, int ret, int err) "vDisk (%s) Flush ioctl failed ret = %d errno = %d"
+vxhs_co_flush_iocnt(int iocnt) "In the flush the IO count = %d"
+vxhs_get_vdisk_stat_err(char *guid, int ret, int err) "vDisk (%s) stat ioctl failed, ret = %d, errno = %d"
+vxhs_get_vdisk_stat(char *vdisk_guid, uint64_t vdisk_size) "vDisk %s stat ioctl returned size %lu"
+vxhs_switch_storage_agent(char *ip, char *guid) "Query host %s for vdisk %s"
+vxhs_switch_storage_agent_failed(char *ip, char *guid, int res, int err) "Query to host %s for vdisk %s failed, res = %d, errno = %d"
+vxhs_failover_ioctl_cb(char *ip, char *guid) "Switched to storage server host-IP %s for vdisk %s"
+vxhs_failover_ioctl_cb_retry(char *guid) "failover_ioctl_cb: keep looking for io target for vdisk %s"
+vxhs_failover_io(char *vdisk) "I/O Failover starting for vDisk %s"
+vxhs_reopen_vdisk(char *ip) "Failed to connect to storage agent on host-ip %s"
+vxhs_reopen_vdisk_openfail(char *fname) "Failed to open vdisk device: %s"
+vxhs_handle_queued_ios(void *acb, int res) "Restarted acb %p res %d"
+vxhs_restart_aio(int dir, int res, int err) "IO ERROR FOR: Read/Write = %d Error = %d, errno = %d"
+vxhs_complete_aio(void *acb, uint64_t ret) "aio failed acb %p ret %ld"
+vxhs_aio_rw_iofail(char *guid) "vDisk %s, I/O operation failed."
+vxhs_aio_rw_devfail(char *guid, int dir, uint64_t size, uint64_t off) "vDisk %s, vDisk device failed iodir = %d size = %lu offset = %lu"
+vxhs_parse_filename(char c) "in vxhs_parse_filename %c"
+vxhs_parse_json(const char *vdisk_id) "vdisk_id from json %s"
+vxhs_parse_json_numservers(int num_servers) "Number of servers passed = %d"
+vxhs_parse_json_hostinfo(int num, char *host, char *port) "Host %d: IP %s, Port %s"
+vxhs_parse_uri_hostinfo(int num, char *host, char *port) "Host %d: IP %s, Port %s"
+vxhs_parse_uri_cmdline(const char *cmdline) "Command line passed via URI %s"
diff --git a/block/vxhs.c b/block/vxhs.c
new file mode 100644
index 0000000..3960ae5
--- /dev/null
+++ b/block/vxhs.c
@@ -0,0 +1,1424 @@ 
+/*
+ * QEMU Block driver for Veritas HyperScale (VxHS)
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "vxhs.h"
+#include <qnio/qnio_api.h>
+#include "trace.h"
+#include "qapi/qmp/qerror.h"
+
+#define VXHS_OPT_FILENAME        "filename"
+#define VXHS_OPT_VDISK_ID        "vdisk_id"
+#define VXHS_OPT_SERVER          "server."
+#define VXHS_OPT_HOST            "host"
+#define VXHS_OPT_PORT            "port"
+#define VXHS_OPT_TO              "to"
+#define VXHS_OPT_IPV4            "ipv4"
+#define VXHS_OPT_IPV6            "ipv6"
+#define VXHS_OPT_SERVER_PATTERN  "server."
+
+#define GERR_INDEX_HINT "hint: check in 'server' array index '%d'\n"
+
+/* qnio client ioapi_ctx */
+static void *global_qnio_ctx;
+
+/* insure init once */
+static pthread_mutex_t of_global_ctx_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* HyperScale Driver Version */
+static const int vxhs_drv_version = 8895;
+
+/* vdisk prefix to pass to qnio */
+static const char vdisk_prefix[] = "/dev/of/vdisk";
+
+void vxhs_inc_acb_segment_count(void *ptr, int count)
+{
+    VXHSAIOCB *acb = ptr;
+    BDRVVXHSState *s = acb->common.bs->opaque;
+
+    VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+    acb->segments += count;
+    VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+}
+
+void vxhs_dec_acb_segment_count(void *ptr, int count)
+{
+    VXHSAIOCB *acb = ptr;
+    BDRVVXHSState *s = acb->common.bs->opaque;
+
+    VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+    acb->segments -= count;
+    VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+}
+
+int vxhs_dec_and_get_acb_segment_count(void *ptr, int count)
+{
+    VXHSAIOCB *acb = ptr;
+    BDRVVXHSState *s = acb->common.bs->opaque;
+    int segcount = 0;
+
+
+    VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+    acb->segments -= count;
+    segcount = acb->segments;
+    VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+
+    return segcount;
+}
+
+void vxhs_set_acb_buffer(void *ptr, void *buffer)
+{
+    VXHSAIOCB *acb = ptr;
+
+    acb->buffer = buffer;
+}
+
+void vxhs_inc_vdisk_iocount(void *ptr, uint32_t count)
+{
+    BDRVVXHSState *s = ptr;
+
+    VXHS_SPIN_LOCK(s->vdisk_lock);
+    s->vdisk_aio_count += count;
+    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+}
+
+void vxhs_dec_vdisk_iocount(void *ptr, uint32_t count)
+{
+    BDRVVXHSState *s = ptr;
+
+    VXHS_SPIN_LOCK(s->vdisk_lock);
+    s->vdisk_aio_count -= count;
+    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+}
+
+uint32_t vxhs_get_vdisk_iocount(void *ptr)
+{
+    BDRVVXHSState *s = ptr;
+    uint32_t count = 0;
+
+    VXHS_SPIN_LOCK(s->vdisk_lock);
+    count = s->vdisk_aio_count;
+    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+
+    return count;
+}
+
+void vxhs_iio_callback(uint32_t rfd, uint32_t reason, void *ctx, void *m)
+{
+    VXHSAIOCB *acb = NULL;
+    BDRVVXHSState *s = NULL;
+    int rv = 0;
+    int segcount = 0;
+    uint32_t error = 0;
+    uint32_t opcode = 0;
+
+    assert(m);
+    if (m) {
+        /* TODO: need common get message attrs, not two separate lib calls */
+        error = qemu_iio_extract_msg_error(m);
+        opcode = qemu_iio_extract_msg_opcode(m);
+    }
+    switch (opcode) {
+    case IRP_READ_REQUEST:
+    case IRP_WRITE_REQUEST:
+
+    /*
+     * ctx is VXHSAIOCB*
+     * ctx is NULL if error is VXERROR_CHANNEL_HUP or reason is IIO_REASON_HUP
+     */
+    if (ctx) {
+        acb = ctx;
+        s = acb->common.bs->opaque;
+    } else {
+        trace_vxhs_iio_callback(error, reason);
+        goto out;
+    }
+
+    if (error) {
+        trace_vxhs_iio_callback_iofail(error, reason, acb, acb->segments);
+
+        if (reason == IIO_REASON_DONE || reason == IIO_REASON_EVENT) {
+            /*
+             * Storage agent failed while I/O was in progress
+             * Fail over only if the qnio channel dropped, indicating
+             * storage agent failure. Don't fail over in response to other
+             * I/O errors such as disk failure.
+             */
+            if (error == VXERROR_RETRY_ON_SOURCE || error == VXERROR_HUP ||
+                error == VXERROR_CHANNEL_HUP || error == -1) {
+                /*
+                 * Start vDisk IO failover once callback is
+                 * called against all the pending IOs.
+                 * If vDisk has no redundency enabled
+                 * then IO failover routine will mark
+                 * the vDisk failed and fail all the
+                 * AIOs without retry (stateless vDisk)
+                 */
+                VXHS_SPIN_LOCK(s->vdisk_lock);
+                if (!OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
+                    OF_VDISK_SET_IOFAILOVER_IN_PROGRESS(s);
+                }
+                /*
+                 * Check if this acb is already queued before.
+                 * It is possible in case if I/Os are submitted
+                 * in multiple segments (QNIO_MAX_IO_SIZE).
+                 */
+                VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+                if (!OF_AIOCB_FLAGS_QUEUED(acb)) {
+                    QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq,
+                                         acb, retry_entry);
+                    OF_AIOCB_FLAGS_SET_QUEUED(acb);
+                    s->vdisk_aio_retry_qd++;
+                    trace_vxhs_iio_callback_retry(s->vdisk_guid, acb);
+                }
+                segcount = --acb->segments;
+                VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+                /*
+                 * Decrement AIO count only when callback is called
+                 * against all the segments of aiocb.
+                 */
+                if (segcount == 0 && --s->vdisk_aio_count == 0) {
+                    /*
+                     * Start vDisk I/O failover
+                     */
+                    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+                    /*
+                     * TODO:
+                     * Need to explore further if it is possible to optimize
+                     * the failover operation on Virtual-Machine (global)
+                     * specific rather vDisk specific.
+                     */
+                    vxhs_failover_io(s);
+                    goto out;
+                }
+                VXHS_SPIN_UNLOCK(s->vdisk_lock);
+                goto out;
+            }
+        } else if (reason == IIO_REASON_HUP) {
+            /*
+             * Channel failed, spontaneous notification,
+             * not in response to I/O
+             */
+            trace_vxhs_iio_callback_chnlfail(error);
+            /*
+             * TODO: Start channel failover when no I/O is outstanding
+             */
+            goto out;
+        } else {
+            trace_vxhs_iio_callback_fail(reason, acb, acb->segments, acb->size, error);
+        }
+    }
+    /*
+     * Set error into acb if not set. In case if acb is being
+     * submitted in multiple segments then need to set the error
+     * only once.
+     *
+     * Once acb done callback is called for the last segment
+     * then acb->ret return status will be sent back to the
+     * caller.
+     */
+    VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+    if (error && !acb->ret) {
+        acb->ret = error;
+    }
+    --acb->segments;
+    segcount = acb->segments;
+    assert(segcount >= 0);
+    VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+    /*
+     * Check if all the outstanding I/Os are done against acb.
+     * If yes then send signal for AIO completion.
+     */
+    if (segcount == 0) {
+        rv = qemu_write_full(s->fds[VDISK_FD_WRITE], &acb, sizeof(acb));
+        if (rv != sizeof(acb)) {
+            error_report("VXHS AIO completion failed: %s", strerror(errno));
+            abort();
+        }
+    }
+    break;
+
+    case IRP_VDISK_CHECK_IO_FAILOVER_READY:
+        /* ctx is BDRVVXHSState* */
+        assert(ctx);
+        trace_vxhs_iio_callback_ready(((BDRVVXHSState *)ctx)->vdisk_guid, error);
+        vxhs_failover_ioctl_cb(error, ctx);
+        break;
+
+    default:
+        if (reason == IIO_REASON_HUP) {
+            /*
+             * Channel failed, spontaneous notification,
+             * not in response to I/O
+             */
+            trace_vxhs_iio_callback_chnfail(error, errno);
+            /*
+             * TODO: Start channel failover when no I/O is outstanding
+             */
+        } else {
+            trace_vxhs_iio_callback_unknwn(opcode, error);
+        }
+        break;
+    }
+out:
+    return;
+}
+
+void vxhs_complete_aio(VXHSAIOCB *acb, BDRVVXHSState *s)
+{
+    BlockCompletionFunc *cb = acb->common.cb;
+    void *opaque = acb->common.opaque;
+    int ret = 0;
+
+    if (acb->ret != 0) {
+        trace_vxhs_complete_aio(acb, acb->ret);
+    /*
+     * We mask all the IO errors generically as EIO for upper layers
+     * Right now our IO Manager uses non standard error codes. Instead
+     * of confusing upper layers with incorrect interpretation we are
+     * doing this workaround.
+     */
+        ret = (-EIO);
+    }
+    /*
+     * Copy back contents from stablization buffer into original iovector
+     * before returning the IO
+     */
+    if (acb->buffer != NULL) {
+        qemu_iovec_from_buf(acb->qiov, 0, acb->buffer, acb->qiov->size);
+        free(acb->buffer);
+        acb->buffer = NULL;
+    }
+    vxhs_dec_vdisk_iocount(s, 1);
+    acb->aio_done = VXHS_IO_COMPLETED;
+    qemu_aio_unref(acb);
+    cb(opaque, ret);
+}
+
+/*
+ * This is the HyperScale event handler registered to QEMU.
+ * It is invoked when any IO gets completed and written on pipe
+ * by callback called from QNIO thread context. Then it marks
+ * the AIO as completed, and releases HyperScale AIO callbacks.
+ */
+void vxhs_aio_event_reader(void *opaque)
+{
+    BDRVVXHSState *s = opaque;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&s->qnio_event_acb;
+
+        ret = read(s->fds[VDISK_FD_READ], p + s->event_reader_pos,
+                   sizeof(s->qnio_event_acb) - s->event_reader_pos);
+        if (ret > 0) {
+            s->event_reader_pos += ret;
+            if (s->event_reader_pos == sizeof(s->qnio_event_acb)) {
+                s->event_reader_pos = 0;
+                vxhs_complete_aio(s->qnio_event_acb, s);
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+/*
+ * QEMU calls this to check if there are any pending IO on vDisk.
+ * It will wait in a loop until all the AIOs are completed.
+ */
+int vxhs_aio_flush_cb(void *opaque)
+{
+    BDRVVXHSState *s = opaque;
+
+    return vxhs_get_vdisk_iocount(s);
+}
+
+/*
+ * This will be called by QEMU while booting for each vDisk.
+ * bs->opaque will be allocated by QEMU upper block layer before
+ * calling open. It will load all the QNIO operations from
+ * qemuqnio library and call QNIO operation to create channel to
+ * do IO on vDisk. It parses the URI, gets the hostname, vDisk
+ * path and then sets HyperScale event handler to QEMU.
+ */
+void *vxhs_setup_qnio(void)
+{
+    void *qnio_ctx = NULL;
+
+    qnio_ctx = qemu_iio_init(vxhs_iio_callback);
+
+    if (qnio_ctx != NULL) {
+        trace_vxhs_setup_qnio(qnio_ctx);
+    } else {
+        trace_vxhs_setup_qnio_nwerror('.');
+    }
+
+    return qnio_ctx;
+}
+
+static QemuOptsList runtime_opts = {
+    .name = "vxhs",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = VXHS_OPT_FILENAME,
+            .type = QEMU_OPT_STRING,
+            .help = "URI to the Veritas HyperScale image",
+        },
+        { /* end of list */ }
+    },
+};
+
+static QemuOptsList runtime_tcp_opts = {
+    .name = "vxhs_tcp",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_tcp_opts.head),
+    .desc = {
+        {
+            .name = VXHS_OPT_HOST,
+            .type = QEMU_OPT_STRING,
+            .help = "host address (ipv4 addresses)",
+        },
+        {
+            .name = VXHS_OPT_PORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "port number on which VxHSD is listening (default 9999)",
+        },
+        {
+            .name = "to",
+            .type = QEMU_OPT_NUMBER,
+            .help = "max port number, not supported by VxHS",
+        },
+        {
+            .name = "ipv4",
+            .type = QEMU_OPT_BOOL,
+            .help = "ipv4 bool value, not supported by VxHS",
+        },
+        {
+            .name = "ipv6",
+            .type = QEMU_OPT_BOOL,
+            .help = "ipv6 bool value, not supported by VxHS",
+        },
+        { /* end of list */ }
+    },
+};
+
+static QemuOptsList runtime_json_opts = {
+    .name = "vxhs_json",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_json_opts.head),
+    .desc = {
+        {
+            .name = VXHS_OPT_VDISK_ID,
+            .type = QEMU_OPT_STRING,
+            .help = "UUID of the VxHS vdisk",
+        },
+        { /* end of list */ }
+    },
+};
+
+
+/*
+ * Convert the json formatted command line into qapi.
+*/
+
+static int vxhs_parse_json(BlockdevOptionsVxHS *conf,
+                                  QDict *options, Error **errp)
+{
+    QemuOpts *opts;
+    InetSocketAddress *vxhsconf;
+    InetSocketAddressList *curr = NULL;
+    QDict *backing_options = NULL;
+    Error *local_err = NULL;
+    char *str = NULL;
+    const char *ptr = NULL;
+    size_t num_servers;
+    int i;
+
+    /* create opts info from runtime_json_opts list */
+    opts = qemu_opts_create(&runtime_json_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        goto out;
+    }
+
+    ptr = qemu_opt_get(opts, VXHS_OPT_VDISK_ID);
+    if (!ptr) {
+        error_setg(&local_err, QERR_MISSING_PARAMETER, VXHS_OPT_VDISK_ID);
+        goto out;
+    }
+    conf->vdisk_id = g_strdup(ptr);
+    trace_vxhs_parse_json(ptr);
+
+    num_servers = qdict_array_entries(options, VXHS_OPT_SERVER);
+    if (num_servers < 1) {
+        error_setg(&local_err, QERR_MISSING_PARAMETER, "server");
+        goto out;
+    }
+    trace_vxhs_parse_json_numservers(num_servers);
+    qemu_opts_del(opts);
+
+    for (i = 0; i < num_servers; i++) {
+            str = g_strdup_printf(VXHS_OPT_SERVER_PATTERN"%d.", i);
+            qdict_extract_subqdict(options, &backing_options, str);
+
+            /* create opts info from runtime_tcp_opts list */
+            opts = qemu_opts_create(&runtime_tcp_opts, NULL, 0, &error_abort);
+            qemu_opts_absorb_qdict(opts, backing_options, &local_err);
+            if (local_err) {
+                goto out;
+            }
+
+            vxhsconf = g_new0(InetSocketAddress, 1);
+            ptr = qemu_opt_get(opts, VXHS_OPT_HOST);
+            if (!ptr) {
+                error_setg(&local_err, QERR_MISSING_PARAMETER,
+                           VXHS_OPT_HOST);
+                error_append_hint(&local_err, GERR_INDEX_HINT, i);
+                goto out;
+            }
+            vxhsconf->host = g_strdup(ptr);
+
+            ptr = qemu_opt_get(opts, VXHS_OPT_PORT);
+            if (!ptr) {
+                error_setg(&local_err, QERR_MISSING_PARAMETER,
+                           VXHS_OPT_PORT);
+                error_append_hint(&local_err, GERR_INDEX_HINT, i);
+                goto out;
+            }
+            vxhsconf->port = g_strdup(ptr);
+
+            /* defend for unsupported fields in InetSocketAddress,
+             * i.e. @ipv4, @ipv6  and @to
+             */
+            ptr = qemu_opt_get(opts, VXHS_OPT_TO);
+            if (ptr) {
+                vxhsconf->has_to = true;
+            }
+            ptr = qemu_opt_get(opts, VXHS_OPT_IPV4);
+            if (ptr) {
+                vxhsconf->has_ipv4 = true;
+            }
+            ptr = qemu_opt_get(opts, VXHS_OPT_IPV6);
+            if (ptr) {
+                vxhsconf->has_ipv6 = true;
+            }
+            if (vxhsconf->has_to) {
+                error_setg(&local_err, "Parameter 'to' not supported");
+                goto out;
+            }
+            if (vxhsconf->has_ipv4 || vxhsconf->has_ipv6) {
+                error_setg(&local_err, "Parameters 'ipv4/ipv6' not supported");
+                goto out;
+            }
+            trace_vxhs_parse_json_hostinfo(i+1, vxhsconf->host, vxhsconf->port);
+
+            if (conf->server == NULL) {
+                conf->server = g_new0(InetSocketAddressList, 1);
+                conf->server->value = vxhsconf;
+                curr = conf->server;
+            } else {
+                curr->next = g_new0(InetSocketAddressList, 1);
+                curr->next->value = vxhsconf;
+                curr = curr->next;
+            }
+
+            qdict_del(backing_options, str);
+            qemu_opts_del(opts);
+            g_free(str);
+            str = NULL;
+        }
+
+    return 0;
+
+out:
+    error_propagate(errp, local_err);
+    qemu_opts_del(opts);
+    if (str) {
+        qdict_del(backing_options, str);
+        g_free(str);
+    }
+    errno = EINVAL;
+    return -errno;
+}
+
+/*
+ * vxhs_parse_uri: Parse the incoming URI and populate *conf with the
+ * vdisk_id, and all the host(s) information. Host at index 0 is local storage
+ * agent and the rest, reflection target storage agents. The local storage
+ * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
+ * The local storage agent address is stored at index 0. The reflection target
+ * ips, are the E-W data network addresses of the reflection node agents, also
+ * extracted from the uri.
+ */
+static int vxhs_parse_uri(BlockdevOptionsVxHS *conf,
+                               const char *filename)
+{
+    gchar **target_list;
+    URI *uri = NULL;
+    int i = 0;
+    InetSocketAddress *vxhsconf;
+    InetSocketAddressList *curr = NULL;
+
+    trace_vxhs_parse_uri_cmdline(filename);
+    target_list = g_strsplit(filename, "%7D", 0);
+    assert(target_list != NULL && target_list[0] != NULL);
+
+    for (i = 0; target_list[i] != NULL && *target_list[i]; i++) {
+        uri = uri_parse(target_list[i]);
+        assert(uri != NULL && uri->server != NULL);
+
+        vxhsconf = g_new0(InetSocketAddress, 1);
+        vxhsconf->host = g_strdup(uri->server);
+        vxhsconf->port = g_strdup_printf("%d", uri->port);
+
+        if (i==0 && (strstr(uri->path, "vxhs") == NULL)) {
+            conf->vdisk_id = g_new0(char, strlen(uri->path)+3);
+            strcpy((conf->vdisk_id), uri->path);
+            strcat((conf->vdisk_id), "}");
+        }
+
+        trace_vxhs_parse_uri_hostinfo(i+1, vxhsconf->host, vxhsconf->port);
+        if (conf->server == NULL) {
+            conf->server = g_new0(InetSocketAddressList, 1);
+            conf->server->value = vxhsconf;
+            curr = conf->server;
+        } else {
+            curr->next = g_new0(InetSocketAddressList, 1);
+            curr->next->value = vxhsconf;
+            curr = curr->next;
+        }
+        uri_free(uri);
+    }
+
+    g_strfreev(target_list);
+    return 0;
+}
+
+static void *qemu_vxhs_init(BlockdevOptionsVxHS *conf,
+                            const char *filename,
+                            QDict *options, Error **errp)
+{
+    int ret;
+
+    if (filename) {
+        ret = vxhs_parse_uri(conf, filename);
+        if (ret < 0) {
+            error_setg(errp, "invalid URI");
+            error_append_hint(errp, "Usage: file=vxhs://"
+                                    "[host[:port]]/{VDISK_UUID}\n");
+            errno = -ret;
+            return NULL;
+        }
+    } else {
+        ret = vxhs_parse_json(conf, options, errp);
+        if (ret < 0) {
+            error_append_hint(errp, "Usage: "
+                             "json:{\"driver\":\"vxhs\","
+                             "\"vdisk_id\":\"{VDISK_UUID}\","
+                             "\"server\":[{\"host\":\"1.2.3.4\","
+                             "\"port\":\"9999\"}"
+                             ",{\"host\":\"4.5.6.7\",\"port\":\"9999\"}]}"
+                             "\n");
+            errno = -ret;
+            return NULL;
+        }
+
+    }
+
+            return NULL;
+}
+
+int vxhs_open_device(BlockdevOptionsVxHS *conf, int *cfd, int *rfd,
+                       BDRVVXHSState *s)
+{
+    InetSocketAddressList *curr = NULL;
+    char *file_name;
+    char *of_vsa_addr;
+    int ret = 0;
+    int i = 0;
+
+    pthread_mutex_lock(&of_global_ctx_lock);
+    if (global_qnio_ctx == NULL) {
+        global_qnio_ctx = vxhs_setup_qnio();
+        if (global_qnio_ctx == NULL) {
+            pthread_mutex_unlock(&of_global_ctx_lock);
+            return -1;
+        }
+    }
+    pthread_mutex_unlock(&of_global_ctx_lock);
+
+    curr = conf->server;
+    s->vdisk_guid = g_strdup(conf->vdisk_id);
+
+    for (i = 0; curr != NULL; i++) {
+        s->vdisk_hostinfo[i].hostip = g_strdup(curr->value->host);
+        s->vdisk_hostinfo[i].port = g_ascii_strtoll(curr->value->port, NULL, 0);
+
+        s->vdisk_hostinfo[i].qnio_cfd = -1;
+        s->vdisk_hostinfo[i].vdisk_rfd = -1;
+        trace_vxhs_open_device(curr->value->host, curr->value->port);
+        curr = curr->next;
+    }
+    s->vdisk_nhosts = i;
+    s->vdisk_cur_host_idx = 0;
+
+
+    *cfd = -1;
+    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
+    file_name = g_new0(char, OF_MAX_FILE_LEN);
+    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
+    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
+             s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
+             s->vdisk_hostinfo[s->vdisk_cur_host_idx].port);
+
+    *cfd = qemu_open_iio_conn(global_qnio_ctx, of_vsa_addr, 0);
+    if (*cfd < 0) {
+        trace_vxhs_open_device_qnio(of_vsa_addr);
+        ret = -EIO;
+        goto out;
+    }
+    *rfd = qemu_iio_devopen(global_qnio_ctx, *cfd, file_name, 0);
+    s->aio_context = qemu_get_aio_context();
+
+out:
+    if (file_name != NULL) {
+        g_free(file_name);
+    }
+    if (of_vsa_addr != NULL) {
+        g_free(of_vsa_addr);
+    }
+
+    return ret;
+}
+
+int vxhs_create(const char *filename,
+        QemuOpts *options, Error **errp)
+{
+    int ret = 0;
+    int qemu_cfd = 0;
+    int qemu_rfd = 0;
+    BDRVVXHSState s;
+    BlockdevOptionsVxHS *conf = NULL;
+
+    conf = g_new0(BlockdevOptionsVxHS, 1);
+    trace_vxhs_create(filename);
+    qemu_vxhs_init(conf, filename, NULL, errp);
+    ret = vxhs_open_device(conf, &qemu_cfd, &qemu_rfd, &s);
+
+    qapi_free_BlockdevOptionsVxHS(conf);
+    return ret;
+}
+
+int vxhs_open(BlockDriverState *bs, QDict *options,
+              int bdrv_flags, Error **errp)
+{
+    BDRVVXHSState *s = bs->opaque;
+    int ret = 0;
+    int qemu_qnio_cfd = 0;
+    int qemu_rfd = 0;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    const char *vxhs_uri;
+    BlockdevOptionsVxHS *conf = NULL;
+
+    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        ret = -EINVAL;
+        goto out;
+    }
+    vxhs_uri = qemu_opt_get(opts, VXHS_OPT_FILENAME);
+
+    conf = g_new0(BlockdevOptionsVxHS, 1);
+
+    qemu_vxhs_init(conf, vxhs_uri, options, errp);
+    memset(s, 0, sizeof(*s));
+    trace_vxhs_open(vxhs_uri);
+    ret = vxhs_open_device(conf, &qemu_qnio_cfd, &qemu_rfd, s);
+    if (ret != 0) {
+        trace_vxhs_open_fail(ret);
+        return ret;
+    }
+    s->qnio_ctx = global_qnio_ctx;
+    s->vdisk_hostinfo[0].qnio_cfd = qemu_qnio_cfd;
+    s->vdisk_hostinfo[0].vdisk_rfd = qemu_rfd;
+    s->vdisk_size = 0;
+    QSIMPLEQ_INIT(&s->vdisk_aio_retryq);
+
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        trace_vxhs_open_epipe('.');
+        ret = -errno;
+        goto out;
+    }
+    fcntl(s->fds[VDISK_FD_READ], F_SETFL, O_NONBLOCK);
+
+    aio_set_fd_handler(s->aio_context, s->fds[VDISK_FD_READ],
+                       false, vxhs_aio_event_reader, NULL, s);
+
+    /*
+     * Allocate/Initialize the spin-locks.
+     *
+     * NOTE:
+     *      Since spin lock is being allocated
+     *      dynamically hence moving acb struct
+     *      specific lock to BDRVVXHSState
+     *      struct. The reason being,
+     *      we don't want the overhead of spin
+     *      lock being dynamically allocated and
+     *      freed for every AIO.
+     */
+    s->vdisk_lock = VXHS_SPIN_LOCK_ALLOC;
+    s->vdisk_acb_lock = VXHS_SPIN_LOCK_ALLOC;
+
+    qapi_free_BlockdevOptionsVxHS(conf);
+    return 0;
+
+out:
+    if (s->vdisk_hostinfo[0].vdisk_rfd >= 0) {
+        qemu_iio_devclose(s->qnio_ctx, 0,
+                                 s->vdisk_hostinfo[0].vdisk_rfd);
+    }
+    /* never close qnio_cfd */
+    trace_vxhs_open_fail(ret);
+    qapi_free_BlockdevOptionsVxHS(conf);
+    return ret;
+}
+
+static const AIOCBInfo vxhs_aiocb_info = {
+    .aiocb_size = sizeof(VXHSAIOCB)
+};
+
+/*
+ * This is called in QNIO thread context when IO done
+ * on IO Manager and QNIO client received the data or
+ * ACK. It notify another event handler thread running in QEMU context
+ * by writing on the pipe
+ */
+void vxhs_finish_aiocb(ssize_t ret, void *arg)
+{
+    VXHSAIOCB *acb = arg;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVVXHSState *s = bs->opaque;
+    int rv;
+
+    acb->ret = ret;
+    rv = qemu_write_full(s->fds[VDISK_FD_WRITE], &acb, sizeof(acb));
+    if (rv != sizeof(acb)) {
+        error_report("VXHS AIO completion failed: %s",
+                     strerror(errno));
+        abort();
+    }
+}
+
+/*
+ * This allocates QEMU-VXHS callback for each IO
+ * and is passed to QNIO. When QNIO completes the work,
+ * it will be passed back through the callback.
+ */
+BlockAIOCB *vxhs_aio_rw(BlockDriverState *bs,
+                                int64_t sector_num, QEMUIOVector *qiov,
+                                int nb_sectors,
+                                BlockCompletionFunc *cb,
+                                void *opaque, int iodir)
+{
+    VXHSAIOCB         *acb = NULL;
+    BDRVVXHSState     *s = bs->opaque;
+    size_t              size;
+    uint64_t            offset;
+    int                 iio_flags = 0;
+    int                 ret = 0;
+
+    offset = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+
+    acb = qemu_aio_get(&vxhs_aiocb_info, bs, cb, opaque);
+    /*
+     * Setup or initialize VXHSAIOCB.
+     * Every single field should be initialized since
+     * acb will be picked up from the slab without
+     * initializing with zero.
+     */
+    acb->io_offset = offset;
+    acb->size = size;
+    acb->ret = 0;
+    acb->flags = 0;
+    acb->aio_done = VXHS_IO_INPROGRESS;
+    acb->segments = 0;
+    acb->buffer = 0;
+    acb->qiov = qiov;
+    acb->direction = iodir;
+
+    VXHS_SPIN_LOCK(s->vdisk_lock);
+    if (OF_VDISK_FAILED(s)) {
+        trace_vxhs_aio_rw(s->vdisk_guid, iodir, size, offset);
+        VXHS_SPIN_UNLOCK(s->vdisk_lock);
+        goto errout;
+    }
+    if (OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
+        QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
+        s->vdisk_aio_retry_qd++;
+        OF_AIOCB_FLAGS_SET_QUEUED(acb);
+        VXHS_SPIN_UNLOCK(s->vdisk_lock);
+        trace_vxhs_aio_rw_retry(s->vdisk_guid, acb, 1);
+        goto out;
+    }
+    s->vdisk_aio_count++;
+    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+
+    iio_flags = (IIO_FLAG_DONE | IIO_FLAG_ASYNC);
+
+    switch (iodir) {
+    case VDISK_AIO_WRITE:
+            vxhs_inc_acb_segment_count(acb, 1);
+            ret = qemu_iio_writev(s->qnio_ctx,
+                    s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+                    qiov->iov, qiov->niov, offset, (void *)acb, iio_flags);
+            break;
+    case VDISK_AIO_READ:
+            vxhs_inc_acb_segment_count(acb, 1);
+            ret = qemu_iio_readv(s->qnio_ctx,
+                    s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+                    qiov->iov, qiov->niov, offset, (void *)acb, iio_flags);
+            break;
+    default:
+            trace_vxhs_aio_rw_invalid(iodir);
+            goto errout;
+    }
+
+    if (ret != 0) {
+        trace_vxhs_aio_rw_ioerr(
+                  s->vdisk_guid, iodir, size, offset,
+                  acb, acb->segments, ret, errno);
+        /*
+         * Don't retry I/Os against vDisk having no
+         * redundency or statefull storage on compute
+         *
+         * TODO: Revisit this code path to see if any
+         *       particular error needs to be handled.
+         *       At this moment failing the I/O.
+         */
+        VXHS_SPIN_LOCK(s->vdisk_lock);
+        if (s->vdisk_nhosts == 1) {
+            trace_vxhs_aio_rw_iofail(s->vdisk_guid);
+            s->vdisk_aio_count--;
+            vxhs_dec_acb_segment_count(acb, 1);
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            goto errout;
+        }
+        if (OF_VDISK_FAILED(s)) {
+            trace_vxhs_aio_rw_devfail(
+                      s->vdisk_guid, iodir, size, offset);
+            s->vdisk_aio_count--;
+            vxhs_dec_acb_segment_count(acb, 1);
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            goto errout;
+        }
+        if (OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
+            /*
+             * Queue all incoming io requests after failover starts.
+             * Number of requests that can arrive is limited by io queue depth
+             * so an app blasting independent ios will not exhaust memory.
+             */
+            QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
+            s->vdisk_aio_retry_qd++;
+            OF_AIOCB_FLAGS_SET_QUEUED(acb);
+            s->vdisk_aio_count--;
+            vxhs_dec_acb_segment_count(acb, 1);
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            trace_vxhs_aio_rw_retry(s->vdisk_guid, acb, 2);
+            goto out;
+        }
+        OF_VDISK_SET_IOFAILOVER_IN_PROGRESS(s);
+        QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
+        s->vdisk_aio_retry_qd++;
+        OF_AIOCB_FLAGS_SET_QUEUED(acb);
+        vxhs_dec_acb_segment_count(acb, 1);
+        trace_vxhs_aio_rw_retry(s->vdisk_guid, acb, 3);
+        /*
+         * Start I/O failover if there is no active
+         * AIO within vxhs block driver.
+         */
+        if (--s->vdisk_aio_count == 0) {
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            /*
+             * Start IO failover
+             */
+            vxhs_failover_io(s);
+            goto out;
+        }
+        VXHS_SPIN_UNLOCK(s->vdisk_lock);
+    }
+
+out:
+    return &acb->common;
+
+errout:
+    qemu_aio_unref(acb);
+    return NULL;
+}
+
+BlockAIOCB *vxhs_aio_readv(BlockDriverState *bs,
+                                   int64_t sector_num, QEMUIOVector *qiov,
+                                   int nb_sectors,
+                                   BlockCompletionFunc *cb, void *opaque)
+{
+    return vxhs_aio_rw(bs, sector_num, qiov, nb_sectors,
+                         cb, opaque, VDISK_AIO_READ);
+}
+
+BlockAIOCB *vxhs_aio_writev(BlockDriverState *bs,
+                                    int64_t sector_num, QEMUIOVector *qiov,
+                                    int nb_sectors,
+                                    BlockCompletionFunc *cb, void *opaque)
+{
+    return vxhs_aio_rw(bs, sector_num, qiov, nb_sectors,
+                         cb, opaque, VDISK_AIO_WRITE);
+}
+
+/*
+ * This is called by QEMU when a flush gets triggered from within
+ * a guest at the block layer, either for IDE or SCSI disks.
+ */
+int vxhs_co_flush(BlockDriverState *bs)
+{
+    BDRVVXHSState *s = bs->opaque;
+    uint64_t size = 0;
+    int ret = 0;
+    uint32_t iocount = 0;
+
+    ret = qemu_iio_ioctl(s->qnio_ctx,
+            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+            VDISK_AIO_FLUSH, &size, NULL, IIO_FLAG_SYNC);
+
+    if (ret < 0) {
+        /*
+         * Currently not handling the flush ioctl
+         * failure because of network connection
+         * disconnect. Since all the writes are
+         * commited into persistent storage hence
+         * this flush call is noop and we can safely
+         * return success status to the caller.
+         *
+         * If any write failure occurs for inflight
+         * write AIO because of network disconnect
+         * then anyway IO failover will be triggered.
+         */
+        trace_vxhs_co_flush(s->vdisk_guid, ret, errno);
+        ret = 0;
+    }
+
+    iocount = vxhs_get_vdisk_iocount(s);
+    if (iocount > 0) {
+        trace_vxhs_co_flush_iocnt(iocount);
+    }
+
+    return ret;
+}
+
+unsigned long vxhs_get_vdisk_stat(BDRVVXHSState *s)
+{
+    void *ctx = NULL;
+    int flags = 0;
+    unsigned long vdisk_size = 0;
+    int ret = 0;
+
+    ret = qemu_iio_ioctl(s->qnio_ctx,
+            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+            VDISK_STAT, &vdisk_size, ctx, flags);
+
+    if (ret < 0) {
+        trace_vxhs_get_vdisk_stat_err(s->vdisk_guid, ret, errno);
+    }
+
+    trace_vxhs_get_vdisk_stat(s->vdisk_guid, vdisk_size);
+    return vdisk_size;
+}
+
+/*
+ * Returns the size of vDisk in bytes. This is required
+ * by QEMU block upper block layer so that it is visible
+ * to guest.
+ */
+int64_t vxhs_getlength(BlockDriverState *bs)
+{
+    BDRVVXHSState *s = bs->opaque;
+    unsigned long vdisk_size = 0;
+
+    if (s->vdisk_size > 0) {
+        vdisk_size = s->vdisk_size;
+    } else {
+        /*
+         * Fetch the vDisk size using stat ioctl
+         */
+        vdisk_size = vxhs_get_vdisk_stat(s);
+        if (vdisk_size > 0) {
+            s->vdisk_size = vdisk_size;
+        }
+    }
+
+    if (vdisk_size > 0) {
+        return (int64_t)vdisk_size; /* return size in bytes */
+    } else {
+        return -EIO;
+    }
+}
+
+/*
+ * Returns actual blocks allocated for the vDisk.
+ * This is required by qemu-img utility.
+ */
+int64_t vxhs_get_allocated_blocks(BlockDriverState *bs)
+{
+    BDRVVXHSState *s = bs->opaque;
+    unsigned long vdisk_size = 0;
+
+    if (s->vdisk_size > 0) {
+        vdisk_size = s->vdisk_size;
+    } else {
+        /*
+         * TODO:
+         * Once HyperScale storage-virtualizer provides
+         * actual physical allocation of blocks then
+         * fetch that information and return back to the
+         * caller but for now just get the full size.
+         */
+        vdisk_size = vxhs_get_vdisk_stat(s);
+        if (vdisk_size > 0) {
+            s->vdisk_size = vdisk_size;
+        }
+    }
+
+    if (vdisk_size > 0) {
+        return (int64_t)vdisk_size; /* return size in bytes */
+    } else {
+        return -EIO;
+    }
+}
+
+void vxhs_close(BlockDriverState *bs)
+{
+    BDRVVXHSState *s = bs->opaque;
+
+    close(s->fds[VDISK_FD_READ]);
+    close(s->fds[VDISK_FD_WRITE]);
+
+    /*
+     * never close channel - not ref counted, will
+     * close for all vdisks
+     */
+    if (s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd >= 0) {
+        qemu_iio_devclose(s->qnio_ctx, 0,
+            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd);
+    }
+    if (s->vdisk_lock) {
+        VXHS_SPIN_LOCK_DESTROY(s->vdisk_lock);
+        s->vdisk_lock = NULL;
+    }
+    if (s->vdisk_acb_lock) {
+        VXHS_SPIN_LOCK_DESTROY(s->vdisk_acb_lock);
+        s->vdisk_acb_lock = NULL;
+    }
+
+    /*
+     * TODO: Verify that all the resources were relinguished.
+     */
+}
+
+/*
+ * If errors are consistent with storage agent failure:
+ *  - Try to reconnect in case error is transient or storage agent restarted.
+ *  - Currently failover is being triggered on per vDisk basis. There is
+ *    a scope of further optimization where failover can be global (per VM).
+ *  - In case of network (storage agent) failure, for all the vDisks, having
+ *    no redundency, I/Os will be failed without attempting for I/O failover
+ *    because of stateless nature of vDisk.
+ *  - If local or source storage agent is down then send an ioctl to remote
+ *    storage agent to check if remote storage agent in a state to accept
+ *    application I/Os.
+ *  - Once remote storage agent is ready to accept I/O, start I/O shipping.
+ *  - If I/Os cannot be serviced then vDisk will be marked failed so that
+ *    new incoming I/Os are returned with failure immediately.
+ *  - If vDisk I/O failover is in progress then all new/inflight I/Os will
+ *    queued and will be restarted or failed based on failover operation
+ *    is successful or not.
+ *  - I/O failover can be started either in I/O forward or I/O backward
+ *    path.
+ *  - I/O failover will be started as soon as all the pending acb(s)
+ *    are queued and there is no pending I/O count.
+ *  - If I/O failover couldn't be completed within QNIO_CONNECT_TIMOUT_SECS
+ *    then vDisk will be marked failed and all I/Os will be completed with
+ *    error.
+ */
+
+int vxhs_switch_storage_agent(BDRVVXHSState *s)
+{
+    int res = 0;
+    int flags = (IIO_FLAG_ASYNC | IIO_FLAG_DONE);
+
+    trace_vxhs_switch_storage_agent(
+              s->vdisk_hostinfo[s->vdisk_ask_failover_idx].hostip,
+              s->vdisk_guid);
+
+    res = vxhs_reopen_vdisk(s, s->vdisk_ask_failover_idx);
+    if (res == 0) {
+        res = qemu_iio_ioctl(s->qnio_ctx,
+                   s->vdisk_hostinfo[s->vdisk_ask_failover_idx].vdisk_rfd,
+                   VDISK_CHECK_IO_FAILOVER_READY, NULL, s, flags);
+    }
+    if (res != 0) {
+    trace_vxhs_switch_storage_agent_failed(
+                  s->vdisk_hostinfo[s->vdisk_ask_failover_idx].hostip,
+                  s->vdisk_guid, res, errno);
+        /*
+         * TODO: calling vxhs_failover_ioctl_cb from here ties up the qnio epoll
+         * loop if qemu_iio_ioctl fails synchronously (-1) for all hosts in io
+         * target list.
+         */
+
+        /* try next host */
+        vxhs_failover_ioctl_cb(res, s);
+    }
+    return res;
+}
+
+void vxhs_failover_ioctl_cb(int res, void *ctx)
+{
+    BDRVVXHSState *s = ctx;
+
+    if (res == 0) {
+        /* found failover target */
+        s->vdisk_cur_host_idx = s->vdisk_ask_failover_idx;
+        s->vdisk_ask_failover_idx = 0;
+        trace_vxhs_failover_ioctl_cb(
+                   s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
+                   s->vdisk_guid);
+        VXHS_SPIN_LOCK(s->vdisk_lock);
+        OF_VDISK_RESET_IOFAILOVER_IN_PROGRESS(s);
+        VXHS_SPIN_UNLOCK(s->vdisk_lock);
+        vxhs_handle_queued_ios(s);
+    } else {
+        /* keep looking */
+        trace_vxhs_failover_ioctl_cb_retry(s->vdisk_guid);
+        s->vdisk_ask_failover_idx++;
+        if (s->vdisk_ask_failover_idx == s->vdisk_nhosts) {
+            /* pause and cycle through list again */
+            sleep(QNIO_CONNECT_RETRY_SECS);
+            s->vdisk_ask_failover_idx = 0;
+        }
+        res = vxhs_switch_storage_agent(s);
+    }
+}
+
+int vxhs_failover_io(BDRVVXHSState *s)
+{
+    int res = 0;
+
+    trace_vxhs_failover_io(s->vdisk_guid);
+
+    s->vdisk_ask_failover_idx = 0;
+    res = vxhs_switch_storage_agent(s);
+
+    return res;
+}
+
+/*
+ * Try to reopen the vDisk on one of the available hosts
+ * If vDisk reopen is successful on any of the host then
+ * check if that node is ready to accept I/O.
+ */
+int vxhs_reopen_vdisk(BDRVVXHSState *s, int index)
+{
+    char *of_vsa_addr = NULL;
+    char *file_name = NULL;
+    int  res = 0;
+
+    /*
+     * Don't close the channel if it was opened
+     * before successfully. It will be handled
+     * within iio* api if the same channel open
+     * fd is reused.
+     *
+     * close stale vdisk device remote fd since
+     * it is invalid after channel disconnect.
+     */
+    if (s->vdisk_hostinfo[index].vdisk_rfd >= 0) {
+        qemu_iio_devclose(s->qnio_ctx, 0,
+                                 s->vdisk_hostinfo[index].vdisk_rfd);
+        s->vdisk_hostinfo[index].vdisk_rfd = -1;
+    }
+    /*
+     * build storage agent address and vdisk device name strings
+     */
+    of_vsa_addr = g_new0(char, OF_MAX_SERVER_ADDR);
+    file_name = g_new0(char, OF_MAX_FILE_LEN);
+    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
+    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
+             s->vdisk_hostinfo[index].hostip, s->vdisk_hostinfo[index].port);
+    /*
+     * open qnio channel to storage agent if not opened before.
+     */
+    if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
+        s->vdisk_hostinfo[index].qnio_cfd =
+                qemu_open_iio_conn(global_qnio_ctx, of_vsa_addr, 0);
+        if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
+            trace_vxhs_reopen_vdisk(s->vdisk_hostinfo[index].hostip);
+            res = ENODEV;
+            goto out;
+        }
+    }
+    /*
+     * open vdisk device
+     */
+    s->vdisk_hostinfo[index].vdisk_rfd =
+            qemu_iio_devopen(global_qnio_ctx,
+             s->vdisk_hostinfo[index].qnio_cfd, file_name, 0);
+    if (s->vdisk_hostinfo[index].vdisk_rfd < 0) {
+        trace_vxhs_reopen_vdisk_openfail(file_name);
+        res = EIO;
+        goto out;
+    }
+out:
+    if (of_vsa_addr) {
+        g_free(of_vsa_addr);
+    }
+    if (file_name) {
+        g_free(file_name);
+    }
+    return res;
+}
+
+int vxhs_handle_queued_ios(BDRVVXHSState *s)
+{
+    VXHSAIOCB *acb = NULL;
+    int res = 0;
+
+    VXHS_SPIN_LOCK(s->vdisk_lock);
+    while ((acb = QSIMPLEQ_FIRST(&s->vdisk_aio_retryq)) != NULL) {
+        /*
+         * Before we process the acb, check whether I/O failover
+         * started again due to failback or cascading failure.
+         */
+        if (OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            goto out;
+        }
+        QSIMPLEQ_REMOVE_HEAD(&s->vdisk_aio_retryq, retry_entry);
+        s->vdisk_aio_retry_qd--;
+        OF_AIOCB_FLAGS_RESET_QUEUED(acb);
+        if (OF_VDISK_FAILED(s)) {
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            vxhs_fail_aio(acb, EIO);
+            VXHS_SPIN_LOCK(s->vdisk_lock);
+        } else {
+            VXHS_SPIN_UNLOCK(s->vdisk_lock);
+            res = vxhs_restart_aio(acb);
+            trace_vxhs_handle_queued_ios(acb, res);
+            VXHS_SPIN_LOCK(s->vdisk_lock);
+            if (res) {
+                QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq,
+                                     acb, retry_entry);
+                OF_AIOCB_FLAGS_SET_QUEUED(acb);
+                VXHS_SPIN_UNLOCK(s->vdisk_lock);
+                goto out;
+            }
+        }
+    }
+    VXHS_SPIN_UNLOCK(s->vdisk_lock);
+out:
+    return res;
+}
+
+int vxhs_restart_aio(VXHSAIOCB *acb)
+{
+    BDRVVXHSState *s = NULL;
+    int iio_flags = 0;
+    int res = 0;
+
+    s = acb->common.bs->opaque;
+
+    if (acb->direction == VDISK_AIO_WRITE) {
+        vxhs_inc_vdisk_iocount(s, 1);
+        vxhs_inc_acb_segment_count(acb, 1);
+        iio_flags = (IIO_FLAG_DONE | IIO_FLAG_ASYNC);
+        res = qemu_iio_writev(s->qnio_ctx,
+                s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+                acb->qiov->iov, acb->qiov->niov,
+                acb->io_offset, (void *)acb, iio_flags);
+    }
+
+    if (acb->direction == VDISK_AIO_READ) {
+        vxhs_inc_vdisk_iocount(s, 1);
+        vxhs_inc_acb_segment_count(acb, 1);
+        iio_flags = (IIO_FLAG_DONE | IIO_FLAG_ASYNC);
+        res = qemu_iio_readv(s->qnio_ctx,
+                s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
+                acb->qiov->iov, acb->qiov->niov,
+                acb->io_offset, (void *)acb, iio_flags);
+    }
+
+    if (res != 0) {
+        vxhs_dec_vdisk_iocount(s, 1);
+        vxhs_dec_acb_segment_count(acb, 1);
+        trace_vxhs_restart_aio(acb->direction, res, errno);
+    }
+
+    return res;
+}
+
+void vxhs_fail_aio(VXHSAIOCB *acb, int err)
+{
+    BDRVVXHSState *s = NULL;
+    int segcount = 0;
+    int rv = 0;
+
+    s = acb->common.bs->opaque;
+
+    trace_vxhs_fail_aio(s->vdisk_guid, acb);
+    if (!acb->ret) {
+        acb->ret = err;
+    }
+    VXHS_SPIN_LOCK(s->vdisk_acb_lock);
+    segcount = acb->segments;
+    VXHS_SPIN_UNLOCK(s->vdisk_acb_lock);
+    if (segcount == 0) {
+        /*
+         * Complete the io request
+         */
+        rv = qemu_write_full(s->fds[VDISK_FD_WRITE], &acb, sizeof(acb));
+        if (rv != sizeof(acb)) {
+            error_report("VXHS AIO completion failed: %s",
+                         strerror(errno));
+            abort();
+        }
+    }
+}
+
+static BlockDriver bdrv_vxhs = {
+    .format_name                  = "vxhs",
+    .protocol_name                = "vxhs",
+    .instance_size                = sizeof(BDRVVXHSState),
+    .bdrv_file_open               = vxhs_open,
+    .bdrv_create                  = vxhs_create,
+    .bdrv_close                   = vxhs_close,
+    .bdrv_getlength               = vxhs_getlength,
+    .bdrv_get_allocated_file_size = vxhs_get_allocated_blocks,
+    .bdrv_aio_readv               = vxhs_aio_readv,
+    .bdrv_aio_writev              = vxhs_aio_writev,
+    .bdrv_co_flush_to_disk        = vxhs_co_flush,
+};
+
+void bdrv_vxhs_init(void)
+{
+    trace_vxhs_bdrv_init(vxhs_drv_version);
+    bdrv_register(&bdrv_vxhs);
+}
+
+/*
+ * The line below is how our drivier is initialized.
+ * DO NOT TOUCH IT
+ */
+block_init(bdrv_vxhs_init);
diff --git a/block/vxhs.h b/block/vxhs.h
new file mode 100644
index 0000000..1b678e5
--- /dev/null
+++ b/block/vxhs.h
@@ -0,0 +1,293 @@ 
+/*
+ * QEMU Block driver for Veritas HyperScale (VxHS)
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef VXHSD_H
+#define VXHSD_H
+
+#include <gmodule.h>
+#include <inttypes.h>
+#include <pthread.h>
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "qemu/error-report.h"
+#include "block/block_int.h"
+#include "qemu/uri.h"
+#include "qemu/queue.h"
+
+#define vxhsErr(fmt, ...) { \
+    time_t t = time(0); \
+    char buf[9] = {0}; \
+    strftime(buf, 9, "%H:%M:%S", localtime(&t)); \
+    fprintf(stderr, "[%s: %lu] %d: %s():\t", buf, pthread_self(), \
+                __LINE__, __func__); \
+    fprintf(stderr, fmt, ## __VA_ARGS__); \
+}
+
+#define OF_GUID_STR_LEN             40
+#define OF_GUID_STR_SZ              (OF_GUID_STR_LEN + 1)
+#define QNIO_CONNECT_RETRY_SECS     5
+#define QNIO_CONNECT_TIMOUT_SECS    120
+
+/* constants from io_qnio.h */
+#define IIO_REASON_DONE     0x00000004
+#define IIO_REASON_EVENT    0x00000008
+#define IIO_REASON_HUP      0x00000010
+
+/*
+ * IO specific flags
+ */
+#define IIO_FLAG_ASYNC      0x00000001
+#define IIO_FLAG_DONE       0x00000010
+#define IIO_FLAG_SYNC       0
+
+/* constants from error.h */
+#define VXERROR_RETRY_ON_SOURCE     44
+#define VXERROR_HUP                 901
+#define VXERROR_CHANNEL_HUP         903
+
+/* constants from iomgr.h and opcode.h */
+#define IRP_READ_REQUEST                    0x1FFF
+#define IRP_WRITE_REQUEST                   0x2FFF
+#define IRP_VDISK_CHECK_IO_FAILOVER_READY   2020
+
+/* Lock specific macros */
+#define VXHS_SPIN_LOCK_ALLOC                  \
+    (qemu_ck_initialize_lock())
+#define VXHS_SPIN_LOCK(lock)                  \
+    (qemu_ck_spin_lock(lock))
+#define VXHS_SPIN_UNLOCK(lock)                \
+    (qemu_ck_spin_unlock(lock))
+#define VXHS_SPIN_LOCK_DESTROY(lock)          \
+    (qemu_ck_destroy_lock(lock))
+
+typedef enum {
+    VDISK_AIO_READ,
+    VDISK_AIO_WRITE,
+    VDISK_STAT,
+    VDISK_TRUNC,
+    VDISK_AIO_FLUSH,
+    VDISK_AIO_RECLAIM,
+    VDISK_GET_GEOMETRY,
+    VDISK_CHECK_IO_FAILOVER_READY,
+    VDISK_AIO_LAST_CMD
+} VDISKAIOCmd;
+
+typedef enum {
+    VXHS_IO_INPROGRESS,
+    VXHS_IO_COMPLETED,
+    VXHS_IO_ERROR
+} VXHSIOState;
+
+
+typedef void *qemu_aio_ctx_t;
+typedef void (*qnio_callback_t)(ssize_t retval, void *arg);
+
+#define VDISK_FD_READ 0
+#define VDISK_FD_WRITE 1
+
+#define QNIO_VDISK_NONE        0x00
+#define QNIO_VDISK_CREATE      0x01
+
+/* max IO size supported by QEMU NIO lib */
+#define QNIO_MAX_IO_SIZE       4194304
+
+#define IP_ADDR_LEN             20
+#define OF_MAX_FILE_LEN         1024
+#define OF_MAX_SERVER_ADDR      1024
+#define MAX_HOSTS               4
+
+/*
+ * Opcodes for making IOCTL on QEMU NIO library
+ */
+#define BASE_OPCODE_SHARED     1000
+#define BASE_OPCODE_DAL        2000
+#define IRP_VDISK_STAT                  (BASE_OPCODE_SHARED + 5)
+#define IRP_VDISK_GET_GEOMETRY          (BASE_OPCODE_DAL + 17)
+#define IRP_VDISK_READ_PARTITION        (BASE_OPCODE_DAL + 18)
+#define IRP_VDISK_FLUSH                 (BASE_OPCODE_DAL + 19)
+
+/*
+ * BDRVVXHSState specific flags
+ */
+#define OF_VDISK_FLAGS_STATE_ACTIVE             0x0000000000000001
+#define OF_VDISK_FLAGS_STATE_FAILED             0x0000000000000002
+#define OF_VDISK_FLAGS_IOFAILOVER_IN_PROGRESS   0x0000000000000004
+
+#define OF_VDISK_ACTIVE(s)                                              \
+        ((s)->vdisk_flags & OF_VDISK_FLAGS_STATE_ACTIVE)
+#define OF_VDISK_SET_ACTIVE(s)                                          \
+        ((s)->vdisk_flags |= OF_VDISK_FLAGS_STATE_ACTIVE)
+#define OF_VDISK_RESET_ACTIVE(s)                                        \
+        ((s)->vdisk_flags &= ~OF_VDISK_FLAGS_STATE_ACTIVE)
+
+#define OF_VDISK_FAILED(s)                                              \
+        ((s)->vdisk_flags & OF_VDISK_FLAGS_STATE_FAILED)
+#define OF_VDISK_SET_FAILED(s)                                          \
+        ((s)->vdisk_flags |= OF_VDISK_FLAGS_STATE_FAILED)
+#define OF_VDISK_RESET_FAILED(s)                                        \
+        ((s)->vdisk_flags &= ~OF_VDISK_FLAGS_STATE_FAILED)
+
+#define OF_VDISK_IOFAILOVER_IN_PROGRESS(s)                              \
+        ((s)->vdisk_flags & OF_VDISK_FLAGS_IOFAILOVER_IN_PROGRESS)
+#define OF_VDISK_SET_IOFAILOVER_IN_PROGRESS(s)                          \
+        ((s)->vdisk_flags |= OF_VDISK_FLAGS_IOFAILOVER_IN_PROGRESS)
+#define OF_VDISK_RESET_IOFAILOVER_IN_PROGRESS(s)                        \
+        ((s)->vdisk_flags &= ~OF_VDISK_FLAGS_IOFAILOVER_IN_PROGRESS)
+
+/*
+ * VXHSAIOCB specific flags
+ */
+#define OF_ACB_QUEUED       0x00000001
+
+#define OF_AIOCB_FLAGS_QUEUED(a)            \
+        ((a)->flags & OF_ACB_QUEUED)
+#define OF_AIOCB_FLAGS_SET_QUEUED(a)        \
+        ((a)->flags |= OF_ACB_QUEUED)
+#define OF_AIOCB_FLAGS_RESET_QUEUED(a)      \
+        ((a)->flags &= ~OF_ACB_QUEUED)
+
+typedef struct qemu2qnio_ctx {
+    uint32_t            qnio_flag;
+    uint64_t            qnio_size;
+    char                *qnio_channel;
+    char                *target;
+    qnio_callback_t     qnio_cb;
+} qemu2qnio_ctx_t;
+
+typedef qemu2qnio_ctx_t qnio2qemu_ctx_t;
+
+typedef struct LibQNIOSymbol {
+        const char *name;
+        gpointer *addr;
+} LibQNIOSymbol;
+
+typedef void (*iio_cb_t) (uint32_t rfd, uint32_t reason, void *ctx,
+                          void *reply);
+
+/*
+ * HyperScale AIO callbacks structure
+ */
+typedef struct VXHSAIOCB {
+    BlockAIOCB    common;
+    size_t              ret;
+    size_t              size;
+    QEMUBH              *bh;
+    int                 aio_done;
+    int                 segments;
+    int                 flags;
+    size_t              io_offset;
+    QEMUIOVector        *qiov;
+    void                *buffer;
+    int                 direction;  /* IO direction (r/w) */
+    QSIMPLEQ_ENTRY(VXHSAIOCB) retry_entry;
+} VXHSAIOCB;
+
+typedef struct VXHSvDiskHostsInfo {
+    int     qnio_cfd;        /* Channel FD */
+    int     vdisk_rfd;      /* vDisk remote FD */
+    char    *hostip;        /* Host's IP addresses */
+    int     port;           /* Host's port number */
+} VXHSvDiskHostsInfo;
+
+/*
+ * Structure per vDisk maintained for state
+ */
+typedef struct BDRVVXHSState {
+    int                     fds[2];
+    int64_t                 vdisk_size;
+    int64_t                 vdisk_blocks;
+    int64_t                 vdisk_flags;
+    int                     vdisk_aio_count;
+    int                     event_reader_pos;
+    VXHSAIOCB             *qnio_event_acb;
+    void                    *qnio_ctx;
+    void                    *vdisk_lock; /* Lock to protect BDRVVXHSState */
+    void                    *vdisk_acb_lock;  /* Protects ACB */
+    VXHSvDiskHostsInfo    vdisk_hostinfo[MAX_HOSTS]; /* Per host info */
+    int                     vdisk_nhosts;   /* Total number of hosts */
+    int                     vdisk_cur_host_idx; /* IOs are being shipped to */
+    int                     vdisk_ask_failover_idx; /*asking permsn to ship io*/
+    QSIMPLEQ_HEAD(aio_retryq, VXHSAIOCB) vdisk_aio_retryq;
+    int                     vdisk_aio_retry_qd;
+    char                    *vdisk_guid;
+    AioContext              *aio_context;
+} BDRVVXHSState;
+
+int vxhs_load_iio_ops(void);
+void bdrv_vxhs_init(void);
+void *vxhs_initialize(void);
+void *vxhs_setup_qnio(void);
+int qemu_qnio_fini(BDRVVXHSState *s);
+void vxhs_iio_callback(uint32_t rfd, uint32_t reason, void *ctx, void *m);
+int qemu_qnio_init(BDRVVXHSState *s, const char *vxhs_uri);
+void vxhs_aio_event_reader(void *opaque);
+void vxhs_complete_aio(VXHSAIOCB *acb, BDRVVXHSState *s);
+int vxhs_aio_flush_cb(void *opaque);
+void vxhs_finish_aiocb(ssize_t ret, void *arg);
+unsigned long vxhs_get_vdisk_stat(BDRVVXHSState *s);
+int vxhs_open(BlockDriverState *bs, QDict *options,
+              int bdrv_flags, Error **errp);
+int vxhs_create(const char *filename, QemuOpts *options,
+                Error **errp);
+int vxhs_open_device(BlockdevOptionsVxHS *conf, int *cfd, int *rfd,
+                       BDRVVXHSState *s);
+void vxhs_close(BlockDriverState *bs);
+void vxhs_aio_cancel(BlockAIOCB *blockacb);
+int vxhs_truncate(BlockDriverState *bs, int64_t offset);
+int qemu_submit_io(BDRVVXHSState *s, struct iovec *iov, int64_t niov,
+                   int64_t offset, int cmd, qemu_aio_ctx_t acb);
+BlockAIOCB *vxhs_aio_flush(BlockDriverState *bs,
+                                   BlockCompletionFunc *cb, void *opaque);
+BlockAIOCB *vxhs_aio_pdiscard(BlockDriverState *bs, int64_t sector_num,
+                                     int nb_sectors,
+                                     BlockCompletionFunc *cb,
+                                     void *opaque);
+BlockAIOCB *vxhs_aio_readv(BlockDriverState *bs, int64_t sector_num,
+                                   QEMUIOVector *qiov, int nb_sectors,
+                                   BlockCompletionFunc *cb, void *opaque);
+BlockAIOCB *vxhs_aio_writev(BlockDriverState *bs, int64_t sector_num,
+                                    QEMUIOVector *qiov, int nb_sectors,
+                                    BlockCompletionFunc *cb,
+                                    void *opaque);
+coroutine_fn int vxhs_co_read(BlockDriverState *bs, int64_t sector_num,
+                                uint8_t *buf, int nb_sectors);
+coroutine_fn int vxhs_co_write(BlockDriverState *bs, int64_t sector_num,
+                                 const uint8_t *buf, int nb_sectors);
+int64_t vxhs_get_allocated_blocks(BlockDriverState *bs);
+BlockAIOCB *vxhs_aio_rw(BlockDriverState *bs, int64_t sector_num,
+                                QEMUIOVector *qiov, int nb_sectors,
+                                BlockCompletionFunc *cb,
+                                void *opaque, int write);
+int vxhs_co_flush(BlockDriverState *bs);
+int vxhs_has_zero_init(BlockDriverState *bs);
+int64_t vxhs_getlength(BlockDriverState *bs);
+void vxhs_inc_vdisk_iocount(void *ptr, uint32_t delta);
+void vxhs_dec_vdisk_iocount(void *ptr, uint32_t delta);
+uint32_t vxhs_get_vdisk_iocount(void *ptr);
+void vxhs_inc_acb_segment_count(void *ptr, int count);
+void vxhs_dec_acb_segment_count(void *ptr, int count);
+int vxhs_dec_and_get_acb_segment_count(void *ptr, int count);
+void vxhs_set_acb_buffer(void *ptr, void *buffer);
+int vxhs_sync_rw(BlockDriverState *bs, int64_t sector_num,
+                   const uint8_t *buf, int nb_sectors, int write);
+int vxhs_failover_io(BDRVVXHSState *s);
+int vxhs_reopen_vdisk(BDRVVXHSState *s,
+                        int hostinfo_index);
+int vxhs_switch_storage_agent(BDRVVXHSState *s);
+int vxhs_check_io_failover_ready(BDRVVXHSState *s,
+                                   int hostinfo_index);
+int vxhs_build_io_target_list(BDRVVXHSState *s, char **filenames);
+int vxhs_handle_queued_ios(BDRVVXHSState *s);
+int vxhs_restart_aio(VXHSAIOCB *acb);
+void vxhs_fail_aio(VXHSAIOCB *acb, int err);
+void vxhs_failover_ioctl_cb(int res, void *ctx);
+char *vxhs_string_iterate(char *p, const char *d, const size_t len);
+
+
+#endif
diff --git a/configure b/configure
index 8d84919..fc09dc6 100755
--- a/configure
+++ b/configure
@@ -320,6 +320,7 @@  vhdx=""
 numa=""
 tcmalloc="no"
 jemalloc="no"
+vxhs="yes"
 
 # parse CC options first
 for opt do
@@ -1150,6 +1151,11 @@  for opt do
   ;;
   --enable-jemalloc) jemalloc="yes"
   ;;
+  --disable-vxhs) vxhs="no"
+  ;;
+  --enable-vxhs) vxhs="yes"
+  ;;
+
   *)
       echo "ERROR: unknown option $opt"
       echo "Try '$0 --help' for more information"
@@ -1380,6 +1386,7 @@  disabled with --disable-FEATURE, default is enabled if available:
   numa            libnuma support
   tcmalloc        tcmalloc support
   jemalloc        jemalloc support
+  vxhs            Veritas HyperScale vDisk backend support
 
 NOTE: The object files are built at the place where configure is launched
 EOF
@@ -4543,6 +4550,43 @@  if do_cc -nostdlib -Wl,-r -Wl,--no-relax -o $TMPMO $TMPO; then
 fi
 
 ##########################################
+# Veritas HyperScale block driver VxHS
+# Check if libqnio is installed
+if test "$vxhs" != "no" ; then
+  cat > $TMPC <<EOF
+#include <stdio.h>
+#include <qnio/qnio_api.h>
+
+void vxhs_inc_acb_segment_count(void *acb, int count);
+void vxhs_dec_acb_segment_count(void *acb, int count);
+void vxhs_set_acb_buffer(void *ptr, void *buffer);
+
+void vxhs_inc_acb_segment_count(void *ptr, int count)
+{
+}
+void vxhs_dec_acb_segment_count(void *ptr, int count)
+{
+}
+void vxhs_set_acb_buffer(void *ptr, void *buffer)
+{
+}
+int main(void) {
+    qemu_ck_initialize_lock();
+    return 0;
+}
+EOF
+  vxhs_libs="-lqnioshim -lqnio"
+  if compile_prog "" "$vxhs_libs" ; then
+    vxhs=yes
+  else
+    if test "$vxhs" = "yes" ; then
+      feature_not_found "vxhs block device" "Install libqnio. See github"
+    fi
+    vxhs=no
+  fi
+fi
+
+##########################################
 # End of CC checks
 # After here, no more $cc or $ld runs
 
@@ -5479,6 +5523,12 @@  if test "$pthread_setname_np" = "yes" ; then
   echo "CONFIG_PTHREAD_SETNAME_NP=y" >> $config_host_mak
 fi
 
+if test "$vxhs" = "yes" ; then
+  echo "CONFIG_VXHS=y" >> $config_host_mak
+  echo "VXHS_CFLAGS=$vxhs_cflags" >> $config_host_mak
+  echo "VXHS_LIBS=$vxhs_libs" >> $config_host_mak
+fi
+
 if test "$tcg_interpreter" = "yes"; then
   QEMU_INCLUDES="-I\$(SRC_PATH)/tcg/tci $QEMU_INCLUDES"
 elif test "$ARCH" = "sparc64" ; then
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 5e2d7d7..064d620 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -1693,12 +1693,13 @@ 
 #
 # @host_device, @host_cdrom: Since 2.1
 # @gluster: Since 2.7
+# @vxhs: Since 2.7
 #
 # Since: 2.0
 ##
 { 'enum': 'BlockdevDriver',
   'data': [ 'archipelago', 'blkdebug', 'blkverify', 'bochs', 'cloop',
-            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom',
+            'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom', 'vxhs',
             'host_device', 'http', 'https', 'luks', 'null-aio', 'null-co',
             'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw', 'tftp',
             'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] }
@@ -2150,6 +2151,22 @@ 
             'server': ['GlusterServer'],
             '*debug-level': 'int' } }
 
+# @BlockdevOptionsVxHS
+#
+# Driver specific block device options for VxHS
+#
+# @vdisk_id:    UUID of VxHS volume
+#
+# @server:      list of vxhs server IP, port
+#
+# Since: 2.7
+##
+{ 'struct': 'BlockdevOptionsVxHS',
+  'data': { 'vdisk_id': 'str',
+            'server': ['InetSocketAddress'] } }
+
+##
+
 ##
 # @BlockdevOptions
 #
@@ -2222,7 +2239,8 @@ 
       'vhdx':       'BlockdevOptionsGenericFormat',
       'vmdk':       'BlockdevOptionsGenericCOWFormat',
       'vpc':        'BlockdevOptionsGenericFormat',
-      'vvfat':      'BlockdevOptionsVVFAT'
+      'vvfat':      'BlockdevOptionsVVFAT',
+      'vxhs':       'BlockdevOptionsVxHS'
   } }
 
 ##