Patchwork [v3,2/2] block: Support GlusterFS as a QEMU block backend

login
register
mail settings
Submitter Bharata B Rao
Date July 25, 2012, 6 a.m.
Message ID <20120725060008.GD1392@in.ibm.com>
Download mbox | patch
Permalink /patch/173106/
State New
Headers show

Comments

Bharata B Rao - July 25, 2012, 6 a.m.
block: Support GlusterFS as a QEMU block backend.

From: Bharata B Rao <bharata@linux.vnet.ibm.com>

This patch adds gluster as the new block backend in QEMU. This gives
QEMU the ability to boot VM images from gluster volumes. Its already
possible to boot from VM images on gluster volumes, but this patchset
provides the ability to boot VM images from gluster volumes by by-passing
the FUSE layer in gluster. In case the image is present on the local
system, it is possible to even bypass client and server translator and
hence the RPC overhead.

VM Image on gluster volume is specified like this:

-drive file=gluster:server:[port]:[transport]:volname:image

- Here 'gluster' is the protocol.
- 'server' specifies the server where the volume file specification for
  the given volume resides.
- 'port' is the port number on which gluster management daemon (glusterd) is
   listening. This is optional and if not specified, QEMU will send 0 which
   will make libgfapi to use the default port.
- 'transport' specifies the transport used to connect to glusterd. This is
  optional and if not specified, socket transport is used.
- 'volname' is the name of the gluster volume which contains the VM image.
- 'image' is the path to the actual VM image in the gluster volume.

Eg 1: -drive file=gluster:server1:0:socket:test:/image
Eg 2: -drive file=gluster:server1:::test:/image

Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
---

 block/Makefile.objs |    1 
 block/gluster.c     |  484 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 485 insertions(+), 0 deletions(-)
 create mode 100644 block/gluster.c
