
[RFC,3/3] block: gluster as block backend

Message ID 20120611142144.GD2737@in.ibm.com
State New

Commit Message

Bharata B Rao June 11, 2012, 2:21 p.m. UTC
block: gluster as block backend

From: Bharata B Rao <bharata@linux.vnet.ibm.com>

This patch adds gluster as the new block backend in QEMU. This gives QEMU
the ability to boot VM images from gluster volumes.

Signed-off-by: Bharata B Rao <bharata@linux.vnet.ibm.com>
---

 Makefile.objs   |    2 
 block/gluster.c |  435 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 436 insertions(+), 1 deletions(-)
 create mode 100644 block/gluster.c

Comments

Stefan Hajnoczi June 18, 2012, 5:35 p.m. UTC | #1
On Mon, Jun 11, 2012 at 3:21 PM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> +#include "block_int.h"
> +#include "gluster-helpers.h"
> +
> +typedef void *gluster_file_t;

This typedef is already in gluster-helpers.h.  It's ugly BTW, "typedef
struct gluster_file gluster_file_t" is nicer since it won't cast to
other pointer types automatically.
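
For comparison, a minimal sketch of the two styles (the struct tag need not be
defined anywhere; it is only illustrative):

typedef void *gluster_file_t;                 /* current: any pointer converts silently */

typedef struct gluster_file gluster_file_t;   /* suggested: opaque handle, type-checked */
/* callers would then hold a gluster_file_t * rather than a bare gluster_file_t */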

> +
> +typedef struct glusterConf {
> +    char volfile[PATH_MAX];
> +    char image[PATH_MAX];
> +} glusterConf;

QEMU coding style always uses UpperCase for struct names.
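
As a sketch, the fix is just renaming the tag and the typedef:

typedef struct GlusterConf {
    char volfile[PATH_MAX];
    char image[PATH_MAX];
} GlusterConf;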

> +static void qemu_gluster_aio_event_reader(void *opaque)
> +{
> +    BDRVGlusterState *s = opaque;
> +    ssize_t ret;
> +
> +    do {
> +        char *p = (char *)&s->event_gaiocb;

Why make this a BDRVGlusterState field?  It could be a local, I think.

> +    /* Use O_DSYNC for write-through caching, no flags for write-back caching,
> +     * and O_DIRECT for no caching. */
> +    if ((bdrv_flags & BDRV_O_NOCACHE))
> +        s->open_flags |= O_DIRECT;
> +    if (!(bdrv_flags & BDRV_O_CACHE_WB))
> +        s->open_flags |= O_DSYNC;

Paolo has changed this recently, you might need to use
bs->enable_write_cache instead.

> +out:
> +    if (c) {
> +        g_free(c);
> +    }

g_free(NULL) is a nop, you never need to test that the pointer is non-NULL.
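
So the out: label in qemu_gluster_open()/qemu_gluster_create() can simply be,
for example:

out:
    g_free(c);    /* g_free(NULL) is a no-op, no guard needed */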

> +static void gluster_finish_aiocb(void *arg)
> +{
> +    int ret;
> +    gluster_aiocb_t *gaiocb = (gluster_aiocb_t *)arg;
> +    BDRVGlusterState *s = ((glusterAIOCB *)gaiocb->opaque)->s;
> +
> +    ret = qemu_gluster_send_pipe(s, gaiocb);
> +    if (ret < 0) {
> +        g_free(gaiocb);

What about the glusterAIOCB?  You need to invoke the callback with an
error value.

What about decrementing the in-flight I/O request count?
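
As a rough sketch (not tested, and note this callback runs from a gluster
thread, so the completion may need deferring to a bottom half), the failure
path could look something like:

static void gluster_finish_aiocb(void *arg)
{
    gluster_aiocb_t *gaiocb = arg;
    glusterAIOCB *acb = gaiocb->opaque;
    BDRVGlusterState *s = acb->s;

    if (qemu_gluster_send_pipe(s, gaiocb) < 0) {
        /* The event reader will never see this request, so complete it
         * here with an error and drop the in-flight count. */
        acb->common.cb(acb->common.opaque, -EIO);
        qemu_vfree(acb->bounce);
        qemu_aio_release(acb);
        s->qemu_aio_count--;
        g_free(gaiocb);
    }
}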

> +static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque, int write)
> +{
> +    int ret;
> +    glusterAIOCB *acb;
> +    gluster_aiocb_t *gaiocb;
> +    BDRVGlusterState *s = bs->opaque;
> +    char *buf;
> +    size_t size;
> +    off_t offset;
> +
> +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->ret = 0;
> +    acb->bh = NULL;
> +    acb->s = s;
> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +    offset = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    s->qemu_aio_count++;
> +
> +    gaiocb = g_malloc(sizeof(gluster_aiocb_t));

Can you make this a field of glusterAIOCB?  Then you don't need to
worry about freeing gaiocb later.
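
Something along these lines, i.e. embed it instead of allocating it
separately:

typedef struct glusterAIOCB {
    BlockDriverAIOCB common;
    /* ... existing fields ... */
    gluster_aiocb_t gaiocb;    /* embedded, lives and dies with the AIOCB */
} glusterAIOCB;

    /* in qemu_gluster_aio_rw(): */
    gaiocb = &acb->gaiocb;     /* no g_malloc()/g_free() needed */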

> +static int64_t qemu_gluster_getlength(BlockDriverState *bs)
> +{
> +    BDRVGlusterState *s = bs->opaque;
> +    gluster_file_t fd = s->fd;
> +    struct stat st;
> +    int ret;
> +
> +    ret = gluster_fstat(fd, &st);
> +    if (ret < 0) {
> +        return -1;

Please return a negative errno instead of -1.
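
For example, assuming gluster_fstat() sets errno on failure:

    ret = gluster_fstat(fd, &st);
    if (ret < 0) {
        return -errno;
    }
    return st.st_size;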

Stefan
Avi Kivity June 19, 2012, 9:27 a.m. UTC | #2
On 06/18/2012 08:35 PM, Stefan Hajnoczi wrote:
> On Mon, Jun 11, 2012 at 3:21 PM, Bharata B Rao
> <bharata@linux.vnet.ibm.com> wrote:
>> +#include "block_int.h"
>> +#include "gluster-helpers.h"
>> +
>> +typedef void *gluster_file_t;
> 
> This typedef is already in gluster-helpers.h.  It's ugly BTW, "typedef
> struct gluster_file gluster_file_t" is nicer since it won't cast to
> other pointer types automatically.

gluster_file_t can only be cast to a NACK since names ending with _t are
reserved by the C runtime.
Bharata B Rao June 19, 2012, 9:30 a.m. UTC | #3
On Mon, Jun 18, 2012 at 06:35:28PM +0100, Stefan Hajnoczi wrote:
> On Mon, Jun 11, 2012 at 3:21 PM, Bharata B Rao
> <bharata@linux.vnet.ibm.com> wrote:
> > +#include "block_int.h"
> > +#include "gluster-helpers.h"
> > +
> > +typedef void *gluster_file_t;
> 
> This typedef is already in gluster-helpers.h.

Yes, will fix that.

> It's ugly BTW, "typedef
> struct gluster_file gluster_file_t" is nicer since it won't cast to
> other pointer types automatically.

Gluster routines in libglusterfsclient operate on gluster specific descriptor
called fd_t.

glusterfs_open returns a pointer to fd_t and rest of the read/write routines
take that pointer as input. libglusterfsclient hides this pointer by doing

typedef void *glusterfs_file_t.

I wanted to return an integer fd from open and then use them with read and
write. But that would need some code in gluster backend to convert integer
fd to fd_t and vice versa. Since libglusterfsclient doesn't deal with integer
fd's, I retained this ugly typedef.

> 
> > +
> > +typedef struct glusterConf {
> > +    char volfile[PATH_MAX];
> > +    char image[PATH_MAX];
> > +} glusterConf;
> 
> QEMU coding style always uses UpperCase for struct names.

Ok, will fix.

> 
> > +static void qemu_gluster_aio_event_reader(void *opaque)
> > +{
> > +    BDRVGlusterState *s = opaque;
> > +    ssize_t ret;
> > +
> > +    do {
> > +        char *p = (char *)&s->event_gaiocb;
> 
> Why make this a BDRVGlusterState field?  It could be a local, I think.

I could, I guess; I was just following what rbd does.

> 
> > +    /* Use O_DSYNC for write-through caching, no flags for write-back caching,
> > +     * and O_DIRECT for no caching. */
> > +    if ((bdrv_flags & BDRV_O_NOCACHE))
> > +        s->open_flags |= O_DIRECT;
> > +    if (!(bdrv_flags & BDRV_O_CACHE_WB))
> > +        s->open_flags |= O_DSYNC;
> 
> Paolo has changed this recently, you might need to use
> bs->enable_write_cache instead.

I picked up this logic from block/raw-posix.c:raw_open_common(). Don't see
anything related to bs->enable_write_cache there. Will find out more about
bs->enable_write_cache.

> 
> > +out:
> > +    if (c) {
> > +        g_free(c);
> > +    }
> 
> g_free(NULL) is a nop, you never need to test that the pointer is non-NULL.

Ok.

> 
> > +static void gluster_finish_aiocb(void *arg)
> > +{
> > +    int ret;
> > +    gluster_aiocb_t *gaiocb = (gluster_aiocb_t *)arg;
> > +    BDRVGlusterState *s = ((glusterAIOCB *)gaiocb->opaque)->s;
> > +
> > +    ret = qemu_gluster_send_pipe(s, gaiocb);
> > +    if (ret < 0) {
> > +        g_free(gaiocb);
> 
> What about the glusterAIOCB?  You need to invoke the callback with an
> error value.
> 
> What about decrementing the in-flight I/O request count?

Again, this comes from rbd. gluster_finish_aiocb() is the callback
that we have registered with gluster. I am not doing any error handling when
we fail even to write to the pipe. An event reader would be waiting to read
from the other end of the pipe. Typically error handling and decrementing
the in-flight IO request count is done by that event reader. But in this
case, we have failed even to kick (via the pipe write) the event reader.

> 
> > +static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
> > +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> > +        BlockDriverCompletionFunc *cb, void *opaque, int write)
> > +{
> > +    int ret;
> > +    glusterAIOCB *acb;
> > +    gluster_aiocb_t *gaiocb;
> > +    BDRVGlusterState *s = bs->opaque;
> > +    char *buf;
> > +    size_t size;
> > +    off_t offset;
> > +
> > +    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
> > +    acb->write = write;
> > +    acb->qiov = qiov;
> > +    acb->bounce = qemu_blockalign(bs, qiov->size);
> > +    acb->ret = 0;
> > +    acb->bh = NULL;
> > +    acb->s = s;
> > +
> > +    if (write) {
> > +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> > +    }
> > +
> > +    buf = acb->bounce;
> > +    offset = sector_num * BDRV_SECTOR_SIZE;
> > +    size = nb_sectors * BDRV_SECTOR_SIZE;
> > +    s->qemu_aio_count++;
> > +
> > +    gaiocb = g_malloc(sizeof(gluster_aiocb_t));
> 
> Can you make this a field of glusterAIOCB?  Then you don't need to
> worry about freeing gaiocb later.

Hmm, I already have glusterAIOCB as part of gaiocb.

> 
> > +static int64_t qemu_gluster_getlength(BlockDriverState *bs)
> > +{
> > +    BDRVGlusterState *s = bs->opaque;
> > +    gluster_file_t fd = s->fd;
> > +    struct stat st;
> > +    int ret;
> > +
> > +    ret = gluster_fstat(fd, &st);
> > +    if (ret < 0) {
> > +        return -1;
> 
> Please return a negative errno instead of -1.

Ok. Maybe I could just return the value from gluster_fstat().

Thanks for your review.

Regards,
Bharata.
Stefan Hajnoczi June 19, 2012, 11:05 a.m. UTC | #4
On Tue, Jun 19, 2012 at 10:30 AM, Bharata B Rao
<bharata@linux.vnet.ibm.com> wrote:
> On Mon, Jun 18, 2012 at 06:35:28PM +0100, Stefan Hajnoczi wrote:
>> On Mon, Jun 11, 2012 at 3:21 PM, Bharata B Rao
>> <bharata@linux.vnet.ibm.com> wrote:
>> > +    /* Use O_DSYNC for write-through caching, no flags for write-back caching,
>> > +     * and O_DIRECT for no caching. */
>> > +    if ((bdrv_flags & BDRV_O_NOCACHE))
>> > +        s->open_flags |= O_DIRECT;
>> > +    if (!(bdrv_flags & BDRV_O_CACHE_WB))
>> > +        s->open_flags |= O_DSYNC;
>>
>> Paolo has changed this recently, you might need to use
>> bs->enable_write_cache instead.
>
> I picked up this logic from block/raw-posix.c:raw_open_common(). Don't see
> anything related to bs->enable_write_cache there. Will find out more about
> bs->enable_write_cache.

If you fetch the latest qemu.git and check bdrv_open_common() there is
new code that stashes BDRV_O_CACHE_WB in bs->enable_write_cache and
then opens the actual block driver with BDRV_O_CACHE_WB set.  You can
use bdrv_enable_write_cache() to test the original flag.
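
As a sketch, the open path would then test the stashed flag rather than
bdrv_flags:

    if (bdrv_flags & BDRV_O_NOCACHE) {
        s->open_flags |= O_DIRECT;
    }
    if (!bdrv_enable_write_cache(bs)) {
        s->open_flags |= O_DSYNC;
    }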

>> > +static void gluster_finish_aiocb(void *arg)
>> > +{
>> > +    int ret;
>> > +    gluster_aiocb_t *gaiocb = (gluster_aiocb_t *)arg;
>> > +    BDRVGlusterState *s = ((glusterAIOCB *)gaiocb->opaque)->s;
>> > +
>> > +    ret = qemu_gluster_send_pipe(s, gaiocb);
>> > +    if (ret < 0) {
>> > +        g_free(gaiocb);
>>
>> What about the glusterAIOCB?  You need to invoke the callback with an
>> error value.
>>
>> What about decrementing the in-flight I/O request count?
>
> Again, this comes from rbd. gluster_finish_aiocb() is the callback
> that we have registered with gluster. I am not doing any error handling when
> we fail even to write to the pipe. An event reader would be waiting to read
> from the other end of the pipe. Typically error handling and decrementing
> the in-flight IO request count is done by that event reader. But in this
> case, we have failed even to kick (via the pipe write) the event reader.

It sounds like you're saying the request is not properly cleaned up
and completed on failure.  Please fix :).

>> > +static int64_t qemu_gluster_getlength(BlockDriverState *bs)
>> > +{
>> > +    BDRVGlusterState *s = bs->opaque;
>> > +    gluster_file_t fd = s->fd;
>> > +    struct stat st;
>> > +    int ret;
>> > +
>> > +    ret = gluster_fstat(fd, &st);
>> > +    if (ret < 0) {
>> > +        return -1;
>>
>> Please return a negative errno instead of -1.
>
> Ok. Maybe I could just return the value from gluster_fstat().

The gluster_fstat() code also does not return negative errnos (at
least in the first case I checked, when CALLOC() fails).

Stefan
Paolo Bonzini July 1, 2012, 2:49 p.m. UTC | #5
On 18/06/2012 19:35, Stefan Hajnoczi wrote:
>> > +    /* Use O_DSYNC for write-through caching, no flags for write-back caching,
>> > +     * and O_DIRECT for no caching. */
>> > +    if ((bdrv_flags & BDRV_O_NOCACHE))
>> > +        s->open_flags |= O_DIRECT;
>> > +    if (!(bdrv_flags & BDRV_O_CACHE_WB))
>> > +        s->open_flags |= O_DSYNC;
> Paolo has changed this recently, you might need to use
> bs->enable_write_cache instead.

At the protocol (i.e. low-level backend) level you don't need to do
anything really, if you implement bdrv_flush_to_disk correctly.

Looking at BDRV_O_CACHE_WB will do no harm, it's just dead code.
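
For this driver that would mean adding a flush callback, e.g. a rough sketch,
where gluster_fsync() is a hypothetical synchronous helper from
gluster-helpers.h:

static coroutine_fn int qemu_gluster_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVGlusterState *s = bs->opaque;

    if (gluster_fsync(s->fd) < 0) {   /* hypothetical helper */
        return -errno;
    }
    return 0;
}

    /* and in BlockDriver bdrv_gluster: */
    .bdrv_co_flush_to_disk = qemu_gluster_co_flush_to_disk,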

Paolo
Paolo Bonzini July 1, 2012, 2:50 p.m. UTC | #6
On 19/06/2012 13:05, Stefan Hajnoczi wrote:
>> > I picked up this logic from block/raw-posix.c:raw_open_common(). Don't see
>> > anything related to bs->enable_write_cache there. Will find out more about
>> > bs->enable_write_cache.
> If you fetch the latest qemu.git and check bdrv_open_common() there is
> new code that stashes BDRV_O_CACHE_WB in bs->enable_write_cache and
> then opens the actual block driver with BDRV_O_CACHE_WB set.  You can
> use bdrv_enable_write_cache() to test the original flag.

Yes, but you shouldn't do this when opening.  You should always open for
writeback.

Paolo

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 25190ba..859b88a 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -59,7 +59,7 @@  block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_LIBISCSI) += iscsi.o
 block-nested-$(CONFIG_CURL) += curl.o
 block-nested-$(CONFIG_RBD) += rbd.o
-block-nested-$(CONFIG_GLUSTERFS) += gluster-helpers.o
+block-nested-$(CONFIG_GLUSTERFS) += gluster-helpers.o gluster.o
 
 block-obj-y +=  $(addprefix block/, $(block-nested-y))
 
diff --git a/block/gluster.c b/block/gluster.c
new file mode 100644
index 0000000..1566cb7
--- /dev/null
+++ b/block/gluster.c
@@ -0,0 +1,435 @@ 
+/*
+ * GlusterFS backend for QEMU
+ *
+ * (AIO implementation is derived from block/rbd.c)
+ *
+ * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * (at your option) any later version. See the COPYING file in the top-level
+ * directory.
+ */
+#include "block_int.h"
+#include "gluster-helpers.h"
+
+typedef void *gluster_file_t;
+
+typedef struct glusterConf {
+    char volfile[PATH_MAX];
+    char image[PATH_MAX];
+} glusterConf;
+
+typedef struct BDRVGlusterState {
+    int fds[2];
+    int open_flags;
+    gluster_file_t fd;
+    glusterfs_ctx_t *ctx;
+    int qemu_aio_count;
+    int event_reader_pos;
+    gluster_aiocb_t *event_gaiocb;
+} BDRVGlusterState;
+
+typedef struct glusterAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    QEMUIOVector *qiov;
+    int ret;
+    int write;
+    char *bounce;
+    BDRVGlusterState *s;
+    int cancelled;
+    int error;
+} glusterAIOCB;
+
+#define GLUSTER_FD_READ 0
+#define GLUSTER_FD_WRITE 1
+
+/*
+ * file=protocol:volfile:image
+ */
+static int qemu_gluster_parsename(glusterConf *c, const char *filename)
+{
+    char *file = g_strdup(filename);
+    char *token, *next_token, *saveptr;
+    int ret = 0;
+
+    /* Discard the protocol */
+    token = strtok_r(file, ":", &saveptr);
+    if (!token) {
+        ret = -EINVAL;
+        goto out;
+    }
+
+    /* volfile */
+    next_token = strtok_r(NULL, ":", &saveptr);
+    if (!next_token) {
+        ret = -EINVAL;
+        goto out;
+    }
+    strncpy(c->volfile, next_token, PATH_MAX);
+
+    /* image */
+    next_token = strtok_r(NULL, ":", &saveptr);
+    if (!next_token) {
+        ret = -EINVAL;
+        goto out;
+    }
+    strncpy(c->image, next_token, PATH_MAX);
+out:
+    g_free(file);
+    return ret;
+}
+
+static void gluster_aio_bh_cb(void *opaque)
+{
+    glusterAIOCB *acb = opaque;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    qemu_aio_release(acb);
+}
+
+static void qemu_gluster_complete_aio(gluster_aiocb_t *gaiocb)
+{
+    glusterAIOCB *acb = (glusterAIOCB *)gaiocb->opaque;
+    int64_t r;
+
+    if (acb->cancelled) {
+        qemu_vfree(acb->bounce);
+        qemu_aio_release(acb);
+        goto done;
+    }
+
+    r = gaiocb->ret;
+
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret = gaiocb->size;
+        }
+    } else {
+        if (r < 0) {
+            memset(gaiocb->buf, 0, gaiocb->size);
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < gaiocb->size) {
+            memset(gaiocb->buf + r, 0, gaiocb->size - r);
+            if (!acb->error) {
+                acb->ret = gaiocb->size;
+            }
+        } else if (!acb->error) {
+            acb->ret = r;
+        }
+    }
+    acb->bh = qemu_bh_new(gluster_aio_bh_cb, acb);
+    qemu_bh_schedule(acb->bh);
+done:
+    g_free(gaiocb);
+}
+
+static void qemu_gluster_aio_event_reader(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&s->event_gaiocb;
+
+        ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos,
+                   sizeof(s->event_gaiocb) - s->event_reader_pos);
+        if (ret > 0) {
+            s->event_reader_pos += ret;
+            if (s->event_reader_pos == sizeof(s->event_gaiocb)) {
+                s->event_reader_pos = 0;
+                qemu_gluster_complete_aio(s->event_gaiocb);
+                s->qemu_aio_count--;
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static int qemu_gluster_aio_flush_cb(void *opaque)
+{
+    BDRVGlusterState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
+}
+
+static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
+    int bdrv_flags)
+{
+    BDRVGlusterState *s = bs->opaque;
+    glusterConf *c = g_malloc(sizeof(glusterConf));
+    int ret = -1;
+
+    if (qemu_gluster_parsename(c, filename)) {
+        goto out;
+    }
+
+    s->ctx = gluster_init(c->volfile);
+    if (!s->ctx) {
+        goto out;
+    }
+
+    /* FIX: Server client handshake takes time */
+    sleep(1);
+
+    s->open_flags |=  O_BINARY;
+    s->open_flags &= ~O_ACCMODE;
+    if (bdrv_flags & BDRV_O_RDWR) {
+        s->open_flags |= O_RDWR;
+    } else {
+        s->open_flags |= O_RDONLY;
+    }
+
+    /* Use O_DSYNC for write-through caching, no flags for write-back caching,
+     * and O_DIRECT for no caching. */
+    if ((bdrv_flags & BDRV_O_NOCACHE))
+        s->open_flags |= O_DIRECT;
+    if (!(bdrv_flags & BDRV_O_CACHE_WB))
+        s->open_flags |= O_DSYNC;
+
+    s->fd = gluster_open(c->image, s->open_flags, 0);
+    if (!s->fd) {
+        goto out;
+    }
+
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        goto out;
+    }
+    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
+        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);
+out:
+    if (c) {
+        g_free(c);
+    }
+    if (ret < 0) {
+        gluster_close(s->fd);
+    }
+    return ret;
+}
+
+static int qemu_gluster_create(const char *filename,
+        QEMUOptionParameter *options)
+{
+    glusterConf *c = g_malloc(sizeof(glusterConf));
+    int ret = 0;
+    gluster_file_t fd;
+    int64_t total_size = 0;
+
+    ret = qemu_gluster_parsename(c, filename);
+    if (ret) {
+        goto out;
+    }
+
+    if (!gluster_init(c->volfile)) {
+        ret = -1;
+        goto out;
+    }
+
+    /* FIX: Server client handshake takes time */
+    sleep(1);
+
+    /* Read out options */
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            total_size = options->value.n / BDRV_SECTOR_SIZE;
+        }
+        options++;
+    }
+
+    fd = gluster_creat(c->image, 0644);
+    if (!fd) {
+        ret = -errno;
+    } else {
+        if (gluster_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
+            ret = -errno;
+        }
+        if (gluster_close(fd) != 0) {
+            ret = -errno;
+        }
+    }
+out:
+    if (c) {
+        g_free(c);
+    }
+    return ret;
+}
+
+static AIOPool gluster_aio_pool = {
+    .aiocb_size = sizeof(glusterAIOCB),
+};
+
+static int qemu_gluster_send_pipe(BDRVGlusterState *s, gluster_aiocb_t *gaiocb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = s->fds[GLUSTER_FD_WRITE];
+
+        ret = write(fd, (void *)&gaiocb, sizeof(gaiocb));
+        if (ret >= 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
+
+static void gluster_finish_aiocb(void *arg)
+{
+    int ret;
+    gluster_aiocb_t *gaiocb = (gluster_aiocb_t *)arg;
+    BDRVGlusterState *s = ((glusterAIOCB *)gaiocb->opaque)->s;
+
+    ret = qemu_gluster_send_pipe(s, gaiocb);
+    if (ret < 0) {
+        g_free(gaiocb);
+    }
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int write)
+{
+    int ret;
+    glusterAIOCB *acb;
+    gluster_aiocb_t *gaiocb;
+    BDRVGlusterState *s = bs->opaque;
+    char *buf;
+    size_t size;
+    off_t offset;
+
+    acb = qemu_aio_get(&gluster_aio_pool, bs, cb, opaque);
+    acb->write = write;
+    acb->qiov = qiov;
+    acb->bounce = qemu_blockalign(bs, qiov->size);
+    acb->ret = 0;
+    acb->bh = NULL;
+    acb->s = s;
+
+    if (write) {
+        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    }
+
+    buf = acb->bounce;
+    offset = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    s->qemu_aio_count++;
+
+    gaiocb = g_malloc(sizeof(gluster_aiocb_t));
+    gaiocb->opaque = acb;
+    gaiocb->buf = buf;
+    gaiocb->offset = offset;
+    gaiocb->size = size;
+    gaiocb->completion_fn = &gluster_finish_aiocb;
+
+    if (write) {
+        ret = gluster_aio_writev(s->fd, gaiocb);
+    } else {
+        ret = gluster_aio_readv(s->fd, gaiocb);
+    }
+
+    if (ret < 0) {
+        goto out;
+    }
+    return &acb->common;
+
+out:
+    g_free(gaiocb);
+    s->qemu_aio_count--;
+    qemu_aio_release(acb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static int64_t qemu_gluster_getlength(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+    gluster_file_t fd = s->fd;
+    struct stat st;
+    int ret;
+
+    ret = gluster_fstat(fd, &st);
+    if (ret < 0) {
+        return -1;
+    } else {
+        return st.st_size;
+    }
+}
+
+static void qemu_gluster_close(BlockDriverState *bs)
+{
+    BDRVGlusterState *s = bs->opaque;
+
+    if (s->fd) {
+        gluster_close(s->fd);
+        s->fd = NULL;
+    }
+}
+
+static QEMUOptionParameter qemu_gluster_create_options[] = {
+    {
+        .name = BLOCK_OPT_SIZE,
+        .type = OPT_SIZE,
+        .help = "Virtual disk size"
+    },
+    { NULL }
+};
+
+static BlockDriver bdrv_gluster = {
+    .format_name = "gluster",
+    .protocol_name = "gluster",
+    .instance_size = sizeof(BDRVGlusterState),
+    .bdrv_file_open = qemu_gluster_open,
+    .bdrv_close = qemu_gluster_close,
+    .bdrv_create = qemu_gluster_create,
+    .bdrv_getlength = qemu_gluster_getlength,
+
+    .bdrv_aio_readv = qemu_gluster_aio_readv,
+    .bdrv_aio_writev = qemu_gluster_aio_writev,
+
+    .create_options = qemu_gluster_create_options,
+};
+
+static void bdrv_gluster_init(void)
+{
+    bdrv_register(&bdrv_gluster);
+}
+
+block_init(bdrv_gluster_init);