Patchwork [v9,4/4] block: Support GlusterFS as a QEMU block backend.

login
register
mail settings
Submitter Bharata B Rao
Date Sept. 24, 2012, 9:13 a.m.
Message ID <20120924091340.GN18470@in.ibm.com>
Download mbox | patch
Permalink /patch/186334/
State New
Headers show

Comments

Bharata B Rao - Sept. 24, 2012, 9:13 a.m.
block: Support GlusterFS as a QEMU block backend.

From: Bharata B Rao <bharata@linux.vnet.ibm.com>

This patch adds gluster as the new block backend in QEMU. This gives
QEMU the ability to boot VM images from gluster volumes. Its already
possible to boot from VM images on gluster volumes using FUSE mount, but
this patchset provides the ability to boot VM images from gluster volumes
by by-passing the FUSE layer in gluster. This is made possible by
using libgfapi routines to perform IO on gluster volumes directly.

VM Image on gluster volume is specified like this:

file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]

'gluster' is the protocol.

'transport' specifies the transport type used to connect to gluster
management daemon (glusterd). Valid transport types are
tcp, unix and rdma. If a transport type isn't specified, then tcp
type is assumed.

'server' specifies the server where the volume file specification for
the given volume resides. This can be either hostname, ipv4 address
or ipv6 address. ipv6 address needs to be within square brackets [ ].
If transport type is 'unix', then server field is ignored, but the
'socket' field needs to be populated with the path to unix domain
socket.

'port' is the port number on which glusterd is listening. This is optional
and if not specified, QEMU will send 0 which will make gluster to use the
default port. port is ignored for unix type of transport.

'volname' is the name of the gluster volume which contains the VM image.

'image' is the path to the actual VM image that resides on gluster volume.

Examples:

file=gluster://1.2.3.4/testvol/a.img
file=gluster+tcp://1.2.3.4/testvol/a.img
file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
file=gluster+rdma://1.2.3.4:24007/testvol/a.img

Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
---

 block/Makefile.objs |    1 
 block/gluster.c     |  642 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 643 insertions(+), 0 deletions(-)
 create mode 100644 block/gluster.c