Blue Swirl - July 27, 2012, 6:44 p.m.
On Wed, Jul 25, 2012 at 6:00 AM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> block: Support GlusterFS as a QEMU block backend.
>
> From: Bharata B Rao <bharata@linux.vnet.ibm.com>
>
> This patch adds gluster as the new block backend in QEMU. This gives
> QEMU the ability to boot VM images from gluster volumes. Its already
> possible to boot from VM images on gluster volumes, but this patchset
> provides the ability to boot VM images from gluster volumes by by-passing
> the FUSE layer in gluster. In case the image is present on the local
> system, it is possible to even bypass client and server translator and
> hence the RPC overhead.
>
> VM Image on gluster volume is specified like this:
>
> -drive file=gluster:server:[port]:[transport]:volname:image
>
> - Here 'gluster' is the protocol.
> - 'server' specifies the server where the volume file specification for
>   the given volume resides.
> - 'port' is the port number on which gluster management daemon (glusterd) is
>    listening. This is optional and if not specified, QEMU will send 0 which
>    will make libgfapi to use the default port.
> - 'transport' specifies the transport used to connect to glusterd. This is
>   optional and if not specified, socket transport is used.
> - 'volname' is the name of the gluster volume which contains the VM image.
> - 'image' is the path to the actual VM image in the gluster volume.
>
> Eg 1: -drive file=gluster:server1:0:socket:test:/image
> Eg 2: -drive file=gluster:server1:::test:/image
>
> Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
> ---
>
>  block/Makefile.objs |    1
>  block/gluster.c     |  484 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 485 insertions(+), 0 deletions(-)
>  create mode 100644 block/gluster.c
>
>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index b5754d3..a1ae67f 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -9,3 +9,4 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
>  block-obj-$(CONFIG_LIBISCSI) += iscsi.o
>  block-obj-$(CONFIG_CURL) += curl.o
>  block-obj-$(CONFIG_RBD) += rbd.o
> +block-obj-$(CONFIG_GLUSTERFS) += gluster.o
> diff --git a/block/gluster.c b/block/gluster.c
> new file mode 100644
> index 0000000..b27971b
> --- /dev/null
> +++ b/block/gluster.c
> @@ -0,0 +1,484 @@
> +/*
> + * GlusterFS backend for QEMU
> + *
> + * (AIO implementation is derived from block/rbd.c)
> + *
> + * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * (at your option) any later version. See the COPYING file in the top-level
> + * directory.
> + */
> +#include "block_int.h"
> +#include <glusterfs/api/glfs.h>
> +
> +typedef struct GlusterAIOCB {
> +    BlockDriverAIOCB common;
> +    bool canceled;
> +    int64_t size;
> +    int ret;
> +} GlusterAIOCB;
> +
> +typedef struct BDRVGlusterState {
> +    struct glfs *glfs;
> +    int fds[2];
> +    struct glfs_fd *fd;
> +    int qemu_aio_count;
> +} BDRVGlusterState;
> +
> +#define GLUSTER_FD_READ 0
> +#define GLUSTER_FD_WRITE 1
> +
> +typedef enum {
> +    GOPT_PROTOCOL,
> +    GOPT_SERVER,
> +    GOPT_PORT,
> +    GOPT_TRANSPORT,
> +    GOPT_VOLNAME,
> +    GOPT_IMAGE,
> +
> +    GOPT_LAST,
> +} GlusterOptsEnum;
> +
> +struct GlusterOpts {

static

> +    bool optional;
> +    char defval[10];

const char *defval?

> +    char *value;
> +} GlusterOpts[] = {
> +    {false, "", NULL },
> +    {false, "", NULL },
> +    {true, "0", NULL },
> +    {true, "socket", NULL },
> +    {false, "", NULL },
> +    {false, "", NULL },
> +};
> +
> +static void qemu_gluster_opts_free(void)
> +{
> +    int i;
> +
> +    for (i = 0; i < GOPT_LAST; i++) {
> +        g_free(GlusterOpts[i].value);
> +        /* Prepare GlusterOpts to parse the next gluster drive (if any) */
> +        GlusterOpts[i].value = NULL;
> +    }
> +}
> +
> +/*
> + * file=protocol:server:[port]:[transport]:volname:image
> + */
> +static int qemu_gluster_parsename(const char *filename)
> +{
> +    char *p, *q, *r;
> +    int ret = -EINVAL;
> +    int i;
> +
> +    p = q = r = g_strdup(filename);
> +    for (i = 0; i < GOPT_LAST; i++) {
> +        q = p;
> +        p = strchr(p, ':');
> +        if (!p) {
> +            goto out;
> +        }
> +
> +        if (p == q) {
> +            if (GlusterOpts[i].optional) {
> +                GlusterOpts[i].value = g_strdup(GlusterOpts[i].defval);
> +                p++;
> +                continue;
> +            } else {
> +                goto out;
> +            }
> +        }
> +        *p++ = '\0';
> +        GlusterOpts[i].value = g_strdup(q);
> +    }
> +out:
> +    if (i == GOPT_LAST-1 && strlen(q)) {

Spaces around '-'.

> +        GlusterOpts[i].value = g_strdup(q);
> +        ret = 0;
> +    }
> +    g_free(r);
> +    return ret;
> +}
> +
> +static struct glfs *qemu_gluster_init(const char *filename)
> +{
> +    struct glfs *glfs = NULL;
> +    int ret;
> +    int port;
> +
> +    ret = qemu_gluster_parsename(filename);
> +    if (ret < 0) {
> +        errno = -ret;
> +        goto out;
> +    }
> +
> +    port = strtoul(GlusterOpts[GOPT_PORT].value, NULL, 0);
> +    if (port < 0) {

port > 65535 could be bad too.

> +        goto out;
> +    }
> +
> +    glfs = glfs_new(GlusterOpts[GOPT_VOLNAME].value);
> +    if (!glfs) {
> +        goto out;
> +    }
> +
> +    ret = glfs_set_volfile_server(glfs, GlusterOpts[GOPT_TRANSPORT].value,
> +        GlusterOpts[GOPT_SERVER].value, port);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    /*
> +     * TODO: When GlusterFS exports logging.h, use GF_LOG_ERROR instead of
> +     * hard code value of 4 here.
> +     */
> +    ret = glfs_set_logging(glfs, "-", 4);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +
> +    ret = glfs_init(glfs);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    return glfs;
> +
> +out:
> +    if (glfs) {
> +        glfs_fini(glfs);
> +    }
> +    return NULL;
> +}
> +
> +static void qemu_gluster_complete_aio(GlusterAIOCB *acb)
> +{
> +    int ret;
> +
> +    if (acb->canceled) {
> +        qemu_aio_release(acb);
> +        return;
> +    }
> +
> +    if (acb->ret == acb->size) {
> +        ret = 0; /* Success */
> +    } else if (acb->ret < 0) {
> +        ret = acb->ret; /* Read/Write failed */
> +    } else {
> +        ret = -EIO; /* Partial read/write - fail it */
> +    }
> +    acb->common.cb(acb->common.opaque, ret);
> +    qemu_aio_release(acb);
> +}
> +
> +static void qemu_gluster_aio_event_reader(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +    GlusterAIOCB *event_acb;
> +    int event_reader_pos = 0;
> +    ssize_t ret;
> +
> +    do {
> +        char *p = (char *)&event_acb;
> +
> +        ret = read(s->fds[GLUSTER_FD_READ], p + event_reader_pos,
> +                   sizeof(event_acb) - event_reader_pos);
> +        if (ret > 0) {
> +            event_reader_pos += ret;
> +            if (event_reader_pos == sizeof(event_acb)) {
> +                event_reader_pos = 0;
> +                qemu_gluster_complete_aio(event_acb);
> +                s->qemu_aio_count--;
> +            }
> +        }
> +    } while (ret < 0 && errno == EINTR);
> +}
> +
> +static int qemu_gluster_aio_flush_cb(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0);
> +}
> +
> +static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
> +    int bdrv_flags)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    int open_flags = 0;
> +    int ret = 0;
> +
> +    s->glfs = qemu_gluster_init(filename);
> +    if (!s->glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    open_flags |=  O_BINARY;
> +    open_flags &= ~O_ACCMODE;
> +    if (bdrv_flags & BDRV_O_RDWR) {
> +        open_flags |= O_RDWR;
> +    } else {
> +        open_flags |= O_RDONLY;
> +    }
> +
> +    if ((bdrv_flags & BDRV_O_NOCACHE)) {
> +        open_flags |= O_DIRECT;
> +    }
> +
> +    s->fd = glfs_open(s->glfs, GlusterOpts[GOPT_IMAGE].value, open_flags);
> +    if (!s->fd) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
> +    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
> +        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
> +
> +out:
> +    qemu_gluster_opts_free();
> +    if (!ret) {
> +        return ret;
> +    }
> +    if (s->fd) {
> +        glfs_close(s->fd);
> +    }
> +    if (s->glfs) {
> +        glfs_fini(s->glfs);
> +    }
> +    return ret;
> +}
> +
> +static int qemu_gluster_create(const char *filename,
> +        QEMUOptionParameter *options)
> +{
> +    struct glfs *glfs;
> +    struct glfs_fd *fd;
> +    int ret = 0;
> +    int64_t total_size = 0;
> +
> +    glfs = qemu_gluster_init(filename);
> +    if (!glfs) {
> +        ret = -errno;
> +        goto out;
> +    }
> +
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            total_size = options->value.n / BDRV_SECTOR_SIZE;
> +        }
> +        options++;
> +    }
> +
> +    fd = glfs_creat(glfs, GlusterOpts[GOPT_IMAGE].value,
> +        O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, S_IRUSR|S_IWUSR);

