Patchwork [v4,2/2] block: Support GlusterFS as a QEMU block backend

login
register
mail settings
Submitter Bharata B Rao
Date Aug. 1, 2012, 2:16 p.m.
Message ID <20120801141625.GF21697@in.ibm.com>
Download mbox | patch
Permalink /patch/174468/
State New
Headers show

Comments

Bharata B Rao - Aug. 1, 2012, 2:16 p.m.
block: Support GlusterFS as a QEMU block backend.

From: Bharata B Rao <bharata@linux.vnet.ibm.com>

This patch adds gluster as the new block backend in QEMU. This gives
QEMU the ability to boot VM images from gluster volumes. Its already
possible to boot from VM images on gluster volumes, but this patchset
provides the ability to boot VM images from gluster volumes by by-passing
the FUSE layer in gluster.

VM Image on gluster volume is specified like this:

file=gluster://server:[port]/volname/image[?transport=socket]

'gluster' is the protocol.

'server' specifies the server where the volume file specification for
the given volume resides. This can be either hostname or ipv4 address
or ipv6 address. ipv6 address needs to be with in square brackets [ ].

port' is the port number on which gluster management daemon (glusterd) is
listening. This is optional and if not specified, QEMU will send 0 which
will make libgfapi to use the default port.

'volname' is the name of the gluster volume which contains the VM image.

'image' is the path to the actual VM image in the gluster volume.

'transport' specifies the transport used to connect to glusterd. This is
optional and if not specified, socket transport is used.

Examples:

file=gluster://1.2.3.4/testvol/a.img
file=gluster://1.2.3.4:5000/testvol/dir/a.img?transport=socket
file=gluster://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
file=gluster://[1:2:3:4:5:6:7:8]:5000/testvol/dir/a.img?transport=socket
file=gluster://server.domain.com:5000/testvol/dir/a.img

Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---

 block/Makefile.objs |    1 
 block/gluster.c     |  610 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 611 insertions(+), 0 deletions(-)
 create mode 100644 block/gluster.c