Paolo Bonzini - Sept. 24, 2012, 9:26 a.m.
> +static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
> +{
> +    URI *uri;
> +    QueryParams *qp = NULL;
> +    bool is_unix = false;
> +    int ret = 0;
> +    char *unescape_str = NULL;
> +
> +    uri = uri_parse(filename);
> +    if (!uri) {
> +        return -EINVAL;
> +    }
> +
> +    /* transport */
> +    if (!strcmp(uri->scheme, "gluster")) {
> +        gconf->transport = g_strdup("tcp");
> +    } else if (!strcmp(uri->scheme, "gluster+tcp")) {
> +        gconf->transport = g_strdup("tcp");
> +    } else if (!strcmp(uri->scheme, "gluster+unix")) {
> +        gconf->transport = g_strdup("unix");
> +        is_unix = true;
> +    } else if (!strcmp(uri->scheme, "gluster+rdma")) {
> +        gconf->transport = g_strdup("rdma");
> +    } else {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +
> +    ret = parse_volume_options(gconf, uri->path);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    if (uri->query) {
> +        unescape_str = uri_string_unescape(uri->query, -1, NULL);
> +        if (!unescape_str) {
> +            ret = -EINVAL;
> +            goto out;
> +        }
> +    }
> +
> +    qp = query_params_parse(unescape_str);

query_params_parse already does the unescaping.

Paolo
Bharata B Rao - Sept. 24, 2012, 9:44 a.m.
On Mon, Sep 24, 2012 at 05:26:53AM -0400, Paolo Bonzini wrote:
> > +    qp = query_params_parse(unescape_str);
> 
> query_params_parse already does the unescaping.

Hmm it failed to parse the options properly when I had an escape sequence,
hence resorted to unescaping the query string manually.

Look at the below gdb debug steps of your uri.c...

2302	    test("gluster+unix:///b?c=d%26e=f");
(gdb) s
test (x=0x4062c4 "gluster+unix:///b?c=d%26e=f") at uri.c:2279
2279	    URI *uri = uri_parse(x);
(gdb) n
2283	    if (!uri) {
(gdb) p *uri
$1 = {scheme = 0x607070 "gluster+unix", opaque = 0x0, authority = 0x0, 
  server = 0x0, user = 0x0, port = 0, path = 0x607090 "/b", fragment = 0x0, 
  cleanup = 0, query = 0x6070b0 "c=d%26e=f"}
(gdb) n
2289	    qp = query_params_parse(uri->query);
(gdb) p *qp
$2 = {n = 1, alloc = 1, p = 0x6070f0}

You can see that qp->n is still 1, but 2 was expected.

Regards,
Bharata.
Kevin Wolf - Sept. 26, 2012, 10 a.m.
Am 24.09.2012 11:13, schrieb Bharata B Rao:
> block: Support GlusterFS as a QEMU block backend.
> 
> From: Bharata B Rao <bharata@linux.vnet.ibm.com>
> 
> This patch adds gluster as the new block backend in QEMU. This gives
> QEMU the ability to boot VM images from gluster volumes. Its already
> possible to boot from VM images on gluster volumes using FUSE mount, but
> this patchset provides the ability to boot VM images from gluster volumes
> by by-passing the FUSE layer in gluster. This is made possible by
> using libgfapi routines to perform IO on gluster volumes directly.
> 
> VM Image on gluster volume is specified like this:
> 
> file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
> 
> 'gluster' is the protocol.
> 
> 'transport' specifies the transport type used to connect to gluster
> management daemon (glusterd). Valid transport types are
> tcp, unix and rdma. If a transport type isn't specified, then tcp
> type is assumed.
> 
> 'server' specifies the server where the volume file specification for
> the given volume resides. This can be either hostname, ipv4 address
> or ipv6 address. ipv6 address needs to be within square brackets [ ].
> If transport type is 'unix', then server field is ignored, but the
> 'socket' field needs to be populated with the path to unix domain
> socket.
> 
> 'port' is the port number on which glusterd is listening. This is optional
> and if not specified, QEMU will send 0 which will make gluster to use the
> default port. port is ignored for unix type of transport.
> 
> 'volname' is the name of the gluster volume which contains the VM image.
> 
> 'image' is the path to the actual VM image that resides on gluster volume.
> 
> Examples:
> 
> file=gluster://1.2.3.4/testvol/a.img
> file=gluster+tcp://1.2.3.4/testvol/a.img
> file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
> file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
> file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
> file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
> file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
> file=gluster+rdma://1.2.3.4:24007/testvol/a.img
> 
> Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
> ---
> 
>  block/Makefile.objs |    1 
>  block/gluster.c     |  642 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 643 insertions(+), 0 deletions(-)
>  create mode 100644 block/gluster.c
> 
> 
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index b5754d3..a1ae67f 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -9,3 +9,4 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
>  block-obj-$(CONFIG_LIBISCSI) += iscsi.o
>  block-obj-$(CONFIG_CURL) += curl.o
>  block-obj-$(CONFIG_RBD) += rbd.o
> +block-obj-$(CONFIG_GLUSTERFS) += gluster.o
> diff --git a/block/gluster.c b/block/gluster.c
> new file mode 100644
> index 0000000..a2f8303
> --- /dev/null
> +++ b/block/gluster.c
> @@ -0,0 +1,642 @@
> +/*
> + * GlusterFS backend for QEMU
> + *
> + * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
> + *
> + * Pipe handling mechanism in AIO implementation is derived from
> + * block/rbd.c. Hence,
> + *
> + * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
> + *                         Josh Durgin <josh.durgin@dreamhost.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + * Contributions after 2012-01-13 are licensed under the terms of the
> + * GNU GPL, version 2 or (at your option) any later version.
> + */
> +#include <glusterfs/api/glfs.h>
> +#include "block_int.h"
> +#include "qemu_socket.h"
> +#include "uri.h"
> +
> +typedef struct GlusterAIOCB {
> +    BlockDriverAIOCB common;
> +    int64_t size;
> +    int ret;
> +    bool *finished;
> +    QEMUBH *bh;
> +} GlusterAIOCB;
> +
> +typedef struct BDRVGlusterState {
> +    struct glfs *glfs;
> +    int fds[2];
> +    struct glfs_fd *fd;
> +    int qemu_aio_count;
> +    int event_reader_pos;
> +    GlusterAIOCB *event_acb;
> +} BDRVGlusterState;
> +
> +#define GLUSTER_FD_READ  0
> +#define GLUSTER_FD_WRITE 1
> +
> +typedef struct GlusterConf {
> +    char *server;
> +    int port;
> +    char *volname;
> +    char *image;
> +    char *transport;
> +} GlusterConf;
> +
> +static void qemu_gluster_gconf_free(GlusterConf *gconf)
> +{
> +    g_free(gconf->server);
> +    g_free(gconf->volname);
> +    g_free(gconf->image);
> +    g_free(gconf->transport);
> +    g_free(gconf);
> +}
> +
> +static int parse_volume_options(GlusterConf *gconf, char *path)
> +{
> +    char *token, *saveptr;
> +
> +    /* volname */
> +    token = strtok_r(path, "/", &saveptr);
> +    if (!token) {
> +        return -EINVAL;
> +    }
> +    gconf->volname = g_strdup(token);
> +
> +    /* image */
> +    token = strtok_r(NULL, "?", &saveptr);

If I understand uri.c right, there is no ? in the path, so there's no
reason to call strtok. You could just use the rest of the string.

> +    if (!token) {
> +        return -EINVAL;
> +    }
> +    gconf->image = g_strdup(token);
> +    return 0;
> +}
> +
> +/*
> + * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
> + *
> + * 'gluster' is the protocol.
> + *
> + * 'transport' specifies the transport type used to connect to gluster
> + * management daemon (glusterd). Valid transport types are
> + * tcp, unix and rdma. If a transport type isn't specified, then tcp
> + * type is assumed.
> + *
> + * 'server' specifies the server where the volume file specification for
> + * the given volume resides. This can be either hostname, ipv4 address
> + * or ipv6 address. ipv6 address needs to be within square brackets [ ].
> + * If transport type is 'unix', then server field is ignored, but the
> + * 'socket' field needs to be populated with the path to unix domain
> + * socket.
> + *
> + * 'port' is the port number on which glusterd is listening. This is optional
> + * and if not specified, QEMU will send 0 which will make gluster to use the
> + * default port. port is ignored for unix type of transport.
> + *
> + * 'volname' is the name of the gluster volume which contains the VM image.
> + *
> + * 'image' is the path to the actual VM image that resides on gluster volume.
> + *
> + * Examples:
> + *
> + * file=gluster://1.2.3.4/testvol/a.img
> + * file=gluster+tcp://1.2.3.4/testvol/a.img
> + * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
> + * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
> + * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
> + * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
> + * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
> + * file=gluster+rdma://1.2.3.4:24007/testvol/a.img
> + */
> +static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
> +{
> +    URI *uri;
> +    QueryParams *qp = NULL;
> +    bool is_unix = false;
> +    int ret = 0;
> +    char *unescape_str = NULL;
> +
> +    uri = uri_parse(filename);
> +    if (!uri) {
> +        return -EINVAL;
> +    }
> +
> +    /* transport */
> +    if (!strcmp(uri->scheme, "gluster")) {
> +        gconf->transport = g_strdup("tcp");
> +    } else if (!strcmp(uri->scheme, "gluster+tcp")) {
> +        gconf->transport = g_strdup("tcp");
> +    } else if (!strcmp(uri->scheme, "gluster+unix")) {
> +        gconf->transport = g_strdup("unix");
> +        is_unix = true;
> +    } else if (!strcmp(uri->scheme, "gluster+rdma")) {
> +        gconf->transport = g_strdup("rdma");
> +    } else {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +
> +    ret = parse_volume_options(gconf, uri->path);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    if (uri->query) {
> +        unescape_str = uri_string_unescape(uri->query, -1, NULL);
> +        if (!unescape_str) {
> +            ret = -EINVAL;
> +            goto out;
> +        }
> +    }

I agree with Paolo here, this need to go away.

The example that you posted ("gluster+unix:///b?c=d%26e=f") has indeed
only one argument with name 'c' and value 'd&e=f'. If you unescape here,
you would incorrectly require double escaping.

> +
> +    qp = query_params_parse(unescape_str);
> +    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +
> +    if (is_unix) {
> +        if (strcmp(qp->p[0].name, "socket")) {
> +            ret = -EINVAL;
> +            goto out;
> +        }
> +        gconf->server = g_strdup(qp->p[0].value);

Maybe add a check that uri->server is empty?

> +    } else {
> +        gconf->server = g_strdup(uri->server);
> +        gconf->port = uri->port;
> +    }
> +
> +out:
> +    if (qp) {
> +        query_params_free(qp);
> +    }
> +    g_free(unescape_str);
> +    uri_free(uri);
> +    return ret;
> +}
> +
> +static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
> +{
> +    struct glfs *glfs = NULL;
> +    int ret;
> +
> +    ret = qemu_gluster_parseuri(gconf, filename);
> +    if (ret < 0) {
> +        error_report("Usage: file=gluster[+transport]://[server[:port]]/"
> +            "volname/image[?socket=...]");
> +        errno = -ret;
> +        goto out;
> +    }
> +
> +    glfs = glfs_new(gconf->volname);
> +    if (!glfs) {
> +        goto out;
> +    }
> +
> +    ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
> +            gconf->port);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    /*
> +     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
> +     * GlusterFS makes GF_LOG_* macros available to libgfapi users.
> +     */
> +    ret = glfs_set_logging(glfs, "-", 4);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    ret = glfs_init(glfs);
> +    if (ret) {
> +        error_report("Gluster connection failed for server=%s port=%d "
> +             "volume=%s image=%s transport=%s\n", gconf->server, gconf->port,
> +             gconf->volname, gconf->image, gconf->transport);
> +        goto out;
> +    }
> +    return glfs;
> +
> +out:
> +    if (glfs) {
> +        glfs_fini(glfs);

Does this corrupt errno?

> +    }
> +    return NULL;
> +}
> +
> +static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s)
> +{
> +    int ret;
> +    bool *finished = acb->finished;
> +    BlockDriverCompletionFunc *cb = acb->common.cb;
> +    void *opaque = acb->common.opaque;
> +
> +    if (!acb->ret || acb->ret == acb->size) {
> +        ret = 0; /* Success */
> +    } else if (acb->ret < 0) {
> +        ret = acb->ret; /* Read/Write failed */
> +    } else {
> +        ret = -EIO; /* Partial read/write - fail it */
> +    }
> +
> +    s->qemu_aio_count--;
> +    qemu_aio_release(acb);
> +    cb(opaque, ret);
> +    if (finished) {
> +        *finished = true;
> +    }
> +}
> +
> +static void qemu_gluster_aio_event_reader(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +    ssize_t ret;
> +
> +    do {
> +        char *p = (char *)&s->event_acb;
> +
> +        ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos,
> +                   sizeof(s->event_acb) - s->event_reader_pos);
> +        if (ret > 0) {
> +            s->event_reader_pos += ret;
> +            if (s->event_reader_pos == sizeof(s->event_acb)) {
> +                s->event_reader_pos = 0;
> +                qemu_gluster_complete_aio(s->event_acb, s);
> +            }
> +        }
> +    } while (ret < 0 && errno == EINTR);
> +}
> +
> +static int qemu_gluster_aio_flush_cb(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0);
> +}
> +
> +static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
> +    int bdrv_flags)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    int open_flags = 0;
> +    int ret = 0;
> +    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
> +
> +    s->glfs = qemu_gluster_init(gconf, filename);
> +    if (!s->glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    open_flags |=  O_BINARY;
> +    open_flags &= ~O_ACCMODE;

open_flags == O_BINARY here, so no O_ACCMODE bits to clear.

> +    if (bdrv_flags & BDRV_O_RDWR) {
> +        open_flags |= O_RDWR;
> +    } else {
> +        open_flags |= O_RDONLY;
> +    }
> +
> +    if ((bdrv_flags & BDRV_O_NOCACHE)) {
> +        open_flags |= O_DIRECT;
> +    }
> +
> +    s->fd = glfs_open(s->glfs, gconf->image, open_flags);
> +    if (!s->fd) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        ret = -errno;
> +        goto out;
> +    }
> +    fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
> +        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
> +
> +out:
> +    qemu_gluster_gconf_free(gconf);
> +    if (!ret) {
> +        return ret;
> +    }
> +    if (s->fd) {
> +        glfs_close(s->fd);
> +    }
> +    if (s->glfs) {
> +        glfs_fini(s->glfs);
> +    }
> +    return ret;
> +}
> +
> +static int qemu_gluster_create(const char *filename,
> +        QEMUOptionParameter *options)
> +{
> +    struct glfs *glfs;
> +    struct glfs_fd *fd;
> +    int ret = 0;
> +    int64_t total_size = 0;
> +    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
> +
> +    glfs = qemu_gluster_init(gconf, filename);
> +    if (!glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            total_size = options->value.n / BDRV_SECTOR_SIZE;
> +        }
> +        options++;
> +    }
> +
> +    fd = glfs_creat(glfs, gconf->image,
> +        O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
> +    if (!fd) {
> +        ret = -errno;
> +    } else {
> +        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
> +            ret = -errno;
> +        }
> +        if (glfs_close(fd) != 0) {
> +            ret = -errno;
> +        }
> +    }
> +out:
> +    qemu_gluster_gconf_free(gconf);
> +    if (glfs) {
> +        glfs_fini(glfs);
> +    }
> +    return ret;
> +}
> +
> +static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
> +    bool finished = false;
> +
> +    acb->finished = &finished;
> +    while (!finished) {
> +        qemu_aio_wait();
> +    }
> +}
> +
> +static AIOPool gluster_aio_pool = {
> +    .aiocb_size = sizeof(GlusterAIOCB),
> +    .cancel = qemu_gluster_aio_cancel,
> +};
> +
> +static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
> +{
> +    int ret = 0;
> +
> +    while (1) {
> +        int fd = s->fds[GLUSTER_FD_WRITE];
> +
> +        ret = write(fd, (void *)&acb, sizeof(acb));
> +        if (ret >= 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }

Variatio delectat? ;-)

How about just do { ... } while (errno == EINTR || errno == EAGAIN); ?

> +    }
> +    return ret;
> +}