Spaces around '|'.

> +    if (!fd) {
> +        ret = -errno;
> +    } else {
> +        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
> +            ret = -errno;
> +        }
> +        if (glfs_close(fd) != 0) {
> +            ret = -errno;
> +        }
> +    }
> +out:
> +    qemu_gluster_opts_free();
> +    if (glfs) {
> +        glfs_fini(glfs);
> +    }
> +    return ret;
> +}
> +
> +static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
> +
> +    acb->common.cb(acb->common.opaque, -ECANCELED);
> +    acb->canceled = true;
> +}
> +
> +static AIOPool gluster_aio_pool = {
> +    .aiocb_size = sizeof(GlusterAIOCB),
> +    .cancel = qemu_gluster_aio_cancel,
> +};
> +
> +static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
> +{
> +    int ret = 0;
> +    while (1) {
> +        fd_set wfd;
> +        int fd = s->fds[GLUSTER_FD_WRITE];
> +
> +        ret = write(fd, (void *)&acb, sizeof(acb));
> +        if (ret >= 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }
> +
> +        FD_ZERO(&wfd);
> +        FD_SET(fd, &wfd);
> +        do {
> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> +        } while (ret < 0 && errno == EINTR);
> +    }
> +    return ret;
> +}
> +
> +static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
> +{
> +    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
> +    BDRVGlusterState *s = acb->common.bs->opaque;
> +
> +    acb->ret = ret;
> +    if (qemu_gluster_send_pipe(s, acb) < 0) {
> +        error_report("Could not complete read/write/flush from gluster");
> +        abort();
> +    }
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque, int write)
> +{
> +    int ret;
> +    GlusterAIOCB *acb;
> +    BDRVGlusterState *s = bs->opaque;
> +    size_t size;
> +    off_t offset;
> +
> +    offset = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    s->qemu_aio_count++;
> +
> +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> +    acb->size = size;
> +    acb->ret = 0;
> +    acb->canceled = false;
> +
> +    if (write) {
> +        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
> +            &gluster_finish_aiocb, acb);
> +    } else {
> +        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
> +            &gluster_finish_aiocb, acb);
> +    }
> +
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    return &acb->common;
> +
> +out:
> +    s->qemu_aio_count--;
> +    qemu_aio_release(acb);
> +    return NULL;
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    int ret;
> +    GlusterAIOCB *acb;
> +    BDRVGlusterState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> +    acb->size = 0;
> +    acb->ret = 0;
> +    acb->canceled = false;
> +    s->qemu_aio_count++;
> +
> +    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
> +    if (ret < 0) {
> +        goto out;
> +    }
> +    return &acb->common;
> +
> +out:
> +    s->qemu_aio_count--;
> +    qemu_aio_release(acb);
> +    return NULL;
> +}
> +
> +static int64_t qemu_gluster_getlength(BlockDriverState *bs)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    struct stat st;
> +    int ret;
> +
> +    ret = glfs_fstat(s->fd, &st);
> +    if (ret < 0) {
> +        return -errno;
> +    } else {
> +        return st.st_size;
> +    }
> +}
> +
> +static void qemu_gluster_close(BlockDriverState *bs)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +
> +    if (s->fd) {
> +        glfs_close(s->fd);
> +        s->fd = NULL;
> +    }
> +    glfs_fini(s->glfs);
> +}
> +
> +static QEMUOptionParameter qemu_gluster_create_options[] = {
> +    {
> +        .name = BLOCK_OPT_SIZE,
> +        .type = OPT_SIZE,
> +        .help = "Virtual disk size"
> +    },
> +    { NULL }
> +};
> +
> +static BlockDriver bdrv_gluster = {
> +    .format_name = "gluster",
> +    .protocol_name = "gluster",
> +    .instance_size = sizeof(BDRVGlusterState),
> +    .bdrv_file_open = qemu_gluster_open,
> +    .bdrv_close = qemu_gluster_close,
> +    .bdrv_create = qemu_gluster_create,
> +    .bdrv_getlength = qemu_gluster_getlength,
> +
> +    .bdrv_aio_readv = qemu_gluster_aio_readv,
> +    .bdrv_aio_writev = qemu_gluster_aio_writev,
> +    .bdrv_aio_flush = qemu_gluster_aio_flush,
> +
> +    .create_options = qemu_gluster_create_options,
> +};
> +
> +static void bdrv_gluster_init(void)
> +{
> +    bdrv_register(&bdrv_gluster);
> +}
> +
> +block_init(bdrv_gluster_init);
>
>
Bharata B Rao - July 28, 2012, 4:43 a.m.
On Fri, Jul 27, 2012 at 06:44:04PM +0000, Blue Swirl wrote:
> > +struct GlusterOpts {
> 
> static

Sure.

> 
> > +    bool optional;
> > +    char defval[10];
> 
> const char *defval?

Sure I could.

> 
> > +    char *value;
> > +} GlusterOpts[] = {
> > +    {false, "", NULL },
> > +    {false, "", NULL },
> > +    {true, "0", NULL },
> > +    {true, "socket", NULL },
> > +    {false, "", NULL },
> > +    {false, "", NULL },
> > +};
> > +
> > +    if (i == GOPT_LAST-1 && strlen(q)) {
> 
> Spaces around '-'.

checkpatch.pl doesn't enforce this, but I can change.

> > +
> > +    port = strtoul(GlusterOpts[GOPT_PORT].value, NULL, 0);
> > +    if (port < 0) {
> 
> port > 65535 could be bad too.

Actually I am just checking if strtoul gave me a valid integer only
and depending on gluster to flag an error for invalid port number.
But I guess no harm in checking for valid port range here. Is there
a #define equivalent for 65535 ?

> 
> > +
> > +    fd = glfs_creat(glfs, GlusterOpts[GOPT_IMAGE].value,
> > +        O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, S_IRUSR|S_IWUSR);
> 
> Spaces around '|'.

Again, checkpatch.pl doesn't enforce this, but I can change.

Thanks for take time to review.

Regards,
Bharata.
Blue Swirl - July 28, 2012, 8:23 a.m.
On Sat, Jul 28, 2012 at 4:43 AM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> On Fri, Jul 27, 2012 at 06:44:04PM +0000, Blue Swirl wrote:
>> > +struct GlusterOpts {
>>
>> static
>
> Sure.
>
>>
>> > +    bool optional;
>> > +    char defval[10];
>>
>> const char *defval?
>
> Sure I could.
>
>>
>> > +    char *value;
>> > +} GlusterOpts[] = {
>> > +    {false, "", NULL },
>> > +    {false, "", NULL },
>> > +    {true, "0", NULL },
>> > +    {true, "socket", NULL },
>> > +    {false, "", NULL },
>> > +    {false, "", NULL },
>> > +};
>> > +
>> > +    if (i == GOPT_LAST-1 && strlen(q)) {
>>
>> Spaces around '-'.
>
> checkpatch.pl doesn't enforce this, but I can change.
>
>> > +
>> > +    port = strtoul(GlusterOpts[GOPT_PORT].value, NULL, 0);
>> > +    if (port < 0) {
>>
>> port > 65535 could be bad too.
>
> Actually I am just checking if strtoul gave me a valid integer only
> and depending on gluster to flag an error for invalid port number.
> But I guess no harm in checking for valid port range here. Is there
> a #define equivalent for 65535 ?

I don't think there is. It may also be possible to omit the check if
the connection function checks it and fails. Accidental modular
arithmetic for the port numbers (65537 % 65536 == 1) would not be so
nice if there are no checks.

>
>>
>> > +
>> > +    fd = glfs_creat(glfs, GlusterOpts[GOPT_IMAGE].value,
>> > +        O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, S_IRUSR|S_IWUSR);
>>
>> Spaces around '|'.
>
> Again, checkpatch.pl doesn't enforce this, but I can change.
>
> Thanks for take time to review.
>
> Regards,
> Bharata.
>

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index b5754d3..a1ae67f 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -9,3 +9,4 @@  block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
+block-obj-$(CONFIG_GLUSTERFS) += gluster.o
diff --git a/block/gluster.c b/block/gluster.c
new file mode 100644
index 0000000..b27971b
--- /dev/null
+++ b/block/gluster.c
@@ -0,0 +1,484 @@ 
+/*
+ * GlusterFS backend for QEMU
+ *
+ * (AIO implementation is derived from block/rbd.c)
+ *
+ * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * (at your option) any later version. See the COPYING file in the top-level
+ * directory.
+ */
+#include "block_int.h"
+#include <glusterfs/api/glfs.h>
+
+typedef struct GlusterAIOCB {
+    BlockDriverAIOCB common;
+    bool canceled;
+    int64_t size;
+    int ret;
+} GlusterAIOCB;
+
+typedef struct BDRVGlusterState {
+    struct glfs *glfs;
+    int fds[2];
+    struct glfs_fd *fd;
+    int qemu_aio_count;
+} BDRVGlusterState;
+
+#define GLUSTER_FD_READ 0
+#define GLUSTER_FD_WRITE 1
+
+typedef enum {
+    GOPT_PROTOCOL,
+    GOPT_SERVER,
+    GOPT_PORT,
+    GOPT_TRANSPORT,
+    GOPT_VOLNAME,
+    GOPT_IMAGE,
+
+    GOPT_LAST,
+} GlusterOptsEnum;
+
+struct GlusterOpts {
+    bool optional;
+    char defval[10];
+    char *value;
+} GlusterOpts[] = {
+    {false, "", NULL },
+    {false, "", NULL },
+    {true, "0", NULL },
+    {true, "socket", NULL },
+    {false, "", NULL },
+    {false, "", NULL },
+};
+
+static void qemu_gluster_opts_free(void)
+{
+    int i;
+
+    for (i = 0; i < GOPT_LAST; i++) {
+        g_free(GlusterOpts[i].value);
+        /* Prepare GlusterOpts to parse the next gluster drive (if any) */
+        GlusterOpts[i].value = NULL;
+    }
+}
+
+/*
+ * file=protocol:server:[port]:[transport]:volname:image
+ */
+static int qemu_gluster_parsename(const char *filename)
+{
+    char *p, *q, *r;
+    int ret = -EINVAL;
+    int i;
+
+    p = q = r = g_strdup(filename);
+    for (i = 0; i < GOPT_LAST; i++) {
+        q = p;
+        p = strchr(p, ':');
+        if (!p) {
+            goto out;
+        }
+
+        if (p == q) {
+            if (GlusterOpts[i].optional) {
+                GlusterOpts[i].value = g_strdup(GlusterOpts[i].defval);
+                p++;
+                continue;
+            } else {
+                goto out;
+            }
+        }
+        *p++ = '\0';
+        GlusterOpts[i].value = g_strdup(q);
+    }
+out:
+    if (i == GOPT_LAST-1 && strlen(q)) {
+        GlusterOpts[i].value = g_strdup(q);
+        ret = 0;
+    }
+    g_free(r);
+    return ret;
+}
+
+static struct glfs *qemu_gluster_init(const char *filename)
+{
+    struct glfs *glfs = NULL;
+    int ret;
+    int port;
+
+    ret = qemu_gluster_parsename(filename);
+    if (ret < 0) {
+        errno = -ret;
+        goto out;
+    }
+
+    port = strtoul(GlusterOpts[GOPT_PORT].value, NULL, 0);
+    if (port < 0) {
+        goto out;
+    }
+
+    glfs = glfs_new(GlusterOpts[GOPT_VOLNAME].value);
+    if (!glfs) {
+        goto out;
+    }
+
+    ret = glfs_set_volfile_server(glfs, GlusterOpts[GOPT_TRANSPORT].value,
+        GlusterOpts[GOPT_SERVER].value, port);
+    if (ret < 0) {
+        goto out;
+    }
+
+    /*
+     * TODO: When GlusterFS exports logging.h, use GF_LOG_ERROR instead of
+     * hard code value of 4 here.
+     */
+    ret = glfs_set_logging(glfs, "-", 4);
+    if (ret < 0) {
+        goto out;
+    }
+
+    ret = glfs_init(glfs);
+    if (ret < 0) {
+        goto out;
+    }
+    return glfs;
+
+out:
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return NULL;
+}
+
+static void qemu_gluster_complete_aio(GlusterAIOCB *acb)
+{
+    int ret;
+
+    if (acb->canceled) {
+        qemu_aio_release(acb);
+        return;
+    }
+
+    if (acb->ret == acb->size) {
+        ret = 0; /* Success */
+    } else if (acb->ret < 0) {
+        ret = acb->ret; /* Read/Write failed */
+    } else {
+        ret = -EIO; /* Partial read/write - fail it */
+    }
+    acb->common.cb(acb->common.opaque, ret);
+    qemu_aio_release(acb);
+}
+
+static void qemu_gluster_aio_event_reader(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+    GlusterAIOCB *event_acb;
+    int event_reader_pos = 0;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&event_acb;
+
+        ret = read(s->fds[GLUSTER_FD_READ], p + event_reader_pos,
+                   sizeof(event_acb) - event_reader_pos);
+        if (ret > 0) {
+            event_reader_pos += ret;
+            if (event_reader_pos == sizeof(event_acb)) {
+                event_reader_pos = 0;
+                qemu_gluster_complete_aio(event_acb);
+                s->qemu_aio_count--;
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static int qemu_gluster_aio_flush_cb(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
+}
+
+static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
+    int bdrv_flags)
+{
+    BDRVGlusterState *s = bs->opaque;
+    int open_flags = 0;
+    int ret = 0;
+
+    s->glfs = qemu_gluster_init(filename);
+    if (!s->glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    open_flags |=  O_BINARY;
+    open_flags &= ~O_ACCMODE;
+    if (bdrv_flags & BDRV_O_RDWR) {
+        open_flags |= O_RDWR;
+    } else {
+        open_flags |= O_RDONLY;
+    }
+
+    if ((bdrv_flags & BDRV_O_NOCACHE)) {
+        open_flags |= O_DIRECT;
+    }
+
+    s->fd = glfs_open(s->glfs, GlusterOpts[GOPT_IMAGE].value, open_flags);
+    if (!s->fd) {
+        ret = -errno;
+        goto out;
+    }
+
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        goto out;
+    }
+    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
+        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
+
+out:
+    qemu_gluster_opts_free();
+    if (!ret) {
+        return ret;
+    }
+    if (s->fd) {
+        glfs_close(s->fd);
+    }
+    if (s->glfs) {
+        glfs_fini(s->glfs);
+    }
+    return ret;
+}
+
+static int qemu_gluster_create(const char *filename,
+        QEMUOptionParameter *options)
+{
+    struct glfs *glfs;
+    struct glfs_fd *fd;
+    int ret = 0;
+    int64_t total_size = 0;
+
+    glfs = qemu_gluster_init(filename);
+    if (!glfs) {
+        ret = -errno;
+        goto out;
+    }
+
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            total_size = options->value.n / BDRV_SECTOR_SIZE;
+        }
+        options++;
+    }
+
+    fd = glfs_creat(glfs, GlusterOpts[GOPT_IMAGE].value,
+        O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, S_IRUSR|S_IWUSR);
+    if (!fd) {
+        ret = -errno;
+    } else {
+        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
+            ret = -errno;
+        }
+        if (glfs_close(fd) != 0) {
+            ret = -errno;
+        }
+    }
+out:
+    qemu_gluster_opts_free();
+    if (glfs) {
+        glfs_fini(glfs);
+    }
+    return ret;
+}
+
+static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
+
+    acb->common.cb(acb->common.opaque, -ECANCELED);
+    acb->canceled = true;
+}
+
+static AIOPool gluster_aio_pool = {
+    .aiocb_size = sizeof(GlusterAIOCB),
+    .cancel = qemu_gluster_aio_cancel,
+};
+
+static int qemu_gluster_send_pipe(BDRVGlusterState *s, GlusterAIOCB *acb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = s->fds[GLUSTER_FD_WRITE];
+
+        ret = write(fd, (void *)&acb, sizeof(acb));
+        if (ret >= 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
+
+static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
+{
+    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
+    BDRVGlusterState *s = acb->common.bs->opaque;
+
+    acb->ret = ret;
+    if (qemu_gluster_send_pipe(s, acb) < 0) {
+        error_report("Could not complete read/write/flush from gluster");
+        abort();
+    }
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int write)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+    size_t size;
+    off_t offset;
+
+    offset = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    s->qemu_aio_count++;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = size;
+    acb->ret = 0;
+    acb->canceled = false;
+
+    if (write) {
+        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    } else {
+        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
+            &gluster_finish_aiocb, acb);
+    }
+
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    int ret;
+    GlusterAIOCB *acb;
+    BDRVGlusterState *s = bs->opaque;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->size = 0;
+    acb->ret = 0;
+    acb->canceled = false;
+    s->qemu_aio_count++;
+
+    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static int64_t qemu_gluster_getlength(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+    struct stat st;
+    int ret;
+
+    ret = glfs_fstat(s->fd, &st);
+    if (ret < 0) {
+        return -errno;
+    } else {
+        return st.st_size;
+    }
+}
+
+static void qemu_gluster_close(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+
+    if (s->fd) {
+        glfs_close(s->fd);
+        s->fd = NULL;
+    }
+    glfs_fini(s->glfs);
+}
+
+static QEMUOptionParameter qemu_gluster_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    { NULL }
+};
+
+static BlockDriver bdrv_gluster = {
+    .format_name = "gluster",
+    .protocol_name = "gluster",
+    .instance_size = sizeof(BDRVGlusterState),
+    .bdrv_file_open = qemu_gluster_open,
+    .bdrv_close = qemu_gluster_close,
+    .bdrv_create = qemu_gluster_create,
+    .bdrv_getlength = qemu_gluster_getlength,
+
+    .bdrv_aio_readv = qemu_gluster_aio_readv,
+    .bdrv_aio_writev = qemu_gluster_aio_writev,
+    .bdrv_aio_flush = qemu_gluster_aio_flush,
+
+    .create_options = qemu_gluster_create_options,
+};
+
+static void bdrv_gluster_init(void)
+{
+    bdrv_register(&bdrv_gluster);
+}
+
+block_init(bdrv_gluster_init);