Blue Swirl - Aug. 1, 2012, 6:35 p.m.
On Wed, Aug 1, 2012 at 2:16 PM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> block: Support GlusterFS as a QEMU block backend.
>
> From: Bharata B Rao <bharata@linux.vnet.ibm.com>
>
> This patch adds gluster as the new block backend in QEMU. This gives
> QEMU the ability to boot VM images from gluster volumes. Its already
> possible to boot from VM images on gluster volumes, but this patchset
> provides the ability to boot VM images from gluster volumes by by-passing
> the FUSE layer in gluster.
>
> VM Image on gluster volume is specified like this:
>
> file=gluster://server:[port]/volname/image[?transport=socket]
>
> 'gluster' is the protocol.
>
> 'server' specifies the server where the volume file specification for
> the given volume resides. This can be either hostname or ipv4 address
> or ipv6 address. ipv6 address needs to be with in square brackets [ ].
>
> port' is the port number on which gluster management daemon (glusterd) is
> listening. This is optional and if not specified, QEMU will send 0 which
> will make libgfapi to use the default port.
>
> 'volname' is the name of the gluster volume which contains the VM image.
>
> 'image' is the path to the actual VM image in the gluster volume.
>
> 'transport' specifies the transport used to connect to glusterd. This is
> optional and if not specified, socket transport is used.
>
> Examples:
>
> file=gluster://1.2.3.4/testvol/a.img
> file=gluster://1.2.3.4:5000/testvol/dir/a.img?transport=socket
> file=gluster://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
> file=gluster://[1:2:3:4:5:6:7:8]:5000/testvol/dir/a.img?transport=socket
> file=gluster://server.domain.com:5000/testvol/dir/a.img
>
> Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>
>  block/Makefile.objs |    1
>  block/gluster.c     |  610 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 611 insertions(+), 0 deletions(-)
>  create mode 100644 block/gluster.c
>
>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index b5754d3..a1ae67f 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -9,3 +9,4 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
>  block-obj-$(CONFIG_LIBISCSI) += iscsi.o
>  block-obj-$(CONFIG_CURL) += curl.o
>  block-obj-$(CONFIG_RBD) += rbd.o
> +block-obj-$(CONFIG_GLUSTERFS) += gluster.o
> diff --git a/block/gluster.c b/block/gluster.c
> new file mode 100644
> index 0000000..712752f
> --- /dev/null
> +++ b/block/gluster.c
> @@ -0,0 +1,610 @@
> +/*
> + * GlusterFS backend for QEMU
> + *
> + * (AIO implementation is derived from block/rbd.c)
> + *
> + * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * (at your option) any later version. See the COPYING file in the top-level
> + * directory.
> + */
> +#include "block_int.h"
> +#include <glusterfs/api/glfs.h>
> +
> +typedef struct GlusterAIOCB {
> +    BlockDriverAIOCB common;
> +    bool canceled;
> +    int64_t size;
> +    int ret;
> +} GlusterAIOCB;
> +
> +typedef struct BDRVGlusterState {
> +    struct glfs *glfs;
> +    int fds[2];
> +    struct glfs_fd *fd;
> +    int qemu_aio_count;
> +} BDRVGlusterState;
> +
> +#define GLUSTER_FD_READ 0
> +#define GLUSTER_FD_WRITE 1
> +
> +typedef struct GlusterURI {
> +    char *server;
> +    int port;
> +    char *volname;
> +    char *image;
> +    char *transport;
> +} GlusterURI;
> +
> +static void qemu_gluster_uri_free(GlusterURI *uri)
> +{
> +    g_free(uri->server);
> +    g_free(uri->volname);
> +    g_free(uri->image);
> +    g_free(uri->transport);
> +    g_free(uri);
> +}
> +
> +/*
> + * We don't validate the transport option obtained here but
> + * instead depend on gluster to flag an error.
> + */
> +static int parse_transport(GlusterURI *uri, char *transport)
> +{
> +    char *token, *saveptr;
> +    int ret = -EINVAL;
> +
> +    if (!transport) {
> +        uri->transport = strdup("socket");

g_strdup

> +        ret = 0;
> +        goto out;
> +    }
> +
> +    token = strtok_r(transport, "=", &saveptr);
> +    if (!token) {
> +        goto out;
> +    }
> +    if (strcmp(token, "transport")) {
> +        goto out;
> +    }
> +    token = strtok_r(NULL, "=", &saveptr);
> +    if (!token) {
> +        goto out;
> +    }
> +    uri->transport = strdup(token);

g_strdup

> +    ret = 0;
> +out:
> +    return ret;
> +}
> +
> +static int parse_server(GlusterURI *uri, char *server)
> +{
> +    int ret = -EINVAL;
> +    char *token, *saveptr;
> +    char *p, *q = server;
> +
> +    p = strchr(server, '[');
> +    if (p) {
> +        /* [ipv6] */
> +        if (p != server) {
> +            /* [ not in the beginning */
> +            goto out;
> +        }
> +        q++;
> +        p = strrchr(p, ']');
> +        if (!p) {
> +            /* No matching ] */
> +            goto out;
> +        }
> +        *p++ = '\0';
> +        uri->server = strdup(q);

g_strdup

> +
> +        if (*p) {
> +            if (*p != ':') {
> +                /* [ipv6] followed by something other than : */
> +                goto out;
> +            }
> +            uri->port = strtoul(++p, NULL, 0);
> +            if (uri->port < 0) {
> +                goto out;
> +            }
> +        } else {
> +            /* port not specified, use default */
> +            uri->port = 0;
> +        }
> +
> +    } else {
> +        /* ipv4 or hostname */
> +        if (*server == ':') {
> +            /* port specified w/o a server */
> +            goto out;
> +        }
> +        token = strtok_r(server, ":", &saveptr);
> +        if (!token) {
> +            goto out;
> +        }
> +        uri->server = strdup(token);
> +        token = strtok_r(NULL, ":", &saveptr);
> +        if (token) {
> +            uri->port = strtoul(token, NULL, 0);
> +            if (uri->port < 0) {
> +                goto out;
> +            }
> +        } else {
> +            uri->port = 0;
> +        }
> +    }
> +    ret = 0;
> +out:
> +    return ret;
> +}
> +
> +/*
> + * file=gluster://server:[port]/volname/image[?transport=socket]
> + *
> + * 'gluster' is the protocol.
> + *
> + * 'server' specifies the server where the volume file specification for
> + * the given volume resides. This can be either hostname or ipv4 address
> + * or ipv6 address. ipv6 address needs to be with in square brackets [ ].
> + *
> + *'port' is the port number on which gluster management daemon (glusterd) is
> + * listening. This is optional and if not specified, QEMU will send 0 which
> + * will make libgfapi to use the default port.
> + *
> + * 'volname' is the name of the gluster volume which contains the VM image.
> + *
> + * 'image' is the path to the actual VM image in the gluster volume.
> + *
> + * 'transport' specifies the transport used to connect to glusterd. This is
> + * optional and if not specified, socket transport is used.
> + *
> + * Examples:
> + *
> + * file=gluster://1.2.3.4/testvol/a.img
> + * file=gluster://1.2.3.4:5000/testvol/dir/a.img?transport=socket
> + * file=gluster://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
> + * file=gluster://[1:2:3:4:5:6:7:8]:5000/testvol/dir/a.img?transport=socket
> + * file=gluster://server.domain.com:5000/testvol/dir/a.img
> + */
> +static int qemu_gluster_parseuri(GlusterURI *uri, const char *filename)
> +{
> +    char *token, *saveptr;
> +    char *p, *r;
> +    int ret = -EINVAL;
> +
> +    p = r = g_strdup(filename);

Why?

> +    if (strncmp(p, "gluster://", 10)) {
> +        goto out;
> +    }
> +
> +    /* Discard the protocol */
> +    p += 10;
> +
> +    /* server */
> +    token = strtok_r(p, "/", &saveptr);
> +    if (!token) {
> +        goto out;
> +    }
> +
> +    ret = parse_server(uri, token);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    /* volname */
> +    token = strtok_r(NULL, "/", &saveptr);
> +    if (!token) {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +    uri->volname = g_strdup(token);
> +
> +    /* image */
> +    token = strtok_r(NULL, "?", &saveptr);
> +    if (!token) {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +    uri->image = g_strdup(token);
> +
> +    /* transport */
> +    token = strtok_r(NULL, "?", &saveptr);
> +    ret = parse_transport(uri, token);
> +    if (ret < 0) {
> +        goto out;
> +     }
> +
> +    /* Flag error for extra options */
> +    token = strtok_r(NULL, "?", &saveptr);
> +    if (token) {
> +        ret = -1;
> +        goto out;
> +    }
> +    ret = 0;
> +out:
> +    g_free(r);
> +    return ret;
> +}
> +
> +static struct glfs *qemu_gluster_init(GlusterURI *uri, const char *filename)
> +{
> +    struct glfs *glfs = NULL;
> +    int ret;
> +
> +    ret = qemu_gluster_parseuri(uri, filename);
> +    if (ret < 0) {
> +        error_report("Usage: file=gluster://server:[port]/volname/image"
> +            "[?transport=socket]");
> +        errno = -ret;
> +        goto out;
> +    }
> +
> +    glfs = glfs_new(uri->volname);
> +    if (!glfs) {
> +        goto out;
> +    }
> +
> +    ret = glfs_set_volfile_server(glfs, uri->transport, uri->server,
> +        uri->port);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    /*
> +     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
> +     * GlusterFS exports it in a header.
> +     */
> +    ret = glfs_set_logging(glfs, "-", 4);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    ret = glfs_init(glfs);
> +    if (ret) {
> +        error_report("glfs_init() failed for server=%s port=%d volume=%s "
> +            "image=%s\n", uri->server, uri->port, uri->volname, uri->image);
> +        goto out;
> +    }
> +    return glfs;
> +
> +out:
> +    if (glfs) {
> +        glfs_fini(glfs);
> +    }
> +    return NULL;
> +}
> +
> +static void qemu_gluster_complete_aio(GlusterAIOCB *acb)
> +{
> +    int ret;
> +
> +    if (acb->canceled) {
> +        qemu_aio_release(acb);
> +        return;
> +    }
> +
> +    if (acb->ret == acb->size) {
> +        ret = 0; /* Success */
> +    } else if (acb->ret < 0) {
> +        ret = acb->ret; /* Read/Write failed */
> +    } else {
> +        ret = -EIO; /* Partial read/write - fail it */
> +    }
> +    acb->common.cb(acb->common.opaque, ret);
> +    qemu_aio_release(acb);
> +}
> +
> +static void qemu_gluster_aio_event_reader(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +    GlusterAIOCB *event_acb;
> +    int event_reader_pos = 0;
> +    ssize_t ret;
> +
> +    do {
> +        char *p = (char *)&event_acb;
> +
> +        ret = read(s->fds[GLUSTER_FD_READ], p + event_reader_pos,
> +                   sizeof(event_acb) - event_reader_pos);
> +        if (ret > 0) {
> +            event_reader_pos += ret;
> +            if (event_reader_pos == sizeof(event_acb)) {
> +                event_reader_pos = 0;
> +                qemu_gluster_complete_aio(event_acb);
> +                s->qemu_aio_count--;
> +            }
> +        }
> +    } while (ret < 0 && errno == EINTR);
> +}
> +
> +static int qemu_gluster_aio_flush_cb(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0);
> +}
> +
> +static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
> +    int bdrv_flags)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    int open_flags = 0;
> +    int ret = 0;
> +    GlusterURI *uri = g_malloc0(sizeof(GlusterURI));
> +
> +    s->glfs = qemu_gluster_init(uri, filename);
> +    if (!s->glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    open_flags |=  O_BINARY;
> +    open_flags &= ~O_ACCMODE;
> +    if (bdrv_flags & BDRV_O_RDWR) {
> +        open_flags |= O_RDWR;
> +    } else {
> +        open_flags |= O_RDONLY;
> +    }
> +
> +    if ((bdrv_flags & BDRV_O_NOCACHE)) {
> +        open_flags |= O_DIRECT;
> +    }
> +
> +    s->fd = glfs_open(s->glfs, uri->image, open_flags);
> +    if (!s->fd) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
> +        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
> +
> +out:
> +    qemu_gluster_uri_free(uri);
> +    if (!ret) {
> +        return ret;
> +    }
> +    if (s->fd) {
> +        glfs_close(s->fd);
> +    }
> +    if (s->glfs) {
> +        glfs_fini(s->glfs);
> +    }
> +    return ret;
> +}
> +
> +static int qemu_gluster_create(const char *filename,
> +        QEMUOptionParameter *options)
> +{
> +    struct glfs *glfs;
> +    struct glfs_fd *fd;
> +    int ret = 0;
> +    int64_t total_size = 0;
> +    GlusterURI *uri = g_malloc0(sizeof(GlusterURI));
> +
> +    glfs = qemu_gluster_init(uri, filename);
> +    if (!glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            total_size = options->value.n / BDRV_SECTOR_SIZE;
> +        }
> +        options++;
> +    }
> +
> +    fd = glfs_creat(glfs, uri->image,
> +        O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
> +    if (!fd) {
> +        ret = -errno;
> +    } else {
> +        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
> +            ret = -errno;
> +        }
> +        if (glfs_close(fd) != 0) {
> +            ret = -errno;
> +        }
> +    }
> +out:
> +    qemu_gluster_uri_free(uri);
> +    if (glfs) {
> +        glfs_fini(glfs);
> +    }
> +    return ret;
> +}
> +
> +static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
> +
> +    acb->common.cb(acb->common.opaque, -ECANCELED);
> +    acb->canceled = true;
> +}
> +
> +static AIOPool gluster_aio_pool = {
> +    .aiocb_size = sizeof(GlusterAIOCB),
> +    .cancel = qemu_gluster_aio_cancel,
> +};
> +
> +static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
> +{
> +    int ret = 0;
> +    while (1) {
> +        fd_set wfd;
> +        int fd = s->fds[GLUSTER_FD_WRITE];
> +
> +        ret = write(fd, (void *)&acb, sizeof(acb));
> +        if (ret >= 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }
> +
> +        FD_ZERO(&wfd);
> +        FD_SET(fd, &wfd);
> +        do {
> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> +        } while (ret < 0 && errno == EINTR);
> +    }
> +    return ret;
> +}
> +
> +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
> +{
> +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
> +    BDRVGlusterState *s = acb->common.bs->opaque;
> +
> +    acb->ret = ret;
> +    if (qemu_gluster_send_pipe(s, acb) < 0) {
> +        error_report("Could not complete read/write/flush from gluster");
> +        abort();

Aborting is a bit drastic, it would be nice to save and exit gracefully.

> +    }
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque, int write)
> +{
> +    int ret;
> +    GlusterAIOCB *acb;
> +    BDRVGlusterState *s = bs->opaque;
> +    size_t size;
> +    off_t offset;
> +
> +    offset = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    s->qemu_aio_count++;
> +
> +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> +    acb->size = size;
> +    acb->ret = 0;
> +    acb->canceled = false;
> +
> +    if (write) {
> +        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
> +            &gluster_finish_aiocb, acb);
> +    } else {
> +        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
> +            &gluster_finish_aiocb, acb);
> +    }
> +
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    return &acb->common;
> +
> +out:
> +    s->qemu_aio_count--;
> +    qemu_aio_release(acb);
> +    return NULL;
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    int ret;
> +    GlusterAIOCB *acb;
> +    BDRVGlusterState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> +    acb->size = 0;
> +    acb->ret = 0;
> +    acb->canceled = false;
> +    s->qemu_aio_count++;
> +
> +    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    return &acb->common;
> +
> +out:
> +    s->qemu_aio_count--;
> +    qemu_aio_release(acb);
> +    return NULL;
> +}
> +
> +static int64_t qemu_gluster_getlength(BlockDriverState *bs)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    struct stat st;
> +    int ret;
> +
> +    ret = glfs_fstat(s->fd, &st);
> +    if (ret < 0) {
> +        return -errno;
> +    } else {
> +        return st.st_size;
> +    }
> +}
> +
> +static void qemu_gluster_close(BlockDriverState *bs)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +
> +    if (s->fd) {
> +        glfs_close(s->fd);
> +        s->fd = NULL;
> +    }
> +    glfs_fini(s->glfs);
> +}
> +
> +static QEMUOptionParameter qemu_gluster_create_options[] = {

'const'?

> +    {
> +        .name = BLOCK_OPT_SIZE,
> +        .type = OPT_SIZE,
> +        .help = "Virtual disk size"
> +    },
> +    { NULL }
> +};
> +
> +static BlockDriver bdrv_gluster = {

'const'?

> +    .format_name = "gluster",
> +    .protocol_name = "gluster",
> +    .instance_size = sizeof(BDRVGlusterState),
> +    .bdrv_file_open = qemu_gluster_open,
> +    .bdrv_close = qemu_gluster_close,
> +    .bdrv_create = qemu_gluster_create,
> +    .bdrv_getlength = qemu_gluster_getlength,
> +
> +    .bdrv_aio_readv = qemu_gluster_aio_readv,
> +    .bdrv_aio_writev = qemu_gluster_aio_writev,
> +    .bdrv_aio_flush = qemu_gluster_aio_flush,
> +
> +    .create_options = qemu_gluster_create_options,
> +};
> +
> +static void bdrv_gluster_init(void)
> +{
> +    bdrv_register(&bdrv_gluster);
> +}
> +
> +block_init(bdrv_gluster_init);
>
>
Bharata B Rao - Aug. 2, 2012, 3:55 a.m.
On Wed, Aug 01, 2012 at 06:35:22PM +0000, Blue Swirl wrote:
> > +
> > +    if (!transport) {
> > +        uri->transport = strdup("socket");
> 
> g_strdup

Sorry about that, pitfalls of developing the parsing code out of line :(

> > +static int qemu_gluster_parseuri(GlusterURI *uri, const char *filename)
> > +{
> > +    char *token, *saveptr;
> > +    char *p, *r;
> > +    int ret = -EINVAL;
> > +
> > +    p = r = g_strdup(filename);
> 
> Why?

- Are you asking why use 2 variables ? I need them because I loose p and
  need r to free the string.
- Or are you asking why strdup ? That's because filename is const char *
  and I need to modify the filename when parsing.

> > +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
> > +{
> > +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
> > +    BDRVGlusterState *s = acb->common.bs->opaque;
> > +
> > +    acb->ret = ret;
> > +    if (qemu_gluster_send_pipe(s, acb) < 0) {
> > +        error_report("Could not complete read/write/flush from gluster");
> > +        abort();
> 
> Aborting is a bit drastic, it would be nice to save and exit gracefully.

I am not sure if there is an easy way to recover sanely and exit from this
kind of error.

Here the non-QEMU thread (gluster thread) failed to notify the QEMU thread
on the read side of the pipe about the IO completion. So essentially
bdrv_read or bdrv_write will never complete if this error happens.

Do you have any suggestions on how to exit gracefully here ?

> > +static QEMUOptionParameter qemu_gluster_create_options[] = {
> 
> 'const'?

Hmm no precedence of const usage for identical scenario in  other block
drivers in QEMU.

> 
> > +    {
> > +        .name = BLOCK_OPT_SIZE,
> > +        .type = OPT_SIZE,
> > +        .help = "Virtual disk size"
> > +    },
> > +    { NULL }
> > +};
> > +
> > +static BlockDriver bdrv_gluster = {
> 
> 'const'?

Again dodn't see the precedence for this.

Thanks for your review.

Regards,
Bharata.
Blue Swirl - Aug. 3, 2012, 3:57 p.m.
On Thu, Aug 2, 2012 at 3:55 AM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> On Wed, Aug 01, 2012 at 06:35:22PM +0000, Blue Swirl wrote:
>> > +
>> > +    if (!transport) {
>> > +        uri->transport = strdup("socket");
>>
>> g_strdup
>
> Sorry about that, pitfalls of developing the parsing code out of line :(
>
>> > +static int qemu_gluster_parseuri(GlusterURI *uri, const char *filename)
>> > +{
>> > +    char *token, *saveptr;
>> > +    char *p, *r;
>> > +    int ret = -EINVAL;
>> > +
>> > +    p = r = g_strdup(filename);
>>
>> Why?
>
> - Are you asking why use 2 variables ? I need them because I loose p and
>   need r to free the string.
> - Or are you asking why strdup ? That's because filename is const char *
>   and I need to modify the filename when parsing.

OK.

>
>> > +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
>> > +{
>> > +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
>> > +    BDRVGlusterState *s = acb->common.bs->opaque;
>> > +
>> > +    acb->ret = ret;
>> > +    if (qemu_gluster_send_pipe(s, acb) < 0) {
>> > +        error_report("Could not complete read/write/flush from gluster");
>> > +        abort();
>>
>> Aborting is a bit drastic, it would be nice to save and exit gracefully.
>
> I am not sure if there is an easy way to recover sanely and exit from this
> kind of error.
>
> Here the non-QEMU thread (gluster thread) failed to notify the QEMU thread
> on the read side of the pipe about the IO completion. So essentially
> bdrv_read or bdrv_write will never complete if this error happens.
>
> Do you have any suggestions on how to exit gracefully here ?

Ignore but set the callback return to -EIO, see for example curl.c:249.

>
>> > +static QEMUOptionParameter qemu_gluster_create_options[] = {
>>
>> 'const'?
>
> Hmm no precedence of const usage for identical scenario in  other block
> drivers in QEMU.
>
>>
>> > +    {
>> > +        .name = BLOCK_OPT_SIZE,
>> > +        .type = OPT_SIZE,
>> > +        .help = "Virtual disk size"
>> > +    },
>> > +    { NULL }
>> > +};
>> > +
>> > +static BlockDriver bdrv_gluster = {
>>
>> 'const'?
>
> Again dodn't see the precedence for this.

OK.

>
> Thanks for your review.
>
> Regards,
> Bharata.
>
Bharata B Rao - Aug. 4, 2012, 2:44 a.m.
On Fri, Aug 03, 2012 at 03:57:20PM +0000, Blue Swirl wrote:
> >> > +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
> >> > +{
> >> > +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
> >> > +    BDRVGlusterState *s = acb->common.bs->opaque;
> >> > +
> >> > +    acb->ret = ret;
> >> > +    if (qemu_gluster_send_pipe(s, acb) < 0) {
> >> > +        error_report("Could not complete read/write/flush from gluster");
> >> > +        abort();
> >>
> >> Aborting is a bit drastic, it would be nice to save and exit gracefully.
> >
> > I am not sure if there is an easy way to recover sanely and exit from this
> > kind of error.
> >
> > Here the non-QEMU thread (gluster thread) failed to notify the QEMU thread
> > on the read side of the pipe about the IO completion. So essentially
> > bdrv_read or bdrv_write will never complete if this error happens.
> >
> > Do you have any suggestions on how to exit gracefully here ?
> 
> Ignore but set the callback return to -EIO, see for example curl.c:249.

I see the precedence for how I am handling this in
posix-aio-compat.c:posix_aio_notify_event().

So instead of aborting, I could do acb->common.cb(acb->common.opaque, -EIO)
as you suggest, but that would not help because, the thread at the read side
of the pipe is still waiting and user will see the read/write failure as hang.

[root@bharata qemu]# gdb ./x86_64-softmmu/qemu-system-x86_64
Starting program: ./x86_64-softmmu/qemu-system-x86_64 --enable-kvm --nographic -m 1024 -smp 4 -drive file=gluster://bharata/test/F16,if=virtio,cache=none
[New Thread 0x7ffff4c7f700 (LWP 6537)]
[New Thread 0x7ffff447e700 (LWP 6538)]
[New Thread 0x7ffff3420700 (LWP 6539)]
[New Thread 0x7ffff1407700 (LWP 6540)]
qemu-system-x86_64: -drive file=gluster://bharata/test/F16,if=virtio,cache=none: Could not complete read/write/flush from gluster
^C
Program received signal SIGINT, Interrupt.
0x00007ffff60e9403 in select () from /lib64/libc.so.6
(gdb) bt
#0  0x00007ffff60e9403 in select () from /lib64/libc.so.6
#1  0x00005555555baee3 in qemu_aio_wait () at aio.c:158
#2  0x00005555555cf57b in bdrv_rw_co (bs=0x5555564cfa50, sector_num=0, buf=
    0x7fffffffb640 "\353c\220", nb_sectors=4, is_write=false) at block.c:1623
#3  0x00005555555cf5e1 in bdrv_read (bs=0x5555564cfa50, sector_num=0, buf=
    0x7fffffffb640 "\353c\220", nb_sectors=4) at block.c:1633
#4  0x00005555555cf9d0 in bdrv_pread (bs=0x5555564cfa50, offset=0, buf=0x7fffffffb640, 
    count1=2048) at block.c:1720
#5  0x00005555555cc8d4 in find_image_format (filename=
    0x5555564cc290 "gluster://bharata/test/F16", pdrv=0x7fffffffbe60) at block.c:529
#6  0x00005555555cd303 in bdrv_open (bs=0x5555564cef20, filename=
    0x5555564cc290 "gluster://bharata/test/F16", flags=98, drv=0x0) at block.c:800
#7  0x0000555555609f69 in drive_init (opts=0x5555564cf900, default_to_scsi=0)
    at blockdev.c:608
#8  0x0000555555711b6c in drive_init_func (opts=0x5555564cc1e0, opaque=0x555555c357a0)
    at vl.c:775
#9  0x000055555574ceda in qemu_opts_foreach (list=0x555555c319e0, func=
    0x555555711b31 <drive_init_func>, opaque=0x555555c357a0, abort_on_failure=1)
    at qemu-option.c:1094
#10 0x0000555555719d78 in main (argc=9, argv=0x7fffffffe468, envp=0x7fffffffe4b8)
    at vl.c:3430
Blue Swirl - Aug. 4, 2012, 11:28 a.m.
On Sat, Aug 4, 2012 at 2:44 AM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> On Fri, Aug 03, 2012 at 03:57:20PM +0000, Blue Swirl wrote:
>> >> > +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
>> >> > +{
>> >> > +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
>> >> > +    BDRVGlusterState *s = acb->common.bs->opaque;
>> >> > +
>> >> > +    acb->ret = ret;
>> >> > +    if (qemu_gluster_send_pipe(s, acb) < 0) {
>> >> > +        error_report("Could not complete read/write/flush from gluster");
>> >> > +        abort();
>> >>
>> >> Aborting is a bit drastic, it would be nice to save and exit gracefully.
>> >
>> > I am not sure if there is an easy way to recover sanely and exit from this
>> > kind of error.
>> >
>> > Here the non-QEMU thread (gluster thread) failed to notify the QEMU thread
>> > on the read side of the pipe about the IO completion. So essentially
>> > bdrv_read or bdrv_write will never complete if this error happens.
>> >
>> > Do you have any suggestions on how to exit gracefully here ?
>>
>> Ignore but set the callback return to -EIO, see for example curl.c:249.
>
> I see the precedence for how I am handling this in
> posix-aio-compat.c:posix_aio_notify_event().
>
> So instead of aborting, I could do acb->common.cb(acb->common.opaque, -EIO)
> as you suggest, but that would not help because, the thread at the read side
> of the pipe is still waiting and user will see the read/write failure as hang.

Probably the other side needs to be informed somehow.

Maybe it's enough for 1.2 to just use abort() and add a FIXME comment.

>
> [root@bharata qemu]# gdb ./x86_64-softmmu/qemu-system-x86_64
> Starting program: ./x86_64-softmmu/qemu-system-x86_64 --enable-kvm --nographic -m 1024 -smp 4 -drive file=gluster://bharata/test/F16,if=virtio,cache=none
> [New Thread 0x7ffff4c7f700 (LWP 6537)]
> [New Thread 0x7ffff447e700 (LWP 6538)]
> [New Thread 0x7ffff3420700 (LWP 6539)]
> [New Thread 0x7ffff1407700 (LWP 6540)]
> qemu-system-x86_64: -drive file=gluster://bharata/test/F16,if=virtio,cache=none: Could not complete read/write/flush from gluster
> ^C
> Program received signal SIGINT, Interrupt.
> 0x00007ffff60e9403 in select () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007ffff60e9403 in select () from /lib64/libc.so.6
> #1  0x00005555555baee3 in qemu_aio_wait () at aio.c:158
> #2  0x00005555555cf57b in bdrv_rw_co (bs=0x5555564cfa50, sector_num=0, buf=
>     0x7fffffffb640 "\353c\220", nb_sectors=4, is_write=false) at block.c:1623
> #3  0x00005555555cf5e1 in bdrv_read (bs=0x5555564cfa50, sector_num=0, buf=
>     0x7fffffffb640 "\353c\220", nb_sectors=4) at block.c:1633
> #4  0x00005555555cf9d0 in bdrv_pread (bs=0x5555564cfa50, offset=0, buf=0x7fffffffb640,
>     count1=2048) at block.c:1720
> #5  0x00005555555cc8d4 in find_image_format (filename=
>     0x5555564cc290 "gluster://bharata/test/F16", pdrv=0x7fffffffbe60) at block.c:529
> #6  0x00005555555cd303 in bdrv_open (bs=0x5555564cef20, filename=
>     0x5555564cc290 "gluster://bharata/test/F16", flags=98, drv=0x0) at block.c:800
> #7  0x0000555555609f69 in drive_init (opts=0x5555564cf900, default_to_scsi=0)
>     at blockdev.c:608
> #8  0x0000555555711b6c in drive_init_func (opts=0x5555564cc1e0, opaque=0x555555c357a0)
>     at vl.c:775
> #9  0x000055555574ceda in qemu_opts_foreach (list=0x555555c319e0, func=
>     0x555555711b31 <drive_init_func>, opaque=0x555555c357a0, abort_on_failure=1)
>     at qemu-option.c:1094
> #10 0x0000555555719d78 in main (argc=9, argv=0x7fffffffe468, envp=0x7fffffffe4b8)
>     at vl.c:3430
>

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index b5754d3..a1ae67f 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -9,3 +9,4 @@  block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
+block-obj-$(CONFIG_GLUSTERFS) += gluster.o
diff --git a/block/gluster.c b/block/gluster.c
new file mode 100644
index 0000000..712752f
--- /dev/null
+++ b/block/gluster.c
@@ -0,0 +1,610 @@ 
+/*
+ * GlusterFS backend for QEMU
+ *
+ * (AIO implementation is derived from block/rbd.c)
+ *
+ * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * (at your option) any later version. See the COPYING file in the top-level
+ * directory.
+ */
+#include "block_int.h"
+#include <glusterfs/api/glfs.h>
+
+typedef struct GlusterAIOCB {
+    BlockDriverAIOCB common;
+    bool canceled;
+    int64_t size;
+    int ret;
+} GlusterAIOCB;
+
+typedef struct BDRVGlusterState {
+    struct glfs *glfs;
+    int fds[2];
+    struct glfs_fd *fd;
+    int qemu_aio_count;
+} BDRVGlusterState;
+
+#define GLUSTER_FD_READ 0
+#define GLUSTER_FD_WRITE 1
+
+typedef struct GlusterURI {
+    char *server;
+    int port;
+    char *volname;
+    char *image;
+    char *transport;
+} GlusterURI;
+
+static void qemu_gluster_uri_free(GlusterURI *uri)
+{
+    g_free(uri->server);
+    g_free(uri->volname);
+    g_free(uri->image);
+    g_free(uri->transport);
+    g_free(uri);
+}
+
+/*
+ * We don't validate the transport option obtained here but
+ * instead depend on gluster to flag an error.
+ */
+static int parse_transport(GlusterURI *uri, char *transport)
+{
+    char *token, *saveptr;
+    int ret = -EINVAL;
+
+    if (!transport) {
+        uri->transport = strdup("socket");
+        ret = 0;
+        goto out;
+    }
+
+    token = strtok_r(transport, "=", &saveptr);
+    if (!token) {
+        goto out;
+    }
+    if (strcmp(token, "transport")) {
+        goto out;
+    }
+    token = strtok_r(NULL, "=", &saveptr);
+    if (!token) {
+        goto out;
+    }
+    uri->transport = strdup(token);
+    ret = 0;
+out:
+    return ret;
+}
+
+static int parse_server(GlusterURI *uri, char *server)
+{
+    int ret = -EINVAL;
+    char *token, *saveptr;
+    char *p, *q = server;
+
+    p = strchr(server, '[');
+    if (p) {
+        /* [ipv6] */
+        if (p != server) {
+            /* [ not in the beginning */
+            goto out;
+        }
+        q++;
+        p = strrchr(p, ']');
+        if (!p) {
+            /* No matching ] */
+            goto out;
+        }
+        *p++ = '\0';
+        uri->server = strdup(q);
+
+        if (*p) {
+            if (*p != ':') {
+                /* [ipv6] followed by something other than : */
+                goto out;
+            }
+            uri->port = strtoul(++p, NULL, 0);
+            if (uri->port < 0) {
+                goto out;
+            }
+        } else {
+            /* port not specified, use default */
+            uri->port = 0;
+        }
+
+    } else {
+        /* ipv4 or hostname */
+        if (*server == ':') {
+            /* port specified w/o a server */
+            goto out;
+        }
+        token = strtok_r(server, ":", &saveptr);
+        if (!token) {
+            goto out;
+        }
+        uri->server = strdup(token);
+        token = strtok_r(NULL, ":", &saveptr);
+        if (token) {
+            uri->port = strtoul(token, NULL, 0);
+            if (uri->port < 0) {
+                goto out;
+            }
+        } else {
+            uri->port = 0;
+        }
+    }
+    ret = 0;
+out:
+    return ret;
+}
+
+/*
+ * file=gluster://server:[port]/volname/image[?transport=socket]
+ *
+ * 'gluster' is the protocol.
+ *
+ * 'server' specifies the server where the volume file specification for
+ * the given volume resides. This can be either hostname or ipv4 address
+ * or ipv6 address. ipv6 address needs to be with in square brackets [ ].
+ *
+ *'port' is the port number on which gluster management daemon (glusterd) is
+ * listening. This is optional and if not specified, QEMU will send 0 which
+ * will make libgfapi to use the default port.
+ *
+ * 'volname' is the name of the gluster volume which contains the VM image.
+ *
+ * 'image' is the path to the actual VM image in the gluster volume.
+ *
+ * 'transport' specifies the transport used to connect to glusterd. This is
+ * optional and if not specified, socket transport is used.
+ *
+ * Examples:
+ *
+ * file=gluster://1.2.3.4/testvol/a.img
+ * file=gluster://1.2.3.4:5000/testvol/dir/a.img?transport=socket
+ * file=gluster://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
+ * file=gluster://[1:2:3:4:5:6:7:8]:5000/testvol/dir/a.img?transport=socket
+ * file=gluster://server.domain.com:5000/testvol/dir/a.img
+ */
+static int qemu_gluster_parseuri(GlusterURI *uri, const char *filename)
+{
+    char *token, *saveptr;
+    char *p, *r;
+    int ret = -EINVAL;
+
+    p = r = g_strdup(filename);
+    if (strncmp(p, "gluster://", 10)) {
+        goto out;
+    }
+
+    /* Discard the protocol */
+    p += 10;
+
+    /* server */
+    token = strtok_r(p, "/", &saveptr);
+    if (!token) {
+        goto out;
+    }
+
+    ret = parse_server(uri, token);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /* volname */
+    token = strtok_r(NULL, "/", &saveptr);
+    if (!token) {
+        ret = -EINVAL;
+        goto out;
+    }
+    uri->volname = g_strdup(token);
+
+    /* image */
+    token = strtok_r(NULL, "?", &saveptr);
+    if (!token) {
+        ret = -EINVAL;
+        goto out;
+    }
+    uri->image = g_strdup(token);
+
+    /* transport */
+    token = strtok_r(NULL, "?", &saveptr);
+    ret = parse_transport(uri, token);
+    if (ret < 0) {
+        goto out;
+     }
+
+    /* Flag error for extra options */
+    token = strtok_r(NULL, "?", &saveptr);
+    if (token) {
+        ret = -1;
+        goto out;
+    }
+    ret = 0;
+out:
+    g_free(r);
+    return ret;
+}
+
+static struct glfs *qemu_gluster_init(GlusterURI *uri, const char *filename)
+{
+    struct glfs *glfs = NULL;
+    int ret;
+
+    ret = qemu_gluster_parseuri(uri, filename);
+    if (ret < 0) {
+        error_report("Usage: file=gluster://server:[port]/volname/image"
+            "[?transport=socket]");
+        errno = -ret;
+        goto out;
+    }
+
+    glfs = glfs_new(uri->volname);
+    if (!glfs) {
+        goto out;
+    }
+
+    ret = glfs_set_volfile_server(glfs, uri->transport, uri->server,
+        uri->port);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /*
+     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
+     * GlusterFS exports it in a header.
+     */
+    ret = glfs_set_logging(glfs, "-", 4);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = glfs_init(glfs);
+    if (ret) {
+        error_report("glfs_init() failed for server=%s port=%d volume=%s "
+            "image=%s\n", uri->server, uri->port, uri->volname, uri->image);
+        goto out;
+    }
+    return glfs;
+
+out:
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return NULL;
+}
+
+static void qemu_gluster_complete_aio(GlusterAIOCB *acb)
+{
+    int ret;
+
+    if (acb->canceled) {
+        qemu_aio_release(acb);
+        return;
+    }
+
+    if (acb->ret == acb->size) {
+        ret = 0; /* Success */
+    } else if (acb->ret < 0) {
+        ret = acb->ret; /* Read/Write failed */
+    } else {
+        ret = -EIO; /* Partial read/write - fail it */
+    }
+    acb->common.cb(acb->common.opaque, ret);
+    qemu_aio_release(acb);
+}
+
+static void qemu_gluster_aio_event_reader(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+    GlusterAIOCB *event_acb;
+    int event_reader_pos = 0;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&event_acb;
+
+        ret = read(s->fds[GLUSTER_FD_READ], p + event_reader_pos,
+                   sizeof(event_acb) - event_reader_pos);
+        if (ret > 0) {
+            event_reader_pos += ret;
+            if (event_reader_pos == sizeof(event_acb)) {
+                event_reader_pos = 0;
+                qemu_gluster_complete_aio(event_acb);
+                s->qemu_aio_count--;
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static int qemu_gluster_aio_flush_cb(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
+}
+
+static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
+    int bdrv_flags)
+{
+    BDRVGlusterState *s = bs->opaque;
+    int open_flags = 0;
+    int ret = 0;
+    GlusterURI *uri = g_malloc0(sizeof(GlusterURI));
+
+    s->glfs = qemu_gluster_init(uri, filename);
+    if (!s->glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    open_flags |=  O_BINARY;
+    open_flags &= ~O_ACCMODE;
+    if (bdrv_flags & BDRV_O_RDWR) {
+        open_flags |= O_RDWR;
+    } else {
+        open_flags |= O_RDONLY;
+    }
+
+    if ((bdrv_flags & BDRV_O_NOCACHE)) {
+        open_flags |= O_DIRECT;
+    }
+
+    s->fd = glfs_open(s->glfs, uri->image, open_flags);
+    if (!s->fd) {
+        ret = -errno;
+        goto out;
+    }
+
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        goto out;
+    }
+    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
+        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
+
+out:
+    qemu_gluster_uri_free(uri);
+    if (!ret) {
+        return ret;
+    }
+    if (s->fd) {
+        glfs_close(s->fd);
+    }
+    if (s->glfs) {
+        glfs_fini(s->glfs);
+    }
+    return ret;
+}
+
+static int qemu_gluster_create(const char *filename,
+        QEMUOptionParameter *options)
+{
+    struct glfs *glfs;
+    struct glfs_fd *fd;
+    int ret = 0;
+    int64_t total_size = 0;
+    GlusterURI *uri = g_malloc0(sizeof(GlusterURI));
+
+    glfs = qemu_gluster_init(uri, filename);
+    if (!glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            total_size = options->value.n / BDRV_SECTOR_SIZE;
+        }
+        options++;
+    }
+
+    fd = glfs_creat(glfs, uri->image,
+        O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
+    if (!fd) {
+        ret = -errno;
+    } else {
+        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
+            ret = -errno;
+        }
+        if (glfs_close(fd) != 0) {
+            ret = -errno;
+        }
+    }
+out:
+    qemu_gluster_uri_free(uri);
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return ret;
+}
+
+static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
+
+    acb->common.cb(acb->common.opaque, -ECANCELED);
+    acb->canceled = true;
+}
+
+static AIOPool gluster_aio_pool = {
+    .aiocb_size = sizeof(GlusterAIOCB),
+    .cancel = qemu_gluster_aio_cancel,
+};
+
+static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = s->fds[GLUSTER_FD_WRITE];
+
+        ret = write(fd, (void *)&acb, sizeof(acb));
+        if (ret >= 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
+
+static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
+    BDRVGlusterState *s = acb->common.bs->opaque;
+
+    acb->ret = ret;
+    if (qemu_gluster_send_pipe(s, acb) < 0) {
+        error_report("Could not complete read/write/flush from gluster");
+        abort();
+    }
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int write)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+    size_t size;
+    off_t offset;
+
+    offset = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    s->qemu_aio_count++;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = size;
+    acb->ret = 0;
+    acb->canceled = false;
+
+    if (write) {
+        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    } else {
+        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    }
+
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = 0;
+    acb->ret = 0;
+    acb->canceled = false;
+    s->qemu_aio_count++;
+
+    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static int64_t qemu_gluster_getlength(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+    struct stat st;
+    int ret;
+
+    ret = glfs_fstat(s->fd, &st);
+    if (ret < 0) {
+        return -errno;
+    } else {
+        return st.st_size;
+    }
+}
+
+static void qemu_gluster_close(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+
+    if (s->fd) {
+        glfs_close(s->fd);
+        s->fd = NULL;
+    }
+    glfs_fini(s->glfs);
+}
+
+static QEMUOptionParameter qemu_gluster_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    { NULL }
+};
+
+static BlockDriver bdrv_gluster = {
+    .format_name = "gluster",
+    .protocol_name = "gluster",
+    .instance_size = sizeof(BDRVGlusterState),
+    .bdrv_file_open = qemu_gluster_open,
+    .bdrv_close = qemu_gluster_close,
+    .bdrv_create = qemu_gluster_create,
+    .bdrv_getlength = qemu_gluster_getlength,
+
+    .bdrv_aio_readv = qemu_gluster_aio_readv,
+    .bdrv_aio_writev = qemu_gluster_aio_writev,
+    .bdrv_aio_flush = qemu_gluster_aio_flush,
+
+    .create_options = qemu_gluster_create_options,
+};
+
+static void bdrv_gluster_init(void)
+{
+    bdrv_register(&bdrv_gluster);
+}
+
+block_init(bdrv_gluster_init);