Kevin
Paolo Bonzini - Sept. 26, 2012, 10:08 a.m.
Il 26/09/2012 12:00, Kevin Wolf ha scritto:
>> > +
>> > +        ret = write(fd, (void *)&acb, sizeof(acb));
>> > +        if (ret >= 0) {
>> > +            break;
>> > +        }
>> > +        if (errno == EINTR) {
>> > +            continue;
>> > +        }
>> > +        if (errno != EAGAIN) {
>> > +            break;
>> > +        }
> Variatio delectat? ;-)
> 
> How about just do { ... } while (errno == EINTR || errno == EAGAIN); ?

That should be

while ((ret < 0) && (errno == EINTR || errno == EAGAIN));

However, fd here is blocking, so you can just use qemu_write_full.

Paolo
Bharata B Rao - Sept. 26, 2012, 4:11 p.m.
On Wed, Sep 26, 2012 at 12:00:47PM +0200, Kevin Wolf wrote:
> Am 24.09.2012 11:13, schrieb Bharata B Rao:
> > +static int parse_volume_options(GlusterConf *gconf, char *path)
> > +{
> > +    char *token, *saveptr;
> > +
> > +    /* volname */
> > +    token = strtok_r(path, "/", &saveptr);
> > +    if (!token) {
> > +        return -EINVAL;
> > +    }
> > +    gconf->volname = g_strdup(token);
> > +
> > +    /* image */
> > +    token = strtok_r(NULL, "?", &saveptr);
> 
> If I understand uri.c right, there is no ? in the path, so there's no
> reason to call strtok. You could just use the rest of the string.

As you note, I don't need 2nd strtok strictly since the rest of the string
is available in saveptr. But I thought using saveptr is not ideal or preferred.
I wanted to use the most appropriate/safe delimiter to extract the image string
in the 2nd strtok and decided to use '?'.

If you think using saveptr is fine, then I could use that as below...

    /* image */
    if (!*saveptr) {
        return -EINVAL;
    }
    gconf->image = g_strdup(saveptr);

> 
> > +    if (!token) {
> > +        return -EINVAL;
> > +    }
> > +    gconf->image = g_strdup(token);
> > +    return 0;
> > +}
> > +
> > +
> > +    if (uri->query) {
> > +        unescape_str = uri_string_unescape(uri->query, -1, NULL);
> > +        if (!unescape_str) {
> > +            ret = -EINVAL;
> > +            goto out;
> > +        }
> > +    }
> 
> I agree with Paolo here, this need to go away.

Ok will do that.

> > +
> > +    if (is_unix) {
> > +        if (strcmp(qp->p[0].name, "socket")) {
> > +            ret = -EINVAL;
> > +            goto out;
> > +        }
> > +        gconf->server = g_strdup(qp->p[0].value);
> 
> Maybe add a check that uri->server is empty?

I am saying that we will ignore the server and port if
transport type is unix. But I guess I will add this check and change
the comments and patch description accordingly.

> 
> > +    } else {
> > +        gconf->server = g_strdup(uri->server);
> > +        gconf->port = uri->port;
> > +    }
> > +
> > +out:
> > +    if (qp) {
> > +        query_params_free(qp);
> > +    }
> > +    g_free(unescape_str);
> > +    uri_free(uri);
> > +    return ret;
> > +}
> > +
> > +static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
> > +{
> > +    struct glfs *glfs = NULL;
> > +    int ret;
> > +
> > +    ret = qemu_gluster_parseuri(gconf, filename);
> > +    if (ret < 0) {
> > +        error_report("Usage: file=gluster[+transport]://[server[:port]]/"
> > +            "volname/image[?socket=...]");
> > +        errno = -ret;
> > +        goto out;
> > +    }
> > +
> > +    glfs = glfs_new(gconf->volname);
> > +    if (!glfs) {
> > +        goto out;
> > +    }
> > +
> > +    ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
> > +            gconf->port);
> > +    if (ret < 0) {
> > +        goto out;
> > +    }
> > +
> > +    /*
> > +     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
> > +     * GlusterFS makes GF_LOG_* macros available to libgfapi users.
> > +     */
> > +    ret = glfs_set_logging(glfs, "-", 4);
> > +    if (ret < 0) {
> > +        goto out;
> > +    }
> > +
> > +    ret = glfs_init(glfs);
> > +    if (ret) {
> > +        error_report("Gluster connection failed for server=%s port=%d "
> > +             "volume=%s image=%s transport=%s\n", gconf->server, gconf->port,
> > +             gconf->volname, gconf->image, gconf->transport);
> > +        goto out;
> > +    }
> > +    return glfs;
> > +
> > +out:
> > +    if (glfs) {
> > +        glfs_fini(glfs);
> 
> Does this corrupt errno?

Currently glfs_fini() isn't implemented and it returns -1. I guess it could
modify errno when its implemented. At one point of time, I had a logic to save
the errno value from previous calls and restore it to errno if glfs_fini()
fails, but that looked ugly since I had to save errno values from
4 previous calls. Should I just save the errno from glfs_init() since
that does most of the validation, connection establishment etc and is more
likely to fail ?

> > +static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
> > +    int bdrv_flags)
> > +{
> > +    BDRVGlusterState *s = bs->opaque;
> > +    int open_flags = 0;
> > +    int ret = 0;
> > +    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
> > +
> > +    s->glfs = qemu_gluster_init(gconf, filename);
> > +    if (!s->glfs) {
> > +        ret = -errno;
> > +        goto out;
> > +    }
> > +
> > +    open_flags |=  O_BINARY;
> > +    open_flags &= ~O_ACCMODE;
> 
> open_flags == O_BINARY here, so no O_ACCMODE bits to clear.

Right, will fix.

> 
> > +static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
> > +{
> > +    int ret = 0;
> > +
> > +    while (1) {
> > +        int fd = s->fds[GLUSTER_FD_WRITE];
> > +
> > +        ret = write(fd, (void *)&acb, sizeof(acb));
> > +        if (ret >= 0) {
> > +            break;
> > +        }
> > +        if (errno == EINTR) {
> > +            continue;
> > +        }
> > +        if (errno != EAGAIN) {
> > +            break;
> > +        }
> 
> Variatio delectat? ;-)
> 
> How about just do { ... } while (errno == EINTR || errno == EAGAIN); ?

I will go with qemu_write_full(). With that I could get rid of
qemu_gluster_send_pipe() totally.

Regards,
Bharata.
Paolo Bonzini - Sept. 26, 2012, 4:38 p.m.
Il 26/09/2012 18:11, Bharata B Rao ha scritto:
>>> +static int parse_volume_options(GlusterConf *gconf, char *path)
>>> > > +{
>>> > > +    char *token, *saveptr;
>>> > > +
>>> > > +    /* volname */
>>> > > +    token = strtok_r(path, "/", &saveptr);
>>> > > +    if (!token) {
>>> > > +        return -EINVAL;
>>> > > +    }
>>> > > +    gconf->volname = g_strdup(token);
>>> > > +
>>> > > +    /* image */
>>> > > +    token = strtok_r(NULL, "?", &saveptr);
>> > 
>> > If I understand uri.c right, there is no ? in the path, so there's no
>> > reason to call strtok. You could just use the rest of the string.

Actually there could be a %3F which uri.c would unescape to ? (only the
query part is left escaped), so your usage of strtok_r is incorrect.

> As you note, I don't need 2nd strtok strictly since the rest of the string
> is available in saveptr. But I thought using saveptr is not ideal or preferred.
> I wanted to use the most appropriate/safe delimiter to extract the image string
> in the 2nd strtok and decided to use '?'.

I don't think it is defined what saveptr points to.
http://publib.boulder.ibm.com/infocenter/pseries/v5r3/index.jsp?topic=/com.ibm.aix.basetechref/doc/basetrf2/strtok_r.htm
says "the strtok_r subroutine also updates the Pointer parameter with
the starting address of the token following the first occurrence of the
Separators parameter".  I read this as:

    *saveptr = token + strlen(token) + 1;

which is consistent with this strtok example from the C standard:

    #include <string.h>
    static char str[] = "?a???b,";
    char *t;

    t = strtok(str, "?");  // t points to the token "a"
    t = strtok(str, ",");  // t points to the token "??b"

Have you tested this code with multiple consecutive slashes?

> If you think using saveptr is fine, then I could use that as below...
> 
>     /* image */
>     if (!*saveptr) {
>         return -EINVAL;
>     }
>     gconf->image = g_strdup(saveptr);
> 

I would avoid strtok_r completely:

    char *p = path + strcpsn(path, "/");
    if (*p == '\0') {
        return -EINVAL;
    }
    gconf->volname = g_strndup(path, p - path);

    p += strspn(p, "/");
    if (*p == '\0') {
        return -EINVAL;
    }
    gconf->image = g_strdup(p);

Paolo
Bharata B Rao - Sept. 27, 2012, 6:41 a.m.
On Wed, Sep 26, 2012 at 06:38:02PM +0200, Paolo Bonzini wrote:
> Il 26/09/2012 18:11, Bharata B Rao ha scritto:
> >>> +static int parse_volume_options(GlusterConf *gconf, char *path)
> >>> > > +{
> >>> > > +    char *token, *saveptr;
> >>> > > +
> >>> > > +    /* volname */
> >>> > > +    token = strtok_r(path, "/", &saveptr);
> >>> > > +    if (!token) {
> >>> > > +        return -EINVAL;
> >>> > > +    }
> >>> > > +    gconf->volname = g_strdup(token);
> >>> > > +
> >>> > > +    /* image */
> >>> > > +    token = strtok_r(NULL, "?", &saveptr);
> >> > 
> >> > If I understand uri.c right, there is no ? in the path, so there's no
> >> > reason to call strtok. You could just use the rest of the string.
> 
> Actually there could be a %3F which uri.c would unescape to ? (only the
> query part is left escaped), so your usage of strtok_r is incorrect.

Ok the approch of using 2 strtok as above would fail for URI's like this:

gluster+unix://server/volname/weird%3Fimage?socket=/path/to/socket

> 
> > As you note, I don't need 2nd strtok strictly since the rest of the string
> > is available in saveptr. But I thought using saveptr is not ideal or preferred.
> > I wanted to use the most appropriate/safe delimiter to extract the image string
> > in the 2nd strtok and decided to use '?'.
> 
> I don't think it is defined what saveptr points to.
> http://publib.boulder.ibm.com/infocenter/pseries/v5r3/index.jsp?topic=/com.ibm.aix.basetechref/doc/basetrf2/strtok_r.htm
> says "the strtok_r subroutine also updates the Pointer parameter with
> the starting address of the token following the first occurrence of the
> Separators parameter".  I read this as:
> 
>     *saveptr = token + strlen(token) + 1;
> 
> which is consistent with this strtok example from the C standard:
> 
>     #include <string.h>
>     static char str[] = "?a???b,";
>     char *t;
> 
>     t = strtok(str, "?");  // t points to the token "a"
>     t = strtok(str, ",");  // t points to the token "??b"
> 
> Have you tested this code with multiple consecutive slashes?

Yes, both 2-strtok and strtok+saveptr approach work correctly (= as per
my expectation!) with multiple consecutive slashes. I do understand that
2-strtok approach will not work when we have %3F in the path component of
the URI.

For URIs like gluster://server/volname/path/to/image, both the approaches
extract image as "path/to/image".

For URIs like gluster://server/volname//path/to/image, both the approaches
extract image as "/path/to/image".

For gluster://server/volname////path/to/image, the image is extracted as
"//path/to/image".

> 
> > If you think using saveptr is fine, then I could use that as below...
> > 
> >     /* image */
> >     if (!*saveptr) {
> >         return -EINVAL;
> >     }
> >     gconf->image = g_strdup(saveptr);
> > 
> 
> I would avoid strtok_r completely:
> 
>     char *p = path + strcpsn(path, "/");
>     if (*p == '\0') {
>         return -EINVAL;
>     }
>     gconf->volname = g_strndup(path, p - path);
> 
>     p += strspn(p, "/");
>     if (*p == '\0') {
>         return -EINVAL;
>     }
>     gconf->image = g_strdup(p);

This isn't working because for a URI like
gluster://server/volname/path/to/image, uri_parse() will give
"/volname/path/to/image" in uri->path. I would have expected to see
uri->path as "volname/path/to/image" (without 1st slash).

Note that gluster is currently able to resolve image paths that don't
have a preceding slash (like dir/a.img). But I guess we should support
specifying complete image paths too (like /dir/a.img)

Regards,
Bharata.
Paolo Bonzini - Sept. 27, 2012, 7:19 a.m.
Il 27/09/2012 08:41, Bharata B Rao ha scritto:
>>> As you note, I don't need 2nd strtok strictly since the rest of the string
>>> is available in saveptr. But I thought using saveptr is not ideal or preferred.
>>> I wanted to use the most appropriate/safe delimiter to extract the image string
>>> in the 2nd strtok and decided to use '?'.
>>
>> I don't think it is defined what saveptr points to.
>> http://publib.boulder.ibm.com/infocenter/pseries/v5r3/index.jsp?topic=/com.ibm.aix.basetechref/doc/basetrf2/strtok_r.htm
>> says "the strtok_r subroutine also updates the Pointer parameter with
>> the starting address of the token following the first occurrence of the
>> Separators parameter".  I read this as:
>>
>>     *saveptr = token + strlen(token) + 1;
>>
>> which is consistent with this strtok example from the C standard:
>>
>>     #include <string.h>
>>     static char str[] = "?a???b,";
>>     char *t;
>>
>>     t = strtok(str, "?");  // t points to the token "a"
>>     t = strtok(str, ",");  // t points to the token "??b"
>>
>> Have you tested this code with multiple consecutive slashes?
> 
> Yes, both 2-strtok and strtok+saveptr approach work correctly (= as per
> my expectation!) with multiple consecutive slashes. I do understand that
> 2-strtok approach will not work when we have %3F in the path component of
> the URI.
> 
> For URIs like gluster://server/volname/path/to/image, both the approaches
> extract image as "path/to/image".
> 
> For URIs like gluster://server/volname//path/to/image, both the approaches
> extract image as "/path/to/image".
> 
> For gluster://server/volname////path/to/image, the image is extracted as
> "//path/to/image".

Should there be three /'s here?  I assume it's just a typo.

I'm concerned that there is no documentation of what saveptr actually
points to.  In many cases the strtok specification doesn't leave much
free room, but in the case you're testing here:

>>>     /* image */
>>>     if (!*saveptr) {
>>>         return -EINVAL;
>>>     }

strtok_r may just as well leave saveptr = NULL for example.

>>>     gconf->image = g_strdup(saveptr);
>>>
>>
>> I would avoid strtok_r completely:
>>
>>     char *p = path + strcpsn(path, "/");
>>     if (*p == '\0') {
>>         return -EINVAL;
>>     }
>>     gconf->volname = g_strndup(path, p - path);
>>
>>     p += strspn(p, "/");
>>     if (*p == '\0') {
>>         return -EINVAL;
>>     }
>>     gconf->image = g_strdup(p);
> 
> This isn't working because for a URI like
> gluster://server/volname/path/to/image, uri_parse() will give
> "/volname/path/to/image" in uri->path. I would have expected to see
> uri->path as "volname/path/to/image" (without 1st slash).

Ok, that's easy enough to fix with an extra strspn,

    char *p = path + strpsn(path, "/");
    p += strcspn(p, "/");


> Note that gluster is currently able to resolve image paths that don't
> have a preceding slash (like dir/a.img). But I guess we should support
> specifying complete image paths too (like /dir/a.img)

How would the URIs look like?

Paolo
Bharata B Rao - Sept. 27, 2012, 8:28 a.m.
On Thu, Sep 27, 2012 at 09:19:34AM +0200, Paolo Bonzini wrote:
> > 
> > For gluster://server/volname////path/to/image, the image is extracted as
> > "//path/to/image".
> 
> Should there be three /'s here?  I assume it's just a typo.

Yes it was a typo.

> 
> I'm concerned that there is no documentation of what saveptr actually
> points to.  In many cases the strtok specification doesn't leave much
> free room, but in the case you're testing here:
> 
> >>>     /* image */
> >>>     if (!*saveptr) {
> >>>         return -EINVAL;
> >>>     }
> 
> strtok_r may just as well leave saveptr = NULL for example.

Haven't seen that, but yes can't depend on that I suppose.

> 
> >>>     gconf->image = g_strdup(saveptr);
> >>>
> >>
> >> I would avoid strtok_r completely:
> >>
> >>     char *p = path + strcpsn(path, "/");
> >>     if (*p == '\0') {
> >>         return -EINVAL;
> >>     }
> >>     gconf->volname = g_strndup(path, p - path);
> >>
> >>     p += strspn(p, "/");
> >>     if (*p == '\0') {
> >>         return -EINVAL;
> >>     }
> >>     gconf->image = g_strdup(p);
> > 
> > This isn't working because for a URI like
> > gluster://server/volname/path/to/image, uri_parse() will give
> > "/volname/path/to/image" in uri->path. I would have expected to see
> > uri->path as "volname/path/to/image" (without 1st slash).
> 
> Ok, that's easy enough to fix with an extra strspn,
> 
>     char *p = path + strpsn(path, "/");
>     p += strcspn(p, "/");

Ok, this is how its looking finally...

    char *p, *q;
    p = q = path + strspn(path, "/");
    p += strcspn(p, "/");
    if (*p == '\0') {
        return -EINVAL;
    }
    gconf->volname = g_strndup(q, p - q);

    p += strspn(p, "/");
    if (*p == '\0') {
        return -EINVAL;
    }
    gconf->image = g_strdup(p);

> 
> 
> > Note that gluster is currently able to resolve image paths that don't
> > have a preceding slash (like dir/a.img). But I guess we should support
> > specifying complete image paths too (like /dir/a.img)
> 
> How would the URIs look like?

gluster://server/testvol//dir/a.img ?
Isn't this a valid URI ? Here volname=testvol and image=/dir/a.img.

If that's valid and needs to be supported, then the above strspn based
parsing logic needs some rewrite.

Regards,
Bharata.
Paolo Bonzini - Sept. 27, 2012, 8:31 a.m.
Il 27/09/2012 10:28, Bharata B Rao ha scritto:
>> > 
>> > How would the URIs look like?
> gluster://server/testvol//dir/a.img ?
> Isn't this a valid URI ? Here volname=testvol and image=/dir/a.img.

It is, but I think it would be canonicalized to
gluster://server/testvol/dir/a.img.

> If that's valid and needs to be supported, then the above strspn based
> parsing logic needs some rewrite.

I think we can avoid this, it is unintuitive.

Paolo
Paolo Bonzini - Oct. 3, 2012, 3:50 p.m.
Il 24/09/2012 11:13, Bharata B Rao ha scritto:
> +
> +    if ((bdrv_flags & BDRV_O_NOCACHE)) {
> +        open_flags |= O_DIRECT;
> +    }
> +

If I understand correctly what I was told, this prevents the brick
server from using its own buffer cache.  This is quite different from
what we do for example over NFS (where the host does no caching, but
nothing prevents it on the remote server).

I think these 3 lines should be removed.  We're bypassing the host
buffer cache just by virtue of using a userspace driver, and that's what
cache=none cares about.

Paolo
Anand Avati - Oct. 3, 2012, 5:58 p.m.
On 10/03/2012 08:50 AM, Paolo Bonzini wrote:
> Il 24/09/2012 11:13, Bharata B Rao ha scritto:
>> +
>> +    if ((bdrv_flags&  BDRV_O_NOCACHE)) {
>> +        open_flags |= O_DIRECT;
>> +    }
>> +
>
> If I understand correctly what I was told, this prevents the brick
> server from using its own buffer cache.  This is quite different from
> what we do for example over NFS (where the host does no caching, but
> nothing prevents it on the remote server).
>
> I think these 3 lines should be removed.  We're bypassing the host
> buffer cache just by virtue of using a userspace driver, and that's what
> cache=none cares about.
>
> Paolo

O_DIRECT also has an effect on the behavior of the "client side" (the 
part within the qemu) of Gluster stack as well. I presume the intention 
of O_DIRECT is to minimize use of memory (whether as host' page cache or 
buffered data in user space). To that end it is a good idea to leave 
O_DIRECT flag set.

The behavior of whether gluster bricks need to get the O_DIRECT 
propagated or not is a different issue. We are exploring the possibility 
of not sending O_DIRECT flag over the wire to mimic NFS behavior. That 
would be independent of the qemu block driver setting the open flag.

Avati
Anand Avati - Oct. 3, 2012, 6:17 p.m.
On 10/03/2012 11:17 AM, Paolo Bonzini wrote:
> Il 03/10/2012 19:58, Anand Avati ha scritto:
>>>
>>> I think these 3 lines should be removed.  We're bypassing the host
>>> buffer cache just by virtue of using a userspace driver, and that's what
>>> cache=none cares about.
>>
>> O_DIRECT also has an effect on the behavior of the "client side" (the
>> part within the qemu) of Gluster stack as well. I presume the intention
>> of O_DIRECT is to minimize use of memory (whether as host' page cache or
>> buffered data in user space). To that end it is a good idea to leave
>> O_DIRECT flag set.
>>
>> The behavior of whether gluster bricks need to get the O_DIRECT
>> propagated or not is a different issue. We are exploring the possibility
>> of not sending O_DIRECT flag over the wire to mimic NFS behavior. That
>> would be independent of the qemu block driver setting the open flag.
>
> What is the effect of O_DIRECT on the client exactly?

To avoid caching in the io-cache module, disable read-ahead etc (if 
those translators are loaded). The behavior in write-behind is tunable. 
You could either disable write-behind entirely (which will happen once 
libgfapi supports 0-copy/RDMA) or perform a sliding-window like 
size-limited write-behind (defaults to 1MB).

Avati
Paolo Bonzini - Oct. 3, 2012, 6:17 p.m.
Il 03/10/2012 19:58, Anand Avati ha scritto:
>>
>> I think these 3 lines should be removed.  We're bypassing the host
>> buffer cache just by virtue of using a userspace driver, and that's what
>> cache=none cares about.
> 
> O_DIRECT also has an effect on the behavior of the "client side" (the
> part within the qemu) of Gluster stack as well. I presume the intention
> of O_DIRECT is to minimize use of memory (whether as host' page cache or
> buffered data in user space). To that end it is a good idea to leave
> O_DIRECT flag set.
> 
> The behavior of whether gluster bricks need to get the O_DIRECT
> propagated or not is a different issue. We are exploring the possibility
> of not sending O_DIRECT flag over the wire to mimic NFS behavior. That
> would be independent of the qemu block driver setting the open flag.

What is the effect of O_DIRECT on the client exactly?

Paolo
Paolo Bonzini - Oct. 3, 2012, 6:27 p.m.
Il 03/10/2012 20:17, Anand Avati ha scritto:
> On 10/03/2012 11:17 AM, Paolo Bonzini wrote:
>> Il 03/10/2012 19:58, Anand Avati ha scritto:
>>>>
>>>> I think these 3 lines should be removed.  We're bypassing the host
>>>> buffer cache just by virtue of using a userspace driver, and that's
>>>> what
>>>> cache=none cares about.
>>>
>>> O_DIRECT also has an effect on the behavior of the "client side" (the
>>> part within the qemu) of Gluster stack as well. I presume the intention
>>> of O_DIRECT is to minimize use of memory (whether as host' page cache or
>>> buffered data in user space). To that end it is a good idea to leave
>>> O_DIRECT flag set.
>>>
>>> The behavior of whether gluster bricks need to get the O_DIRECT
>>> propagated or not is a different issue. We are exploring the possibility
>>> of not sending O_DIRECT flag over the wire to mimic NFS behavior. That
>>> would be independent of the qemu block driver setting the open flag.
>>
>> What is the effect of O_DIRECT on the client exactly?
> 
> To avoid caching in the io-cache module, disable read-ahead etc (if
> those translators are loaded). The behavior in write-behind is tunable.
> You could either disable write-behind entirely (which will happen once
> libgfapi supports 0-copy/RDMA) or perform a sliding-window like
> size-limited write-behind (defaults to 1MB).

Thanks for the information.

I think we need to benchmark it and see what is the actual difference in
memory usage vs. performance.  The usual reasons for cache=none (enable
Linux native AIO + allow migration with shared non-coherent storage) do
not apply to gluster.

Paolo

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index b5754d3..a1ae67f 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -9,3 +9,4 @@  block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
+block-obj-$(CONFIG_GLUSTERFS) += gluster.o
diff --git a/block/gluster.c b/block/gluster.c
new file mode 100644
index 0000000..a2f8303
--- /dev/null
+++ b/block/gluster.c
@@ -0,0 +1,642 @@ 
+/*
+ * GlusterFS backend for QEMU
+ *
+ * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
+ *
+ * Pipe handling mechanism in AIO implementation is derived from
+ * block/rbd.c. Hence,
+ *
+ * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
+ *                         Josh Durgin <josh.durgin@dreamhost.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+#include <glusterfs/api/glfs.h>
+#include "block_int.h"
+#include "qemu_socket.h"
+#include "uri.h"
+
+typedef struct GlusterAIOCB {
+    BlockDriverAIOCB common;
+    int64_t size;
+    int ret;
+    bool *finished;
+    QEMUBH *bh;
+} GlusterAIOCB;
+
+typedef struct BDRVGlusterState {
+    struct glfs *glfs;
+    int fds[2];
+    struct glfs_fd *fd;
+    int qemu_aio_count;
+    int event_reader_pos;
+    GlusterAIOCB *event_acb;
+} BDRVGlusterState;
+
+#define GLUSTER_FD_READ  0
+#define GLUSTER_FD_WRITE 1
+
+typedef struct GlusterConf {
+    char *server;
+    int port;
+    char *volname;
+    char *image;
+    char *transport;
+} GlusterConf;
+
+static void qemu_gluster_gconf_free(GlusterConf *gconf)
+{
+    g_free(gconf->server);
+    g_free(gconf->volname);
+    g_free(gconf->image);
+    g_free(gconf->transport);
+    g_free(gconf);
+}
+
+static int parse_volume_options(GlusterConf *gconf, char *path)
+{
+    char *token, *saveptr;
+
+    /* volname */
+    token = strtok_r(path, "/", &saveptr);
+    if (!token) {
+        return -EINVAL;
+    }
+    gconf->volname = g_strdup(token);
+
+    /* image */
+    token = strtok_r(NULL, "?", &saveptr);
+    if (!token) {
+        return -EINVAL;
+    }
+    gconf->image = g_strdup(token);
+    return 0;
+}
+
+/*
+ * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
+ *
+ * 'gluster' is the protocol.
+ *
+ * 'transport' specifies the transport type used to connect to gluster
+ * management daemon (glusterd). Valid transport types are
+ * tcp, unix and rdma. If a transport type isn't specified, then tcp
+ * type is assumed.
+ *
+ * 'server' specifies the server where the volume file specification for
+ * the given volume resides. This can be either hostname, ipv4 address
+ * or ipv6 address. ipv6 address needs to be within square brackets [ ].
+ * If transport type is 'unix', then server field is ignored, but the
+ * 'socket' field needs to be populated with the path to unix domain
+ * socket.
+ *
+ * 'port' is the port number on which glusterd is listening. This is optional
+ * and if not specified, QEMU will send 0 which will make gluster to use the
+ * default port. port is ignored for unix type of transport.
+ *
+ * 'volname' is the name of the gluster volume which contains the VM image.
+ *
+ * 'image' is the path to the actual VM image that resides on gluster volume.
+ *
+ * Examples:
+ *
+ * file=gluster://1.2.3.4/testvol/a.img
+ * file=gluster+tcp://1.2.3.4/testvol/a.img
+ * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
+ * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
+ * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
+ * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
+ * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
+ * file=gluster+rdma://1.2.3.4:24007/testvol/a.img
+ */
+static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
+{
+    URI *uri;
+    QueryParams *qp = NULL;
+    bool is_unix = false;
+    int ret = 0;
+    char *unescape_str = NULL;
+
+    uri = uri_parse(filename);
+    if (!uri) {
+        return -EINVAL;
+    }
+
+    /* transport */
+    if (!strcmp(uri->scheme, "gluster")) {
+        gconf->transport = g_strdup("tcp");
+    } else if (!strcmp(uri->scheme, "gluster+tcp")) {
+        gconf->transport = g_strdup("tcp");
+    } else if (!strcmp(uri->scheme, "gluster+unix")) {
+        gconf->transport = g_strdup("unix");
+        is_unix = true;
+    } else if (!strcmp(uri->scheme, "gluster+rdma")) {
+        gconf->transport = g_strdup("rdma");
+    } else {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    ret = parse_volume_options(gconf, uri->path);
+    if (ret < 0) {
+        goto out;
+    }
+
+    if (uri->query) {
+        unescape_str = uri_string_unescape(uri->query, -1, NULL);
+        if (!unescape_str) {
+            ret = -EINVAL;
+            goto out;
+        }
+    }
+
+    qp = query_params_parse(unescape_str);
+    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    if (is_unix) {
+        if (strcmp(qp->p[0].name, "socket")) {
+            ret = -EINVAL;
+            goto out;
+        }
+        gconf->server = g_strdup(qp->p[0].value);
+    } else {
+        gconf->server = g_strdup(uri->server);
+        gconf->port = uri->port;
+    }
+
+out:
+    if (qp) {
+        query_params_free(qp);
+    }
+    g_free(unescape_str);
+    uri_free(uri);
+    return ret;
+}
+
+static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
+{
+    struct glfs *glfs = NULL;
+    int ret;
+
+    ret = qemu_gluster_parseuri(gconf, filename);
+    if (ret < 0) {
+        error_report("Usage: file=gluster[+transport]://[server[:port]]/"
+            "volname/image[?socket=...]");
+        errno = -ret;
+        goto out;
+    }
+
+    glfs = glfs_new(gconf->volname);
+    if (!glfs) {
+        goto out;
+    }
+
+    ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
+            gconf->port);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /*
+     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
+     * GlusterFS makes GF_LOG_* macros available to libgfapi users.
+     */
+    ret = glfs_set_logging(glfs, "-", 4);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = glfs_init(glfs);
+    if (ret) {
+        error_report("Gluster connection failed for server=%s port=%d "
+             "volume=%s image=%s transport=%s\n", gconf->server, gconf->port,
+             gconf->volname, gconf->image, gconf->transport);
+        goto out;
+    }
+    return glfs;
+
+out:
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return NULL;
+}
+
+static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s)
+{
+    int ret;
+    bool *finished = acb->finished;
+    BlockDriverCompletionFunc *cb = acb->common.cb;
+    void *opaque = acb->common.opaque;
+
+    if (!acb->ret || acb->ret == acb->size) {
+        ret = 0; /* Success */
+    } else if (acb->ret < 0) {
+        ret = acb->ret; /* Read/Write failed */
+    } else {
+        ret = -EIO; /* Partial read/write - fail it */
+    }
+
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    cb(opaque, ret);
+    if (finished) {
+        *finished = true;
+    }
+}
+
+static void qemu_gluster_aio_event_reader(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&s->event_acb;
+
+        ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos,
+                   sizeof(s->event_acb) - s->event_reader_pos);
+        if (ret > 0) {
+            s->event_reader_pos += ret;
+            if (s->event_reader_pos == sizeof(s->event_acb)) {
+                s->event_reader_pos = 0;
+                qemu_gluster_complete_aio(s->event_acb, s);
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static int qemu_gluster_aio_flush_cb(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
+}
+
+static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
+    int bdrv_flags)
+{
+    BDRVGlusterState *s = bs->opaque;
+    int open_flags = 0;
+    int ret = 0;
+    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
+
+    s->glfs = qemu_gluster_init(gconf, filename);
+    if (!s->glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    open_flags |=  O_BINARY;
+    open_flags &= ~O_ACCMODE;
+    if (bdrv_flags & BDRV_O_RDWR) {
+        open_flags |= O_RDWR;
+    } else {
+        open_flags |= O_RDONLY;
+    }
+
+    if ((bdrv_flags & BDRV_O_NOCACHE)) {
+        open_flags |= O_DIRECT;
+    }
+
+    s->fd = glfs_open(s->glfs, gconf->image, open_flags);
+    if (!s->fd) {
+        ret = -errno;
+        goto out;
+    }
+
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        ret = -errno;
+        goto out;
+    }
+    fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
+        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
+
+out:
+    qemu_gluster_gconf_free(gconf);
+    if (!ret) {
+        return ret;
+    }
+    if (s->fd) {
+        glfs_close(s->fd);
+    }
+    if (s->glfs) {
+        glfs_fini(s->glfs);
+    }
+    return ret;
+}
+
+static int qemu_gluster_create(const char *filename,
+        QEMUOptionParameter *options)
+{
+    struct glfs *glfs;
+    struct glfs_fd *fd;
+    int ret = 0;
+    int64_t total_size = 0;
+    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));
+
+    glfs = qemu_gluster_init(gconf, filename);
+    if (!glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            total_size = options->value.n / BDRV_SECTOR_SIZE;
+        }
+        options++;
+    }
+
+    fd = glfs_creat(glfs, gconf->image,
+        O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
+    if (!fd) {
+        ret = -errno;
+    } else {
+        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
+            ret = -errno;
+        }
+        if (glfs_close(fd) != 0) {
+            ret = -errno;
+        }
+    }
+out:
+    qemu_gluster_gconf_free(gconf);
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return ret;
+}
+
+static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
+    bool finished = false;
+
+    acb->finished = &finished;
+    while (!finished) {
+        qemu_aio_wait();
+    }
+}
+
+static AIOPool gluster_aio_pool = {
+    .aiocb_size = sizeof(GlusterAIOCB),
+    .cancel = qemu_gluster_aio_cancel,
+};
+
+static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
+{
+    int ret = 0;
+
+    while (1) {
+        int fd = s->fds[GLUSTER_FD_WRITE];
+
+        ret = write(fd, (void *)&acb, sizeof(acb));
+        if (ret >= 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+    }
+    return ret;
+}
+
+static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVGlusterState *s = bs->opaque;
+
+    acb->ret = ret;
+    if (qemu_gluster_send_pipe(s, acb) < 0) {
+        /*
+         * Gluster AIO callback thread failed to notify the waiting
+         * QEMU thread about IO completion.
+         *
+         * Complete this IO request and make the disk inaccessible for
+         * subsequent reads and writes.
+         */
+        error_report("Gluster failed to notify QEMU about IO completion");
+
+        qemu_mutex_lock_iothread(); /* We are in gluster thread context */
+        acb->common.cb(acb->common.opaque, -EIO);
+        qemu_aio_release(acb);
+        s->qemu_aio_count--;
+        close(s->fds[GLUSTER_FD_READ]);
+        close(s->fds[GLUSTER_FD_WRITE]);
+        qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL,
+            NULL);
+        bs->drv = NULL; /* Make the disk inaccessible */
+        qemu_mutex_unlock_iothread();
+    }
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int write)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+    size_t size;
+    off_t offset;
+
+    offset = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    s->qemu_aio_count++;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = size;
+    acb->ret = 0;
+    acb->finished = NULL;
+
+    if (write) {
+        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    } else {
+        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    }
+
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = 0;
+    acb->ret = 0;
+    acb->finished = NULL;
+    s->qemu_aio_count++;
+
+    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static int64_t qemu_gluster_getlength(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+    int64_t ret;
+
+    ret = glfs_lseek(s->fd, 0, SEEK_END);
+    if (ret < 0) {
+        return -errno;
+    } else {
+        return ret;
+    }
+}
+
+static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+    struct stat st;
+    int ret;
+
+    ret = glfs_fstat(s->fd, &st);
+    if (ret < 0) {
+        return -errno;
+    } else {
+        return st.st_blocks * 512;
+    }
+}
+
+static void qemu_gluster_close(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+
+    close(s->fds[GLUSTER_FD_READ]);
+    close(s->fds[GLUSTER_FD_WRITE]);
+    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, NULL);
+
+    if (s->fd) {
+        glfs_close(s->fd);
+        s->fd = NULL;
+    }
+    glfs_fini(s->glfs);
+}
+
+static QEMUOptionParameter qemu_gluster_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    { NULL }
+};
+
+static BlockDriver bdrv_gluster = {
+    .format_name                  = "gluster",
+    .protocol_name                = "gluster",
+    .instance_size                = sizeof(BDRVGlusterState),
+    .bdrv_file_open               = qemu_gluster_open,
+    .bdrv_close                   = qemu_gluster_close,
+    .bdrv_create                  = qemu_gluster_create,
+    .bdrv_getlength               = qemu_gluster_getlength,
+    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
+    .bdrv_aio_readv               = qemu_gluster_aio_readv,
+    .bdrv_aio_writev              = qemu_gluster_aio_writev,
+    .bdrv_aio_flush               = qemu_gluster_aio_flush,
+    .create_options               = qemu_gluster_create_options,
+};
+
+static BlockDriver bdrv_gluster_tcp = {
+    .format_name                  = "gluster",
+    .protocol_name                = "gluster+tcp",
+    .instance_size                = sizeof(BDRVGlusterState),
+    .bdrv_file_open               = qemu_gluster_open,
+    .bdrv_close                   = qemu_gluster_close,
+    .bdrv_create                  = qemu_gluster_create,
+    .bdrv_getlength               = qemu_gluster_getlength,
+    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
+    .bdrv_aio_readv               = qemu_gluster_aio_readv,
+    .bdrv_aio_writev              = qemu_gluster_aio_writev,
+    .bdrv_aio_flush               = qemu_gluster_aio_flush,
+    .create_options               = qemu_gluster_create_options,
+};
+
+static BlockDriver bdrv_gluster_unix = {
+    .format_name                  = "gluster",
+    .protocol_name                = "gluster+unix",
+    .instance_size                = sizeof(BDRVGlusterState),
+    .bdrv_file_open               = qemu_gluster_open,
+    .bdrv_close                   = qemu_gluster_close,
+    .bdrv_create                  = qemu_gluster_create,
+    .bdrv_getlength               = qemu_gluster_getlength,
+    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
+    .bdrv_aio_readv               = qemu_gluster_aio_readv,
+    .bdrv_aio_writev              = qemu_gluster_aio_writev,
+    .bdrv_aio_flush               = qemu_gluster_aio_flush,
+    .create_options               = qemu_gluster_create_options,
+};
+
+static BlockDriver bdrv_gluster_rdma = {
+    .format_name                  = "gluster",
+    .protocol_name                = "gluster+rdma",
+    .instance_size                = sizeof(BDRVGlusterState),
+    .bdrv_file_open               = qemu_gluster_open,
+    .bdrv_close                   = qemu_gluster_close,
+    .bdrv_create                  = qemu_gluster_create,
+    .bdrv_getlength               = qemu_gluster_getlength,
+    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
+    .bdrv_aio_readv               = qemu_gluster_aio_readv,
+    .bdrv_aio_writev              = qemu_gluster_aio_writev,
+    .bdrv_aio_flush               = qemu_gluster_aio_flush,
+    .create_options               = qemu_gluster_create_options,
+};
+
+static void bdrv_gluster_init(void)
+{
+    bdrv_register(&bdrv_gluster_rdma);
+    bdrv_register(&bdrv_gluster_unix);
+    bdrv_register(&bdrv_gluster_tcp);
+    bdrv_register(&bdrv_gluster);
+}
+
+block_init(bdrv_gluster_init);