Patchwork ceph/rbd block driver for qemu-kvm (v4)

login
register
mail settings
Submitter Christian Brunner
Date Aug. 3, 2010, 8:14 p.m.
Message ID <20100803201407.GD1475@chb-desktop>
Download mbox | patch
Permalink /patch/60792/
State New
Headers show

Comments

Christian Brunner - Aug. 3, 2010, 8:14 p.m.
On Tue, Aug 03, 2010 at 12:37:18AM +0400, malc wrote:
> 
> Thare are whitespace issues in this patch.

Thanks for looking at the patch. Here is an updated patch, that 
should fix the whitespace issues:

This is a block driver for the distributed file system Ceph
(http://ceph.newdream.net/). This driver uses librados (which
is part of the Ceph server) for direct access to the Ceph object
store and is running entirely in userspace.

It now has (read only) snapshot support and passes all relevant
qemu-iotests.

To compile the driver you need at least ceph 0.21.

Additional information is available on the Ceph-Wiki:

http://ceph.newdream.net/wiki/Kvm-rbd

The patch is based on git://repo.or.cz/qemu/kevin.git block

Signed-off-by: Christian Brunner <chb@muc.de>

---
 Makefile.objs     |    1 +
 block/rbd.c       |  907 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/rbd_types.h |   71 +++++
 configure         |   31 ++
 4 files changed, 1010 insertions(+), 0 deletions(-)
 create mode 100644 block/rbd.c
 create mode 100644 block/rbd_types.h
Yehuda Sadeh Weinraub - Sept. 23, 2010, 2:21 a.m.
Following up on this one, I'd like to know whether there is any
pending issue preventing rbd from being included upstream.

Thanks,
Yehuda

On Tue, Aug 3, 2010 at 1:14 PM, Christian Brunner <chb@muc.de> wrote:
> On Tue, Aug 03, 2010 at 12:37:18AM +0400, malc wrote:
>>
>> Thare are whitespace issues in this patch.
>
> Thanks for looking at the patch. Here is an updated patch, that
> should fix the whitespace issues:
>
> This is a block driver for the distributed file system Ceph
> (http://ceph.newdream.net/). This driver uses librados (which
> is part of the Ceph server) for direct access to the Ceph object
> store and is running entirely in userspace.
>
> It now has (read only) snapshot support and passes all relevant
> qemu-iotests.
>
> To compile the driver you need at least ceph 0.21.
>
> Additional information is available on the Ceph-Wiki:
>
> http://ceph.newdream.net/wiki/Kvm-rbd
>
> The patch is based on git://repo.or.cz/qemu/kevin.git block
>
> Signed-off-by: Christian Brunner <chb@muc.de>
>
> ---
>  Makefile.objs     |    1 +
>  block/rbd.c       |  907 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/rbd_types.h |   71 +++++
>  configure         |   31 ++
>  4 files changed, 1010 insertions(+), 0 deletions(-)
>  create mode 100644 block/rbd.c
>  create mode 100644 block/rbd_types.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 4a1eaa1..bf45142 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> +block-nested-$(CONFIG_RBD) += rbd.o
>
>  block-obj-y +=  $(addprefix block/, $(block-nested-y))
>
> diff --git a/block/rbd.c b/block/rbd.c
> new file mode 100644
> index 0000000..0e6b2a5
> --- /dev/null
> +++ b/block/rbd.c
> @@ -0,0 +1,907 @@
> +/*
> + * QEMU Block driver for RADOS (Ceph)
> + *
> + * Copyright (C) 2010 Christian Brunner <chb@muc.de>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include <sys/types.h>
> +#include <stdbool.h>
> +
> +#include <qemu-common.h>
> +
> +#include "rbd_types.h"
> +#include "module.h"
> +#include "block_int.h"
> +
> +#include <stdio.h>
> +#include <stdlib.h>
> +#include <rados/librados.h>
> +
> +#include <signal.h>
> +
> +
> +int eventfd(unsigned int initval, int flags);
> +
> +
> +/*
> + * When specifying the image filename use:
> + *
> + * rbd:poolname/devicename
> + *
> + * poolname must be the name of an existing rados pool
> + *
> + * devicename is the basename for all objects used to
> + * emulate the raw device.
> + *
> + * Metadata information (image size, ...) is stored in an
> + * object with the name "devicename.rbd".
> + *
> + * The raw device is split into 4MB sized objects by default.
> + * The sequencenumber is encoded in a 12 byte long hex-string,
> + * and is attached to the devicename, separated by a dot.
> + * e.g. "devicename.1234567890ab"
> + *
> + */
> +
> +#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
> +
> +typedef struct RBDAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUBH *bh;
> +    int ret;
> +    QEMUIOVector *qiov;
> +    char *bounce;
> +    int write;
> +    int64_t sector_num;
> +    int aiocnt;
> +    int error;
> +    struct BDRVRBDState *s;
> +} RBDAIOCB;
> +
> +typedef struct RADOSCB {
> +    int rcbid;
> +    RBDAIOCB *acb;
> +    int done;
> +    int64_t segsize;
> +    char *buf;
> +} RADOSCB;
> +
> +typedef struct BDRVRBDState {
> +    int efd;
> +    rados_pool_t pool;
> +    rados_pool_t header_pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +    int qemu_aio_count;
> +    int read_only;
> +} BDRVRBDState;
> +
> +typedef struct rbd_obj_header_ondisk RbdHeader1;
> +
> +static int rbd_parsename(const char *filename, char *pool, char **snap,
> +                         char *name)
> +{
> +    const char *rbdname;
> +    char *p;
> +    int l;
> +
> +    if (!strstart(filename, "rbd:", &rbdname)) {
> +        return -EINVAL;
> +    }
> +
> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
> +    p = strchr(pool, '/');
> +    if (p == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    *p = '\0';
> +
> +    l = strlen(pool);
> +    if(l >= RBD_MAX_SEG_NAME_SIZE) {
> +        error_report("pool name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("pool name to short");
> +        return -EINVAL;
> +    }
> +
> +    l = strlen(++p);
> +    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
> +        error_report("object name to long");
> +        return -EINVAL;
> +    } else if (l <= 0) {
> +        error_report("object name to short");
> +        return -EINVAL;
> +    }
> +
> +    strcpy(name, p);
> +
> +    *snap = strchr(name, '@');
> +    if (*snap) {
> +        *(*snap) = '\0';
> +        (*snap)++;
> +        if (!*snap) *snap = NULL;
> +    }
> +
> +    return l;
> +}
> +
> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
> +{
> +    uint32_t len = strlen(name);
> +    /* total_len = encoding op + name + empty buffer */
> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
> +    char *desc = NULL;
> +
> +    desc = qemu_malloc(total_len);
> +
> +    *tmap_desc = desc;
> +
> +    *desc = op;
> +    desc++;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +    memcpy(desc, name, len);
> +    desc += len;
> +    len = 0;
> +    memcpy(desc, &len, sizeof(len));
> +    desc += sizeof(len);
> +
> +    return desc - *tmap_desc;
> +}
> +
> +static void free_tmap_op(char *tmap_desc)
> +{
> +    qemu_free(tmap_desc);
> +}
> +
> +static int rbd_register_image(rados_pool_t pool, const char *name)
> +{
> +    char *tmap_desc;
> +    const char *dir = RBD_DIRECTORY;
> +    int ret;
> +
> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
> +    free_tmap_op(tmap_desc);
> +
> +    return ret;
> +}
> +
> +static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
> +{
> +    int r = rados_write(pool, info_oid, 0, NULL, 0);
> +    if (r < 0) {
> +        return r;
> +    }
> +    return 0;
> +}
> +
> +static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
> +{
> +    uint64_t out[1];
> +    const char *info_oid = RBD_INFO;
> +
> +    *id = 0;
> +
> +    int r = touch_rbd_info(pool, info_oid);
> +    if (r < 0) {
> +        return r;
> +    }
> +
> +    r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
> +                   0, (char *)out, sizeof(out));
> +    if (r < 0) {
> +        return r;
> +    }
> +
> +    *id = out[0];
> +    le64_to_cpus(out);
> +
> +    return 0;
> +}
> +
> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +    int64_t bytes = 0;
> +    int64_t objsize;
> +    uint64_t size;
> +    time_t mtime;
> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char name[RBD_MAX_SEG_NAME_SIZE];
> +    char *snap;
> +    RbdHeader1 header;
> +    rados_pool_t p;
> +    uint64_t bid;
> +    uint32_t hi, lo;
> +    int ret;
> +
> +    if (rbd_parsename(filename, pool, &snap, name) < 0) {
> +        return -EINVAL;
> +    }
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
> +
> +    /* Read out options */
> +    while (options && options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            bytes = options->value.n;
> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
> +            if (options->value.n) {
> +                objsize = options->value.n;
> +                if ((objsize - 1) & objsize) {    /* not a power of 2? */
> +                    error_report("obj size needs to be power of 2");
> +                    return -EINVAL;
> +                }
> +                if (objsize < 4096) {
> +                    error_report("obj size too small");
> +                    return -EINVAL;
> +                }
> +
> +                for (obj_order = 0; obj_order < 64; obj_order++) {
> +                    if (objsize == 1) {
> +                        break;
> +                    }
> +                    objsize >>= 1;
> +                }
> +            }
> +        }
> +        options++;
> +    }
> +
> +    memset(&header, 0, sizeof(header));
> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
> +    header.image_size = bytes;
> +    cpu_to_le64s((uint64_t *) & header.image_size);
> +    header.options.order = obj_order;
> +    header.options.crypt_type = RBD_CRYPT_NONE;
> +    header.options.comp_type = RBD_COMP_NONE;
> +    header.snap_seq = 0;
> +    header.snap_count = 0;
> +    cpu_to_le32s(&header.snap_count);
> +
> +    if (rados_initialize(0, NULL) < 0) {
> +        error_report("error initializing");
> +        return -EIO;
> +    }
> +
> +    if (rados_open_pool(pool, &p)) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +
> +    /* check for existing rbd header file */
> +    ret = rados_stat(p, n, &size, &mtime);
> +    if (ret == 0) {
> +        ret=-EEXIST;
> +        goto done;
> +    }
> +
> +    ret = rbd_assign_bid(p, &bid);
> +    if (ret < 0) {
> +        error_report("failed assigning block id");
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +    hi = bid >> 32;
> +    lo = bid & 0xFFFFFFFF;
> +    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
> +
> +    /* create header file */
> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
> +    if (ret < 0) {
> +        goto done;
> +    }
> +
> +    ret = rbd_register_image(p, name);
> +done:
> +    rados_close_pool(p);
> +    rados_deinitialize();
> +
> +    return ret;
> +}
> +
> +static void rbd_aio_completion_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    uint64_t val;
> +    ssize_t ret;
> +
> +    do {
> +        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
> +            s->qemu_aio_count -= val;
> +        }
> +    } while (ret < 0 && errno == EINTR);
> +
> +    return;
> +}
> +
> +static int rbd_aio_flush_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    return (s->qemu_aio_count > 0);
> +}
> +
> +
> +static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
> +{
> +    uint32_t snap_count = header->snap_count;
> +    rados_snap_t *snaps = NULL;
> +    rados_snap_t seq;
> +    uint32_t i;
> +    uint64_t snap_names_len = header->snap_names_len;
> +    int r;
> +    rados_snap_t snapid = 0;
> +
> +    cpu_to_le32s(&snap_count);
> +    cpu_to_le64s(&snap_names_len);
> +    if (snap_count) {
> +        const char *header_snap = (const char *)&header->snaps[snap_count];
> +        const char *end = header_snap + snap_names_len;
> +        snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
> +
> +        for (i=0; i < snap_count; i++) {
> +            snaps[i] = (uint64_t)header->snaps[i].id;
> +            cpu_to_le64s(&snaps[i]);
> +
> +            if (snap && strcmp(snap, header_snap) == 0) {
> +                snapid = snaps[i];
> +            }
> +
> +            header_snap += strlen(header_snap) + 1;
> +            if (header_snap > end)
> +                error_report("bad header, snapshot list broken");
> +        }
> +    }
> +
> +    if (snap && !snapid) {
> +        error_report("snapshot not found");
> +        return -ENOENT;
> +    }
> +    seq = header->snap_seq;
> +    cpu_to_le32s((uint32_t *)&seq);
> +
> +    r = rados_set_snap_context(pool, seq, snaps, snap_count);
> +
> +    rados_set_snap(pool, snapid);
> +
> +    qemu_free(snaps);
> +
> +    return r;
> +}
> +
> +#define BUF_READ_START_LEN    4096
> +
> +static int rbd_read_header(BDRVRBDState *s, char **hbuf)
> +{
> +    char *buf = NULL;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    uint64_t len = BUF_READ_START_LEN;
> +    int r;
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
> +
> +    buf = qemu_malloc(len);
> +
> +    r = rados_read(s->header_pool, n, 0, buf, len);
> +    if (r < 0)
> +        goto failed;
> +
> +    if (r < len)
> +        goto done;
> +
> +    qemu_free(buf);
> +    buf = qemu_malloc(len);
> +
> +    r = rados_stat(s->header_pool, n, &len, NULL);
> +    if (r < 0)
> +        goto failed;
> +
> +    r = rados_read(s->header_pool, n, 0, buf, len);
> +    if (r < 0)
> +        goto failed;
> +
> +done:
> +    *hbuf = buf;
> +    return 0;
> +
> +failed:
> +    qemu_free(buf);
> +    return r;
> +}
> +
> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char *snap;
> +    char *hbuf = NULL;
> +    int r;
> +
> +    if (rbd_parsename(filename, pool, &snap, s->name) < 0) {
> +        return -EINVAL;
> +    }
> +
> +    if ((r = rados_initialize(0, NULL)) < 0) {
> +        error_report("error initializing");
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool, &s->pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool, &s->header_pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +   if ((r = rbd_read_header(s, &hbuf)) < 0) {
> +        error_report("error reading header from %s", s->name);
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
> +        error_report("Invalid header signature %s", hbuf + 64);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
> +        error_report("Unknown image version %s", hbuf + 68);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    RbdHeader1 *header;
> +
> +    header = (RbdHeader1 *) hbuf;
> +    le64_to_cpus((uint64_t *) & header->image_size);
> +    s->size = header->image_size;
> +    s->objsize = 1 << header->options.order;
> +    memcpy(s->block_name, header->block_name, sizeof(header->block_name));
> +
> +    r = rbd_set_snapc(s->pool, snap, header);
> +    if (r < 0) {
> +        error_report("failed setting snap context: %s", strerror(-r));
> +        goto failed;
> +    }
> +
> +    s->read_only = (snap != NULL);
> +
> +    s->efd = eventfd(0, 0);
> +    if (s->efd < 0) {
> +        error_report("error opening eventfd");
> +        goto failed;
> +    }
> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
> +        rbd_aio_flush_cb, NULL, s);
> +
> +    qemu_free(hbuf);
> +
> +    return 0;
> +
> +failed:
> +    if (hbuf)
> +        qemu_free(hbuf);
> +
> +    rados_close_pool(s->header_pool);
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +    return r;
> +}
> +
> +static void rbd_close(BlockDriverState *bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    close(s->efd);
> +    qemu_aio_set_fd_handler(s->efd, NULL , NULL, NULL, NULL, NULL);
> +
> +    rados_close_pool(s->header_pool);
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +}
> +
> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
> +                  uint8_t *buf, int nb_sectors, int write)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +
> +    int64_t segnr, segoffs, segsize, r;
> +    int64_t off, size;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name, segnr);
> +
> +        if (write) {
> +            if (s->read_only)
> +                return -EROFS;
> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
> +                segsize)) < 0) {
> +                return r;
> +            }
> +        } else {
> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
> +            if (r == -ENOENT) {
> +                memset(buf, 0, segsize);
> +            } else if (r < 0) {
> +                return r;
> +            } else if (r < segsize) {
> +                memset(buf + r, 0, segsize - r);
> +            }
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return 0;
> +}
> +
> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
> +                    uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
> +}
> +
> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
> +                     const uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
> +}
> +
> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool rbd_aio_pool = {
> +    .aiocb_size = sizeof(RBDAIOCB),
> +    .cancel = rbd_aio_cancel,
> +};
> +
> +/* This is the callback function for rados_aio_read and _write */
> +
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    RBDAIOCB *acb = rcb->acb;
> +    int64_t r;
> +    uint64_t buf = 1;
> +    int i;
> +
> +    acb->aiocnt--;
> +    r = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (acb->write) {
> +        if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (!acb->error) {
> +            acb->ret += rcb->segsize;
> +        }
> +    } else {
> +        if (r == -ENOENT) {
> +            memset(rcb->buf, 0, rcb->segsize);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (r < 0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (r < rcb->segsize) {
> +            memset(rcb->buf + r, 0, rcb->segsize - r);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (!acb->error) {
> +            acb->ret += r;
> +        }
> +    }
> +    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_free(rcb);
> +    i = 0;
> +    if (!acb->aiocnt && acb->bh) {
> +        qemu_bh_schedule(acb->bh);
> +    }
> +}
> +
> +/* Callback when all queued rados_aio requests are complete */
> +
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +    uint64_t buf = 1;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +
> +    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_aio_release(acb);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
> +                                           int64_t sector_num,
> +                                           QEMUIOVector *qiov,
> +                                           int nb_sectors,
> +                                           BlockDriverCompletionFunc *cb,
> +                                           void *opaque, int write)
> +{
> +    RBDAIOCB *acb;
> +    RADOSCB *rcb;
> +    rados_completion_t c;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    int64_t segnr, segoffs, segsize, last_segnr;
> +    int64_t off, size;
> +    char *buf;
> +
> +    BDRVRBDState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->aiocnt = 0;
> +    acb->ret = 0;
> +    acb->error = 0;
> +    acb->s = s;
> +
> +    if (!acb->bh) {
> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> +    }
> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    last_segnr = ((off + size - 1) / s->objsize);
> +    acb->aiocnt = (last_segnr - segnr) + 1;
> +
> +    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the related RBDAIOCB */
> +
> +    if (write && s->read_only) {
> +        acb->ret = -EROFS;
> +        return NULL;
> +    }
> +
> +    while (size > 0) {
> +        if (size < segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name,
> +                 segnr);
> +
> +        rcb = qemu_malloc(sizeof(RADOSCB));
> +        rcb->done = 0;
> +        rcb->acb = acb;
> +        rcb->segsize = segsize;
> +        rcb->buf = buf;
> +
> +        if (write) {
> +            rados_aio_create_completion(rcb, NULL,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        &c);
> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
> +        } else {
> +            rados_aio_create_completion(rcb,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        NULL, &c);
> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
> +                                       int64_t sector_num, QEMUIOVector * qiov,
> +                                       int nb_sectors,
> +                                       BlockDriverCompletionFunc * cb,
> +                                       void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
> +                                        int64_t sector_num, QEMUIOVector * qiov,
> +                                        int nb_sectors,
> +                                        BlockDriverCompletionFunc * cb,
> +                                        void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    bdi->cluster_size = s->objsize;
> +    return 0;
> +}
> +
> +static int64_t rbd_getlength(BlockDriverState * bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    return s->size;
> +}
> +
> +static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char inbuf[512], outbuf[128];
> +    uint64_t snap_id;
> +    int r;
> +    char *p = inbuf;
> +    char *end = inbuf + sizeof(inbuf);
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char *hbuf = NULL;
> +
> +    if (sn_info->name[0] == '\0')
> +        return -EINVAL; /* we need a name for rbd snapshots */
> +
> +    /*
> +     * rbd snapshots are using the name as the user controlled unique identifier
> +     * we can't use the rbd snapid for that purpose, as it can't be set
> +     */
> +    if (sn_info->id_str[0] != '\0' &&
> +        strcmp(sn_info->id_str, sn_info->name) != 0)
> +        return -EINVAL;
> +
> +    if (strlen(sn_info->name) >= sizeof(sn_info->id_str))
> +        return -ERANGE;
> +
> +    r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
> +    if (r < 0) {
> +        error_report("failed to create snap id: %s", strerror(-r));
> +        return r;
> +    }
> +
> +    *(uint32_t *)p = strlen(sn_info->name);
> +    cpu_to_le32s((uint32_t *)p);
> +    p += sizeof(uint32_t);
> +    strncpy(p, sn_info->name, end - p);
> +    p += strlen(p);
> +    if (p + sizeof(snap_id) > end) {
> +        error_report("invalid input parameter");
> +        return -EINVAL;
> +    }
> +
> +    *(uint64_t *)p = snap_id;
> +    cpu_to_le64s((uint64_t *)p);
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
> +
> +    r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
> +                   sizeof(inbuf), outbuf, sizeof(outbuf));
> +    if (r < 0) {
> +        error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
> +        return r;
> +    }
> +
> +    sprintf(sn_info->id_str, "%s", sn_info->name);
> +
> +    r = rbd_read_header(s, &hbuf);
> +    if (r < 0) {
> +        error_report("failed reading header: %s", strerror(-r));
> +        return r;
> +    }
> +
> +    RbdHeader1 *header;
> +
> +    header = (RbdHeader1 *) hbuf;
> +    r = rbd_set_snapc(s->pool, sn_info->name, header);
> +    if (r < 0) {
> +        error_report("failed setting snap context: %s", strerror(-r));
> +        goto failed;
> +    }
> +
> +    return 0;
> +
> +failed:
> +    if (hbuf)
> +        qemu_free(header);
> +    return r;
> +}
> +
> +static QEMUOptionParameter rbd_create_options[] = {
> +    {
> +     .name = BLOCK_OPT_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "Virtual disk size"
> +    },
> +    {
> +     .name = BLOCK_OPT_CLUSTER_SIZE,
> +     .type = OPT_SIZE,
> +     .help = "RBD object size"
> +    },
> +    {NULL}
> +};
> +
> +static BlockDriver bdrv_rbd = {
> +    .format_name        = "rbd",
> +    .instance_size      = sizeof(BDRVRBDState),
> +    .bdrv_file_open     = rbd_open,
> +    .bdrv_read          = rbd_read,
> +    .bdrv_write         = rbd_write,
> +    .bdrv_close         = rbd_close,
> +    .bdrv_create        = rbd_create,
> +    .bdrv_get_info      = rbd_getinfo,
> +    .create_options     = rbd_create_options,
> +    .bdrv_getlength     = rbd_getlength,
> +    .protocol_name      = "rbd",
> +
> +    .bdrv_aio_readv     = rbd_aio_readv,
> +    .bdrv_aio_writev    = rbd_aio_writev,
> +
> +    .bdrv_snapshot_create = rbd_snap_create,
> +};
> +
> +static void bdrv_rbd_init(void)
> +{
> +    bdrv_register(&bdrv_rbd);
> +}
> +
> +block_init(bdrv_rbd_init);
> diff --git a/block/rbd_types.h b/block/rbd_types.h
> new file mode 100644
> index 0000000..c35d840
> --- /dev/null
> +++ b/block/rbd_types.h
> @@ -0,0 +1,71 @@
> +/*
> + * Ceph - scalable distributed file system
> + *
> + * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
> + *
> + * This is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License version 2.1, as published by the Free Software
> + * Foundation.  See file COPYING.
> + *
> + */
> +
> +#ifndef CEPH_RBD_TYPES_H
> +#define CEPH_RBD_TYPES_H
> +
> +
> +/*
> + * rbd image 'foo' consists of objects
> + *   foo.rbd      - image metadata
> + *   foo.00000000
> + *   foo.00000001
> + *   ...          - data
> + */
> +
> +#define RBD_SUFFIX              ".rbd"
> +#define RBD_DIRECTORY           "rbd_directory"
> +#define RBD_INFO                "rbd_info"
> +
> +#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
> +
> +#define RBD_MAX_OBJ_NAME_SIZE   96
> +#define RBD_MAX_BLOCK_NAME_SIZE 24
> +#define RBD_MAX_SEG_NAME_SIZE   128
> +
> +#define RBD_COMP_NONE           0
> +#define RBD_CRYPT_NONE          0
> +
> +#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
> +#define RBD_HEADER_SIGNATURE    "RBD"
> +#define RBD_HEADER_VERSION      "001.005"
> +
> +struct rbd_info {
> +    uint64_t max_id;
> +} __attribute__ ((packed));
> +
> +struct rbd_obj_snap_ondisk {
> +    uint64_t id;
> +    uint64_t image_size;
> +} __attribute__((packed));
> +
> +struct rbd_obj_header_ondisk {
> +    char text[40];
> +    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
> +    char signature[4];
> +    char version[8];
> +    struct {
> +        uint8_t order;
> +        uint8_t crypt_type;
> +        uint8_t comp_type;
> +        uint8_t unused;
> +    } __attribute__((packed)) options;
> +    uint64_t image_size;
> +    uint64_t snap_seq;
> +    uint32_t snap_count;
> +    uint32_t reserved;
> +    uint64_t snap_names_len;
> +    struct rbd_obj_snap_ondisk snaps[0];
> +} __attribute__((packed));
> +
> +
> +#endif
> diff --git a/configure b/configure
> index a20371c..3fb7c90 100755
> --- a/configure
> +++ b/configure
> @@ -315,6 +315,7 @@ pkgversion=""
>  check_utests="no"
>  user_pie="no"
>  zero_malloc=""
> +rbd=""
>
>  # OS specific
>  if check_define __linux__ ; then
> @@ -709,6 +710,10 @@ for opt do
>   ;;
>   --*dir)
>   ;;
> +  --disable-rbd) rbd="no"
> +  ;;
> +  --enable-rbd) rbd="yes"
> +  ;;
>   *) echo "ERROR: unknown option $opt"; show_help="yes"
>   ;;
>   esac
> @@ -895,6 +900,7 @@ echo "  --enable-docs            enable documentation build"
>  echo "  --disable-docs           disable documentation build"
>  echo "  --disable-vhost-net      disable vhost-net acceleration support"
>  echo "  --enable-vhost-net       enable vhost-net acceleration support"
> +echo "  --enable-rbd             enable building the rados block device (rbd)"
>  echo ""
>  echo "NOTE: The object files are built at the place where configure is launched"
>  exit 1
> @@ -1701,6 +1707,27 @@ if test "$mingw32" != yes -a "$pthread" = no; then
>  fi
>
>  ##########################################
> +# rbd probe
> +if test "$rbd" != "no" ; then
> +  cat > $TMPC <<EOF
> +#include <stdio.h>
> +#include <rados/librados.h>
> +int main(void) { rados_initialize(0, NULL); return 0; }
> +EOF
> +  rbd_libs="-lrados -lcrypto"
> +  if compile_prog "" "$rbd_libs" ; then
> +    rbd=yes
> +    libs_tools="$rbd_libs $libs_tools"
> +    libs_softmmu="$rbd_libs $libs_softmmu"
> +  else
> +    if test "$rbd" = "yes" ; then
> +      feature_not_found "rados block device"
> +    fi
> +    rbd=no
> +  fi
> +fi
> +
> +##########################################
>  # linux-aio probe
>
>  if test "$linux_aio" != "no" ; then
> @@ -2187,6 +2214,7 @@ echo "preadv support    $preadv"
>  echo "fdatasync         $fdatasync"
>  echo "uuid support      $uuid"
>  echo "vhost-net support $vhost_net"
> +echo "rbd support       $rbd"
>
>  if test $sdl_too_old = "yes"; then
>  echo "-> Your SDL version is too old - please upgrade to have SDL support"
> @@ -2434,6 +2462,9 @@ echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
>  if test "$zero_malloc" = "yes" ; then
>   echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
>  fi
> +if test "$rbd" = "yes" ; then
> +  echo "CONFIG_RBD=y" >> $config_host_mak
> +fi
>
>  # USB host support
>  case "$usb" in
> --
> 1.7.0.4
>
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
Kevin Wolf - Oct. 7, 2010, 12:23 p.m.
Am 23.09.2010 04:21, schrieb Yehuda Sadeh Weinraub:
> Following up on this one, I'd like to know whether there is any
> pending issue preventing rbd from being included upstream.

Basically for me the only problem at the moment is a lack of reviews. If
nobody else picks it up, I'll get to it eventually myself, but I can't
promise if I get to it next week.

Unfortunately this is something that you as the authors can't do much
about, except maybe asking more people to give it a review.

Kevin
Anthony Liguori - Oct. 7, 2010, 2:12 p.m.
On 08/03/2010 03:14 PM, Christian Brunner wrote:
> On Tue, Aug 03, 2010 at 12:37:18AM +0400, malc wrote:
>    
>> Thare are whitespace issues in this patch.
>>      
> Thanks for looking at the patch. Here is an updated patch, that
> should fix the whitespace issues:
>
> This is a block driver for the distributed file system Ceph
> (http://ceph.newdream.net/). This driver uses librados (which
> is part of the Ceph server) for direct access to the Ceph object
> store and is running entirely in userspace.
>
> It now has (read only) snapshot support and passes all relevant
> qemu-iotests.
>
> To compile the driver you need at least ceph 0.21.
>
> Additional information is available on the Ceph-Wiki:
>
> http://ceph.newdream.net/wiki/Kvm-rbd
>
> The patch is based on git://repo.or.cz/qemu/kevin.git block
>
> Signed-off-by: Christian Brunner<chb@muc.de>
>
> ---
>   Makefile.objs     |    1 +
>   block/rbd.c       |  907 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   block/rbd_types.h |   71 +++++
>   configure         |   31 ++
>   4 files changed, 1010 insertions(+), 0 deletions(-)
>   create mode 100644 block/rbd.c
>   create mode 100644 block/rbd_types.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 4a1eaa1..bf45142 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -18,6 +18,7 @@ block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
>   block-nested-$(CONFIG_WIN32) += raw-win32.o
>   block-nested-$(CONFIG_POSIX) += raw-posix.o
>   block-nested-$(CONFIG_CURL) += curl.o
> +block-nested-$(CONFIG_RBD) += rbd.o
>
>   block-obj-y +=  $(addprefix block/, $(block-nested-y))
>
> diff --git a/block/rbd.c b/block/rbd.c
> new file mode 100644
> index 0000000..0e6b2a5
> --- /dev/null
> +++ b/block/rbd.c
> @@ -0,0 +1,907 @@
> +/*
> + * QEMU Block driver for RADOS (Ceph)
> + *
> + * Copyright (C) 2010 Christian Brunner<chb@muc.de>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include<sys/types.h>
> +#include<stdbool.h>
> +
> +#include<qemu-common.h>
>    

This looks to be unnecessary.  Generally, system includes shouldn't be 
required so all of these should go away except rado/librados.h

> +
> +#include "rbd_types.h"
> +#include "module.h"
> +#include "block_int.h"
> +
> +#include<stdio.h>
> +#include<stdlib.h>
> +#include<rados/librados.h>
> +
> +#include<signal.h>
> +
> +
> +int eventfd(unsigned int initval, int flags);
>    

This is not quite right.  Depending on eventfd is curious but in the 
very least, you need to detect the presence of eventfd in configure and 
provide a wrapper that redefines it as necessary.

> +
> +/*
> + * When specifying the image filename use:
> + *
> + * rbd:poolname/devicename
> + *
> + * poolname must be the name of an existing rados pool
> + *
> + * devicename is the basename for all objects used to
> + * emulate the raw device.
> + *
> + * Metadata information (image size, ...) is stored in an
> + * object with the name "devicename.rbd".
> + *
> + * The raw device is split into 4MB sized objects by default.
> + * The sequencenumber is encoded in a 12 byte long hex-string,
> + * and is attached to the devicename, separated by a dot.
> + * e.g. "devicename.1234567890ab"
> + *
> + */
> +
> +#define OBJ_MAX_SIZE (1UL<<  OBJ_DEFAULT_OBJ_ORDER)
> +
> +typedef struct RBDAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUBH *bh;
> +    int ret;
> +    QEMUIOVector *qiov;
> +    char *bounce;
> +    int write;
> +    int64_t sector_num;
> +    int aiocnt;
> +    int error;
> +    struct BDRVRBDState *s;
> +} RBDAIOCB;
> +
> +typedef struct RADOSCB {
> +    int rcbid;
> +    RBDAIOCB *acb;
> +    int done;
> +    int64_t segsize;
> +    char *buf;
> +} RADOSCB;
> +
> +typedef struct BDRVRBDState {
> +    int efd;
> +    rados_pool_t pool;
> +    rados_pool_t header_pool;
> +    char name[RBD_MAX_OBJ_NAME_SIZE];
> +    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
> +    uint64_t size;
> +    uint64_t objsize;
> +    int qemu_aio_count;
> +    int read_only;
> +} BDRVRBDState;
> +
> +typedef struct rbd_obj_header_ondisk RbdHeader1;
> +
> +static int rbd_parsename(const char *filename, char *pool, char **snap,
> +                         char *name)
> +{
> +    const char *rbdname;
> +    char *p;
> +    int l;
> +
> +    if (!strstart(filename, "rbd:",&rbdname)) {
> +        return -EINVAL;
> +    }
> +
> +    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
> +    p = strchr(pool, '/');
> +    if (p == NULL) {
> +        return -EINVAL;
> +    }
> +
> +    *p = '\0';
> +
> +    l = strlen(pool);
> +    if(l>= RBD_MAX_SEG_NAME_SIZE) {
> +        error_report("pool name to long");
> +        return -EINVAL;
> +    } else if (l<= 0) {
> +        error_report("pool name to short");
> +        return -EINVAL;
> +    }
> +
> +    l = strlen(++p);
> +    if (l>= RBD_MAX_OBJ_NAME_SIZE) {
> +        error_report("object name to long");
> +        return -EINVAL;
> +    } else if (l<= 0) {
> +        error_report("object name to short");
> +        return -EINVAL;
> +    }
> +
> +    strcpy(name, p);
> +
> +    *snap = strchr(name, '@');
> +    if (*snap) {
> +        *(*snap) = '\0';
> +        (*snap)++;
> +        if (!*snap) *snap = NULL;
> +    }
> +
> +    return l;
> +}
> +
> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
> +{
> +    uint32_t len = strlen(name);
> +    /* total_len = encoding op + name + empty buffer */
> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
> +    char *desc = NULL;
>    

char is the wrong type to use here as it may be signed or unsigned.  
That can have weird effects with binary data when you're directly 
manipulating it.

> +
> +    desc = qemu_malloc(total_len);
> +
> +    *tmap_desc = desc;
> +
> +    *desc = op;
> +    desc++;
> +    memcpy(desc,&len, sizeof(len));
> +    desc += sizeof(len);
> +    memcpy(desc, name, len);
> +    desc += len;
> +    len = 0;
> +    memcpy(desc,&len, sizeof(len));
> +    desc += sizeof(len);
>    

Shouldn't endianness be a concern?

> +
> +    return desc - *tmap_desc;
> +}
> +
> +static void free_tmap_op(char *tmap_desc)
> +{
> +    qemu_free(tmap_desc);
> +}
> +
> +static int rbd_register_image(rados_pool_t pool, const char *name)
> +{
> +    char *tmap_desc;
> +    const char *dir = RBD_DIRECTORY;
> +    int ret;
> +
> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc);
> +    if (ret<  0) {
> +        return ret;
> +    }
> +
> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
> +    free_tmap_op(tmap_desc);
> +
> +    return ret;
> +}
>    

This ops are all synchronous?  IOW, rados_tmap_update() call blocks 
until the operation is completed?

> +static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
> +{
> +    int r = rados_write(pool, info_oid, 0, NULL, 0);
> +    if (r<  0) {
> +        return r;
> +    }
> +    return 0;
> +}
> +
> +static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
> +{
> +    uint64_t out[1];
> +    const char *info_oid = RBD_INFO;
> +
> +    *id = 0;
> +
> +    int r = touch_rbd_info(pool, info_oid);
> +    if (r<  0) {
> +        return r;
> +    }
> +
> +    r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
> +                   0, (char *)out, sizeof(out));
> +    if (r<  0) {
> +        return r;
> +    }
> +
> +    *id = out[0];
> +    le64_to_cpus(out);
> +
> +    return 0;
> +}
> +
> +static int rbd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +    int64_t bytes = 0;
> +    int64_t objsize;
> +    uint64_t size;
> +    time_t mtime;
> +    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char name[RBD_MAX_SEG_NAME_SIZE];
> +    char *snap;
> +    RbdHeader1 header;
> +    rados_pool_t p;
> +    uint64_t bid;
> +    uint32_t hi, lo;
> +    int ret;
> +
> +    if (rbd_parsename(filename, pool,&snap, name)<  0) {
> +        return -EINVAL;
> +    }
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
> +
> +    /* Read out options */
> +    while (options&&  options->name) {
> +        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +            bytes = options->value.n;
> +        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
> +            if (options->value.n) {
> +                objsize = options->value.n;
> +                if ((objsize - 1)&  objsize) {    /* not a power of 2? */
> +                    error_report("obj size needs to be power of 2");
> +                    return -EINVAL;
> +                }
> +                if (objsize<  4096) {
> +                    error_report("obj size too small");
> +                    return -EINVAL;
> +                }
> +
> +                for (obj_order = 0; obj_order<  64; obj_order++) {
> +                    if (objsize == 1) {
> +                        break;
> +                    }
> +                    objsize>>= 1;
> +                }
> +            }
> +        }
> +        options++;
> +    }
> +
> +    memset(&header, 0, sizeof(header));
> +    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
> +    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
> +    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
> +    header.image_size = bytes;
> +    cpu_to_le64s((uint64_t *)&  header.image_size);
> +    header.options.order = obj_order;
> +    header.options.crypt_type = RBD_CRYPT_NONE;
> +    header.options.comp_type = RBD_COMP_NONE;
> +    header.snap_seq = 0;
> +    header.snap_count = 0;
> +    cpu_to_le32s(&header.snap_count);
> +
> +    if (rados_initialize(0, NULL)<  0) {
> +        error_report("error initializing");
> +        return -EIO;
> +    }
> +
> +    if (rados_open_pool(pool,&p)) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +
> +    /* check for existing rbd header file */
> +    ret = rados_stat(p, n,&size,&mtime);
> +    if (ret == 0) {
> +        ret=-EEXIST;
> +        goto done;
> +    }
> +
> +    ret = rbd_assign_bid(p,&bid);
> +    if (ret<  0) {
> +        error_report("failed assigning block id");
> +        rados_deinitialize();
> +        return -EIO;
> +    }
> +    hi = bid>>  32;
> +    lo = bid&  0xFFFFFFFF;
> +    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
> +
> +    /* create header file */
> +    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
> +    if (ret<  0) {
> +        goto done;
> +    }
> +
> +    ret = rbd_register_image(p, name);
> +done:
> +    rados_close_pool(p);
> +    rados_deinitialize();
> +
> +    return ret;
> +}
> +
> +static void rbd_aio_completion_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    uint64_t val;
> +    ssize_t ret;
> +
> +    do {
> +        if ((ret = read(s->efd,&val, sizeof(val)))>  0) {
> +            s->qemu_aio_count -= val;
> +        }
> +    } while (ret<  0&&  errno == EINTR);
> +
> +    return;
> +}
> +
> +static int rbd_aio_flush_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    return (s->qemu_aio_count>  0);
> +}
> +
> +
> +static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
> +{
> +    uint32_t snap_count = header->snap_count;
> +    rados_snap_t *snaps = NULL;
> +    rados_snap_t seq;
> +    uint32_t i;
> +    uint64_t snap_names_len = header->snap_names_len;
> +    int r;
> +    rados_snap_t snapid = 0;
> +
> +    cpu_to_le32s(&snap_count);
> +    cpu_to_le64s(&snap_names_len);
> +    if (snap_count) {
> +        const char *header_snap = (const char *)&header->snaps[snap_count];
> +        const char *end = header_snap + snap_names_len;
> +        snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
> +
> +        for (i=0; i<  snap_count; i++) {
> +            snaps[i] = (uint64_t)header->snaps[i].id;
> +            cpu_to_le64s(&snaps[i]);
> +
> +            if (snap&&  strcmp(snap, header_snap) == 0) {
> +                snapid = snaps[i];
> +            }
> +
> +            header_snap += strlen(header_snap) + 1;
> +            if (header_snap>  end)
> +                error_report("bad header, snapshot list broken");
>    

Missing curly braces here.

> +        }
> +    }
> +
> +    if (snap&&  !snapid) {
> +        error_report("snapshot not found");
> +        return -ENOENT;
> +    }
> +    seq = header->snap_seq;
> +    cpu_to_le32s((uint32_t *)&seq);
> +
> +    r = rados_set_snap_context(pool, seq, snaps, snap_count);
> +
> +    rados_set_snap(pool, snapid);
> +
> +    qemu_free(snaps);
> +
> +    return r;
> +}
> +
> +#define BUF_READ_START_LEN    4096
> +
> +static int rbd_read_header(BDRVRBDState *s, char **hbuf)
> +{
> +    char *buf = NULL;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    uint64_t len = BUF_READ_START_LEN;
> +    int r;
> +
> +    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
> +
> +    buf = qemu_malloc(len);
> +
> +    r = rados_read(s->header_pool, n, 0, buf, len);
> +    if (r<  0)
> +        goto failed;
> +
> +    if (r<  len)
> +        goto done;
> +
> +    qemu_free(buf);
> +    buf = qemu_malloc(len);
> +
> +    r = rados_stat(s->header_pool, n,&len, NULL);
> +    if (r<  0)
> +        goto failed;
> +
> +    r = rados_read(s->header_pool, n, 0, buf, len);
> +    if (r<  0)
> +        goto failed;
> +
> +done:
> +    *hbuf = buf;
> +    return 0;
> +
> +failed:
> +    qemu_free(buf);
> +    return r;
> +}
> +
> +static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char pool[RBD_MAX_SEG_NAME_SIZE];
> +    char *snap;
> +    char *hbuf = NULL;
> +    int r;
> +
> +    if (rbd_parsename(filename, pool,&snap, s->name)<  0) {
> +        return -EINVAL;
> +    }
> +
> +    if ((r = rados_initialize(0, NULL))<  0) {
> +        error_report("error initializing");
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool,&s->pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +    if ((r = rados_open_pool(pool,&s->header_pool))) {
> +        error_report("error opening pool %s", pool);
> +        rados_deinitialize();
> +        return r;
> +    }
> +
> +   if ((r = rbd_read_header(s,&hbuf))<  0) {
> +        error_report("error reading header from %s", s->name);
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
> +        error_report("Invalid header signature %s", hbuf + 64);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
> +        error_report("Unknown image version %s", hbuf + 68);
> +        r = -EMEDIUMTYPE;
> +        goto failed;
> +    }
> +
> +    RbdHeader1 *header;
>
>    

Don't mix variable definitions with code.

> +    header = (RbdHeader1 *) hbuf;
> +    le64_to_cpus((uint64_t *)&  header->image_size);
> +    s->size = header->image_size;
> +    s->objsize = 1<<  header->options.order;
> +    memcpy(s->block_name, header->block_name, sizeof(header->block_name));
> +
> +    r = rbd_set_snapc(s->pool, snap, header);
> +    if (r<  0) {
> +        error_report("failed setting snap context: %s", strerror(-r));
> +        goto failed;
> +    }
> +
> +    s->read_only = (snap != NULL);
> +
> +    s->efd = eventfd(0, 0);
> +    if (s->efd<  0) {
> +        error_report("error opening eventfd");
> +        goto failed;
> +    }
> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
> +        rbd_aio_flush_cb, NULL, s);
>    

It looks like you just use the eventfd to signal aio completion 
callbacks.  A better way to do this would be to schedule a bottom half.  
eventfds are Linux specific and specific to recent kernels.

I think you need to try to split this up into multiple patches.  Maybe 
start a driver with just open support and then add rw incrementally.

> +    qemu_free(hbuf);
> +
> +    return 0;
> +
> +failed:
> +    if (hbuf)
> +        qemu_free(hbuf);
> +
> +    rados_close_pool(s->header_pool);
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +    return r;
> +}
> +
> +static void rbd_close(BlockDriverState *bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    close(s->efd);
> +    qemu_aio_set_fd_handler(s->efd, NULL , NULL, NULL, NULL, NULL);
> +
> +    rados_close_pool(s->header_pool);
> +    rados_close_pool(s->pool);
> +    rados_deinitialize();
> +}
> +
> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
> +                  uint8_t *buf, int nb_sectors, int write)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +
> +    int64_t segnr, segoffs, segsize, r;
> +    int64_t off, size;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    while (size>  0) {
> +        if (size<  segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name, segnr);
> +
> +        if (write) {
> +            if (s->read_only)
> +                return -EROFS;
> +            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
> +                segsize))<  0) {
> +                return r;
> +            }
> +        } else {
> +            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
> +            if (r == -ENOENT) {
> +                memset(buf, 0, segsize);
> +            } else if (r<  0) {
> +                return r;
> +            } else if (r<  segsize) {
> +                memset(buf + r, 0, segsize - r);
> +            }
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return 0;
> +}
>    

You don't need to implement synchronous functions as long as you have 
the async interfaces implemented.

> +static int rbd_read(BlockDriverState *bs, int64_t sector_num,
> +                    uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
> +}
> +
> +static int rbd_write(BlockDriverState *bs, int64_t sector_num,
> +                     const uint8_t *buf, int nb_sectors)
> +{
> +    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
> +}
> +
> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);
> +}
> +
> +static AIOPool rbd_aio_pool = {
> +    .aiocb_size = sizeof(RBDAIOCB),
> +    .cancel = rbd_aio_cancel,
> +};
> +
> +/* This is the callback function for rados_aio_read and _write */
> +
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    RBDAIOCB *acb = rcb->acb;
> +    int64_t r;
> +    uint64_t buf = 1;
> +    int i;
> +
> +    acb->aiocnt--;
> +    r = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (acb->write) {
> +        if (r<  0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (!acb->error) {
> +            acb->ret += rcb->segsize;
> +        }
> +    } else {
> +        if (r == -ENOENT) {
> +            memset(rcb->buf, 0, rcb->segsize);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (r<  0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (r<  rcb->segsize) {
> +            memset(rcb->buf + r, 0, rcb->segsize - r);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (!acb->error) {
> +            acb->ret += r;
> +        }
> +    }
> +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_free(rcb);
> +    i = 0;
> +    if (!acb->aiocnt&&  acb->bh) {
> +        qemu_bh_schedule(acb->bh);
> +    }
> +}
> +
> +/* Callback when all queued rados_aio requests are complete */
> +
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +    uint64_t buf = 1;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret>  0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +
> +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_aio_release(acb);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
> +                                           int64_t sector_num,
> +                                           QEMUIOVector *qiov,
> +                                           int nb_sectors,
> +                                           BlockDriverCompletionFunc *cb,
> +                                           void *opaque, int write)
> +{
> +    RBDAIOCB *acb;
> +    RADOSCB *rcb;
> +    rados_completion_t c;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    int64_t segnr, segoffs, segsize, last_segnr;
> +    int64_t off, size;
> +    char *buf;
> +
> +    BDRVRBDState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->aiocnt = 0;
> +    acb->ret = 0;
> +    acb->error = 0;
> +    acb->s = s;
> +
> +    if (!acb->bh) {
> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> +    }
> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    last_segnr = ((off + size - 1) / s->objsize);
> +    acb->aiocnt = (last_segnr - segnr) + 1;
> +
> +    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the related RBDAIOCB */
> +
> +    if (write&&  s->read_only) {
> +        acb->ret = -EROFS;
> +        return NULL;
> +    }
> +
> +    while (size>  0) {
> +        if (size<  segsize) {
> +            segsize = size;
> +        }
> +
> +        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name,
> +                 segnr);
> +
> +        rcb = qemu_malloc(sizeof(RADOSCB));
> +        rcb->done = 0;
> +        rcb->acb = acb;
> +        rcb->segsize = segsize;
> +        rcb->buf = buf;
> +
> +        if (write) {
> +            rados_aio_create_completion(rcb, NULL,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +&c);
> +            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
> +        } else {
> +            rados_aio_create_completion(rcb,
> +                                        (rados_callback_t) rbd_finish_aiocb,
> +                                        NULL,&c);
> +            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
> +        }
> +
> +        buf += segsize;
> +        size -= segsize;
> +        segoffs = 0;
> +        segsize = s->objsize;
> +        segnr++;
> +    }
> +
> +    return&acb->common;
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
> +                                       int64_t sector_num, QEMUIOVector * qiov,
> +                                       int nb_sectors,
> +                                       BlockDriverCompletionFunc * cb,
> +                                       void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
> +                                        int64_t sector_num, QEMUIOVector * qiov,
> +                                        int nb_sectors,
> +                                        BlockDriverCompletionFunc * cb,
> +                                        void *opaque)
> +{
> +    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
> +}
> +
> +static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    bdi->cluster_size = s->objsize;
> +    return 0;
> +}
> +
> +static int64_t rbd_getlength(BlockDriverState * bs)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +
> +    return s->size;
> +}
> +
> +static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
> +{
> +    BDRVRBDState *s = bs->opaque;
> +    char inbuf[512], outbuf[128];
> +    uint64_t snap_id;
> +    int r;
> +    char *p = inbuf;
> +    char *end = inbuf + sizeof(inbuf);
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    char *hbuf = NULL;
> +
> +    if (sn_info->name[0] == '\0')
> +        return -EINVAL; /* we need a name for rbd snapshots */
> +
> +    /*
> +     * rbd snapshots are using the name as the user controlled unique identifier
> +     * we can't use the rbd snapid for that purpose, as it can't be set
> +     */
> +    if (sn_info->id_str[0] != '\0'&&
> +        strcmp(sn_info->id_str, sn_info->name) != 0)
> +        return -EINVAL;
>    

I don't fully understand.  Does this mean that snapshots are stored in a 
shared namespace?  IOW, if a user root creates a snapshot of in one VM, 
the other VM running as root sees it too?

Regards,

Anthony Liguori
Yehuda Sadeh Weinraub - Oct. 7, 2010, 6:08 p.m.
On Thu, Oct 7, 2010 at 7:12 AM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 08/03/2010 03:14 PM, Christian Brunner wrote:
>>
>> +#include "qemu-common.h"
>> +#include "qemu-error.h"
>> +#include<sys/types.h>
>> +#include<stdbool.h>
>> +
>> +#include<qemu-common.h>
>>
>
> This looks to be unnecessary.  Generally, system includes shouldn't be
> required so all of these should go away except rado/librados.h
Removed.

>
>> +
>> +#include "rbd_types.h"
>> +#include "module.h"
>> +#include "block_int.h"
>> +
>> +#include<stdio.h>
>> +#include<stdlib.h>
>> +#include<rados/librados.h>
>> +
>> +#include<signal.h>
>> +
>> +
>> +int eventfd(unsigned int initval, int flags);
>>
>
> This is not quite right.  Depending on eventfd is curious but in the very
> least, you need to detect the presence of eventfd in configure and provide a
> wrapper that redefines it as necessary.

Can fix that, though please see my later remarks.
>> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
>> +{
>> +    uint32_t len = strlen(name);
>> +    /* total_len = encoding op + name + empty buffer */
>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
>> +    char *desc = NULL;
>>
>
> char is the wrong type to use here as it may be signed or unsigned.  That
> can have weird effects with binary data when you're directly manipulating
> it.
Well, I can change it to uint8_t, so that it matches the op type, but
that'll require adding some other castings. In any case, you usually
get such a weird behavior when you cast to types of different sizes
and have the sign bit padded which is not the case in here.

>
>> +
>> +    desc = qemu_malloc(total_len);
>> +
>> +    *tmap_desc = desc;
>> +
>> +    *desc = op;
>> +    desc++;
>> +    memcpy(desc,&len, sizeof(len));
>> +    desc += sizeof(len);
>> +    memcpy(desc, name, len);
>> +    desc += len;
>> +    len = 0;
>> +    memcpy(desc,&len, sizeof(len));
>> +    desc += sizeof(len);
>>
>
> Shouldn't endianness be a concern?
Right. Fixed that.

>
>> +
>> +    return desc - *tmap_desc;
>> +}
>> +
>> +static void free_tmap_op(char *tmap_desc)
>> +{
>> +    qemu_free(tmap_desc);
>> +}
>> +
>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>> +{
>> +    char *tmap_desc;
>> +    const char *dir = RBD_DIRECTORY;
>> +    int ret;
>> +
>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc);
>> +    if (ret<  0) {
>> +        return ret;
>> +    }
>> +
>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>> +    free_tmap_op(tmap_desc);
>> +
>> +    return ret;
>> +}
>>
>
> This ops are all synchronous?  IOW, rados_tmap_update() call blocks until
> the operation is completed?

Yeah. And this is only called from the rbd_create() callback.

>> +            header_snap += strlen(header_snap) + 1;
>> +            if (header_snap>  end)
>> +                error_report("bad header, snapshot list broken");
>>
>
> Missing curly braces here.
Fixed.

>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>> +        error_report("Unknown image version %s", hbuf + 68);
>> +        r = -EMEDIUMTYPE;
>> +        goto failed;
>> +    }
>> +
>> +    RbdHeader1 *header;
>>
>>
>
> Don't mix variable definitions with code.

Fixed.

>> +    s->efd = eventfd(0, 0);
>> +    if (s->efd<  0) {
>> +        error_report("error opening eventfd");
>> +        goto failed;
>> +    }
>> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
>> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
>> +        rbd_aio_flush_cb, NULL, s);
>>
>
> It looks like you just use the eventfd to signal aio completion callbacks.
>  A better way to do this would be to schedule a bottom half.  eventfds are
> Linux specific and specific to recent kernels.

Digging back why we introduced the eventfd, it was due to some issues
seen with do_savevm() hangs on qemu_aio_flush(). The reason seemed
that we had no fd associated with the block device, which seemed to
not work well with the qemu aio model. If that assumption is wrong,
we'd be happy to change it. In any case, there are other more portable
ways to generate fds, so if it's needed we can do that.

>> +static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
>> +                  uint8_t *buf, int nb_sectors, int write)
>> +{
>> +    BDRVRBDState *s = bs->opaque;
>> +    char n[RBD_MAX_SEG_NAME_SIZE];
>> +
>
> You don't need to implement synchronous functions as long as you have the
> async interfaces implemented.
Snipped.

>> +     */
>> +    if (sn_info->id_str[0] != '\0'&&
>> +        strcmp(sn_info->id_str, sn_info->name) != 0)
>> +        return -EINVAL;
>>
>
> I don't fully understand.  Does this mean that snapshots are stored in a
> shared namespace?  IOW, if a user root creates a snapshot of in one VM, the
> other VM running as root sees it too?
>

Snapshots are stored in a namespace for each block device. If you
share a block device between different vms, you'll also share its
snapshots.


Thanks,
Yehuda
Anthony Liguori - Oct. 7, 2010, 6:38 p.m.
On 10/07/2010 01:08 PM, Yehuda Sadeh Weinraub wrote:
> On Thu, Oct 7, 2010 at 7:12 AM, Anthony Liguori<anthony@codemonkey.ws>  wrote:
>    
>> On 08/03/2010 03:14 PM, Christian Brunner wrote:
>>      
>>> +#include "qemu-common.h"
>>> +#include "qemu-error.h"
>>> +#include<sys/types.h>
>>> +#include<stdbool.h>
>>> +
>>> +#include<qemu-common.h>
>>>
>>>        
>> This looks to be unnecessary.  Generally, system includes shouldn't be
>> required so all of these should go away except rado/librados.h
>>      
> Removed.
>
>    
>>      
>>> +
>>> +#include "rbd_types.h"
>>> +#include "module.h"
>>> +#include "block_int.h"
>>> +
>>> +#include<stdio.h>
>>> +#include<stdlib.h>
>>> +#include<rados/librados.h>
>>> +
>>> +#include<signal.h>
>>> +
>>> +
>>> +int eventfd(unsigned int initval, int flags);
>>>
>>>        
>> This is not quite right.  Depending on eventfd is curious but in the very
>> least, you need to detect the presence of eventfd in configure and provide a
>> wrapper that redefines it as necessary.
>>      
> Can fix that, though please see my later remarks.
>    
>>> +static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
>>> +{
>>> +    uint32_t len = strlen(name);
>>> +    /* total_len = encoding op + name + empty buffer */
>>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
>>> +    char *desc = NULL;
>>>
>>>        
>> char is the wrong type to use here as it may be signed or unsigned.  That
>> can have weird effects with binary data when you're directly manipulating
>> it.
>>      
> Well, I can change it to uint8_t, so that it matches the op type, but
> that'll require adding some other castings. In any case, you usually
> get such a weird behavior when you cast to types of different sizes
> and have the sign bit padded which is not the case in here.
>
>    
>>      
>>> +
>>> +    desc = qemu_malloc(total_len);
>>> +
>>> +    *tmap_desc = desc;
>>> +
>>> +    *desc = op;
>>> +    desc++;
>>> +    memcpy(desc,&len, sizeof(len));
>>> +    desc += sizeof(len);
>>> +    memcpy(desc, name, len);
>>> +    desc += len;
>>> +    len = 0;
>>> +    memcpy(desc,&len, sizeof(len));
>>> +    desc += sizeof(len);
>>>
>>>        
>> Shouldn't endianness be a concern?
>>      
> Right. Fixed that.
>
>    
>>      
>>> +
>>> +    return desc - *tmap_desc;
>>> +}
>>> +
>>> +static void free_tmap_op(char *tmap_desc)
>>> +{
>>> +    qemu_free(tmap_desc);
>>> +}
>>> +
>>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>>> +{
>>> +    char *tmap_desc;
>>> +    const char *dir = RBD_DIRECTORY;
>>> +    int ret;
>>> +
>>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc);
>>> +    if (ret<    0) {
>>> +        return ret;
>>> +    }
>>> +
>>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>>> +    free_tmap_op(tmap_desc);
>>> +
>>> +    return ret;
>>> +}
>>>
>>>        
>> This ops are all synchronous?  IOW, rados_tmap_update() call blocks until
>> the operation is completed?
>>      
> Yeah. And this is only called from the rbd_create() callback.
>
>    
>>> +            header_snap += strlen(header_snap) + 1;
>>> +            if (header_snap>    end)
>>> +                error_report("bad header, snapshot list broken");
>>>
>>>        
>> Missing curly braces here.
>>      
> Fixed.
>
>    
>>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>>> +        error_report("Unknown image version %s", hbuf + 68);
>>> +        r = -EMEDIUMTYPE;
>>> +        goto failed;
>>> +    }
>>> +
>>> +    RbdHeader1 *header;
>>>
>>>
>>>        
>> Don't mix variable definitions with code.
>>      
> Fixed.
>
>    
>>> +    s->efd = eventfd(0, 0);
>>> +    if (s->efd<    0) {
>>> +        error_report("error opening eventfd");
>>> +        goto failed;
>>> +    }
>>> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
>>> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
>>> +        rbd_aio_flush_cb, NULL, s);
>>>
>>>        
>> It looks like you just use the eventfd to signal aio completion callbacks.
>>   A better way to do this would be to schedule a bottom half.  eventfds are
>> Linux specific and specific to recent kernels.
>>      
> Digging back why we introduced the eventfd, it was due to some issues
> seen with do_savevm() hangs on qemu_aio_flush(). The reason seemed
> that we had no fd associated with the block device, which seemed to
> not work well with the qemu aio model. If that assumption is wrong,
> we'd be happy to change it. In any case, there are other more portable
> ways to generate fds, so if it's needed we can do that.
>    

There's no fd at all?   How do you get notifications about an 
asynchronous event completion?

Regards,

Anthony Liguori
Yehuda Sadeh Weinraub - Oct. 7, 2010, 6:41 p.m.
On Thu, Oct 7, 2010 at 11:38 AM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 10/07/2010 01:08 PM, Yehuda Sadeh Weinraub wrote:
>>
>> On Thu, Oct 7, 2010 at 7:12 AM, Anthony Liguori<anthony@codemonkey.ws>
>>  wrote:
>>
>>>
>>> On 08/03/2010 03:14 PM, Christian Brunner wrote:
>>>
>>>>
>>>> +#include "qemu-common.h"
>>>> +#include "qemu-error.h"
>>>> +#include<sys/types.h>
>>>> +#include<stdbool.h>
>>>> +
>>>> +#include<qemu-common.h>
>>>>
>>>>
>>>
>>> This looks to be unnecessary.  Generally, system includes shouldn't be
>>> required so all of these should go away except rado/librados.h
>>>
>>
>> Removed.
>>
>>
>>>
>>>
>>>>
>>>> +
>>>> +#include "rbd_types.h"
>>>> +#include "module.h"
>>>> +#include "block_int.h"
>>>> +
>>>> +#include<stdio.h>
>>>> +#include<stdlib.h>
>>>> +#include<rados/librados.h>
>>>> +
>>>> +#include<signal.h>
>>>> +
>>>> +
>>>> +int eventfd(unsigned int initval, int flags);
>>>>
>>>>
>>>
>>> This is not quite right.  Depending on eventfd is curious but in the very
>>> least, you need to detect the presence of eventfd in configure and
>>> provide a
>>> wrapper that redefines it as necessary.
>>>
>>
>> Can fix that, though please see my later remarks.
>>
>>>>
>>>> +static int create_tmap_op(uint8_t op, const char *name, char
>>>> **tmap_desc)
>>>> +{
>>>> +    uint32_t len = strlen(name);
>>>> +    /* total_len = encoding op + name + empty buffer */
>>>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) +
>>>> sizeof(uint32_t);
>>>> +    char *desc = NULL;
>>>>
>>>>
>>>
>>> char is the wrong type to use here as it may be signed or unsigned.  That
>>> can have weird effects with binary data when you're directly manipulating
>>> it.
>>>
>>
>> Well, I can change it to uint8_t, so that it matches the op type, but
>> that'll require adding some other castings. In any case, you usually
>> get such a weird behavior when you cast to types of different sizes
>> and have the sign bit padded which is not the case in here.
>>
>>
>>>
>>>
>>>>
>>>> +
>>>> +    desc = qemu_malloc(total_len);
>>>> +
>>>> +    *tmap_desc = desc;
>>>> +
>>>> +    *desc = op;
>>>> +    desc++;
>>>> +    memcpy(desc,&len, sizeof(len));
>>>> +    desc += sizeof(len);
>>>> +    memcpy(desc, name, len);
>>>> +    desc += len;
>>>> +    len = 0;
>>>> +    memcpy(desc,&len, sizeof(len));
>>>> +    desc += sizeof(len);
>>>>
>>>>
>>>
>>> Shouldn't endianness be a concern?
>>>
>>
>> Right. Fixed that.
>>
>>
>>>
>>>
>>>>
>>>> +
>>>> +    return desc - *tmap_desc;
>>>> +}
>>>> +
>>>> +static void free_tmap_op(char *tmap_desc)
>>>> +{
>>>> +    qemu_free(tmap_desc);
>>>> +}
>>>> +
>>>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>>>> +{
>>>> +    char *tmap_desc;
>>>> +    const char *dir = RBD_DIRECTORY;
>>>> +    int ret;
>>>> +
>>>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc);
>>>> +    if (ret<    0) {
>>>> +        return ret;
>>>> +    }
>>>> +
>>>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>>>> +    free_tmap_op(tmap_desc);
>>>> +
>>>> +    return ret;
>>>> +}
>>>>
>>>>
>>>
>>> This ops are all synchronous?  IOW, rados_tmap_update() call blocks until
>>> the operation is completed?
>>>
>>
>> Yeah. And this is only called from the rbd_create() callback.
>>
>>
>>>>
>>>> +            header_snap += strlen(header_snap) + 1;
>>>> +            if (header_snap>    end)
>>>> +                error_report("bad header, snapshot list broken");
>>>>
>>>>
>>>
>>> Missing curly braces here.
>>>
>>
>> Fixed.
>>
>>
>>>>
>>>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>>>> +        error_report("Unknown image version %s", hbuf + 68);
>>>> +        r = -EMEDIUMTYPE;
>>>> +        goto failed;
>>>> +    }
>>>> +
>>>> +    RbdHeader1 *header;
>>>>
>>>>
>>>>
>>>
>>> Don't mix variable definitions with code.
>>>
>>
>> Fixed.
>>
>>
>>>>
>>>> +    s->efd = eventfd(0, 0);
>>>> +    if (s->efd<    0) {
>>>> +        error_report("error opening eventfd");
>>>> +        goto failed;
>>>> +    }
>>>> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
>>>> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
>>>> +        rbd_aio_flush_cb, NULL, s);
>>>>
>>>>
>>>
>>> It looks like you just use the eventfd to signal aio completion
>>> callbacks.
>>>  A better way to do this would be to schedule a bottom half.  eventfds
>>> are
>>> Linux specific and specific to recent kernels.
>>>
>>
>> Digging back why we introduced the eventfd, it was due to some issues
>> seen with do_savevm() hangs on qemu_aio_flush(). The reason seemed
>> that we had no fd associated with the block device, which seemed to
>> not work well with the qemu aio model. If that assumption is wrong,
>> we'd be happy to change it. In any case, there are other more portable
>> ways to generate fds, so if it's needed we can do that.
>>
>
> There's no fd at all?   How do you get notifications about an asynchronous
> event completion?
>
> Regards,
>
> Anthony Liguori
>
(resending to list, sorry)

The fd is hidden deep under in librados. We get callback notifications
for events completion.

Thanks,
Yehuda
Anthony Liguori - Oct. 7, 2010, 7:51 p.m.
On 10/07/2010 01:41 PM, Yehuda Sadeh Weinraub wrote:
> On Thu, Oct 7, 2010 at 11:38 AM, Anthony Liguori<anthony@codemonkey.ws>  wrote:
>    
>> On 10/07/2010 01:08 PM, Yehuda Sadeh Weinraub wrote:
>>      
>>> On Thu, Oct 7, 2010 at 7:12 AM, Anthony Liguori<anthony@codemonkey.ws>
>>>   wrote:
>>>
>>>        
>>>> On 08/03/2010 03:14 PM, Christian Brunner wrote:
>>>>
>>>>          
>>>>> +#include "qemu-common.h"
>>>>> +#include "qemu-error.h"
>>>>> +#include<sys/types.h>
>>>>> +#include<stdbool.h>
>>>>> +
>>>>> +#include<qemu-common.h>
>>>>>
>>>>>
>>>>>            
>>>> This looks to be unnecessary.  Generally, system includes shouldn't be
>>>> required so all of these should go away except rado/librados.h
>>>>
>>>>          
>>> Removed.
>>>
>>>
>>>        
>>>>
>>>>          
>>>>> +
>>>>> +#include "rbd_types.h"
>>>>> +#include "module.h"
>>>>> +#include "block_int.h"
>>>>> +
>>>>> +#include<stdio.h>
>>>>> +#include<stdlib.h>
>>>>> +#include<rados/librados.h>
>>>>> +
>>>>> +#include<signal.h>
>>>>> +
>>>>> +
>>>>> +int eventfd(unsigned int initval, int flags);
>>>>>
>>>>>
>>>>>            
>>>> This is not quite right.  Depending on eventfd is curious but in the very
>>>> least, you need to detect the presence of eventfd in configure and
>>>> provide a
>>>> wrapper that redefines it as necessary.
>>>>
>>>>          
>>> Can fix that, though please see my later remarks.
>>>
>>>        
>>>>> +static int create_tmap_op(uint8_t op, const char *name, char
>>>>> **tmap_desc)
>>>>> +{
>>>>> +    uint32_t len = strlen(name);
>>>>> +    /* total_len = encoding op + name + empty buffer */
>>>>> +    uint32_t total_len = 1 + (sizeof(uint32_t) + len) +
>>>>> sizeof(uint32_t);
>>>>> +    char *desc = NULL;
>>>>>
>>>>>
>>>>>            
>>>> char is the wrong type to use here as it may be signed or unsigned.  That
>>>> can have weird effects with binary data when you're directly manipulating
>>>> it.
>>>>
>>>>          
>>> Well, I can change it to uint8_t, so that it matches the op type, but
>>> that'll require adding some other castings. In any case, you usually
>>> get such a weird behavior when you cast to types of different sizes
>>> and have the sign bit padded which is not the case in here.
>>>
>>>
>>>        
>>>>
>>>>          
>>>>> +
>>>>> +    desc = qemu_malloc(total_len);
>>>>> +
>>>>> +    *tmap_desc = desc;
>>>>> +
>>>>> +    *desc = op;
>>>>> +    desc++;
>>>>> +    memcpy(desc,&len, sizeof(len));
>>>>> +    desc += sizeof(len);
>>>>> +    memcpy(desc, name, len);
>>>>> +    desc += len;
>>>>> +    len = 0;
>>>>> +    memcpy(desc,&len, sizeof(len));
>>>>> +    desc += sizeof(len);
>>>>>
>>>>>
>>>>>            
>>>> Shouldn't endianness be a concern?
>>>>
>>>>          
>>> Right. Fixed that.
>>>
>>>
>>>        
>>>>
>>>>          
>>>>> +
>>>>> +    return desc - *tmap_desc;
>>>>> +}
>>>>> +
>>>>> +static void free_tmap_op(char *tmap_desc)
>>>>> +{
>>>>> +    qemu_free(tmap_desc);
>>>>> +}
>>>>> +
>>>>> +static int rbd_register_image(rados_pool_t pool, const char *name)
>>>>> +{
>>>>> +    char *tmap_desc;
>>>>> +    const char *dir = RBD_DIRECTORY;
>>>>> +    int ret;
>>>>> +
>>>>> +    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name,&tmap_desc);
>>>>> +    if (ret<      0) {
>>>>> +        return ret;
>>>>> +    }
>>>>> +
>>>>> +    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
>>>>> +    free_tmap_op(tmap_desc);
>>>>> +
>>>>> +    return ret;
>>>>> +}
>>>>>
>>>>>
>>>>>            
>>>> This ops are all synchronous?  IOW, rados_tmap_update() call blocks until
>>>> the operation is completed?
>>>>
>>>>          
>>> Yeah. And this is only called from the rbd_create() callback.
>>>
>>>
>>>        
>>>>> +            header_snap += strlen(header_snap) + 1;
>>>>> +            if (header_snap>      end)
>>>>> +                error_report("bad header, snapshot list broken");
>>>>>
>>>>>
>>>>>            
>>>> Missing curly braces here.
>>>>
>>>>          
>>> Fixed.
>>>
>>>
>>>        
>>>>> +    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
>>>>> +        error_report("Unknown image version %s", hbuf + 68);
>>>>> +        r = -EMEDIUMTYPE;
>>>>> +        goto failed;
>>>>> +    }
>>>>> +
>>>>> +    RbdHeader1 *header;
>>>>>
>>>>>
>>>>>
>>>>>            
>>>> Don't mix variable definitions with code.
>>>>
>>>>          
>>> Fixed.
>>>
>>>
>>>        
>>>>> +    s->efd = eventfd(0, 0);
>>>>> +    if (s->efd<      0) {
>>>>> +        error_report("error opening eventfd");
>>>>> +        goto failed;
>>>>> +    }
>>>>> +    fcntl(s->efd, F_SETFL, O_NONBLOCK);
>>>>> +    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
>>>>> +        rbd_aio_flush_cb, NULL, s);
>>>>>
>>>>>
>>>>>            
>>>> It looks like you just use the eventfd to signal aio completion
>>>> callbacks.
>>>>   A better way to do this would be to schedule a bottom half.  eventfds
>>>> are
>>>> Linux specific and specific to recent kernels.
>>>>
>>>>          
>>> Digging back why we introduced the eventfd, it was due to some issues
>>> seen with do_savevm() hangs on qemu_aio_flush(). The reason seemed
>>> that we had no fd associated with the block device, which seemed to
>>> not work well with the qemu aio model. If that assumption is wrong,
>>> we'd be happy to change it. In any case, there are other more portable
>>> ways to generate fds, so if it's needed we can do that.
>>>
>>>        
>> There's no fd at all?   How do you get notifications about an asynchronous
>> event completion?
>>
>> Regards,
>>
>> Anthony Liguori
>>
>>      
> (resending to list, sorry)
>
> The fd is hidden deep under in librados. We get callback notifications
> for events completion.
>    

How is that possible?  Are the callbacks delivered in the context of a 
different thread?  If so, don't you need locking?

Regards,

Anthony Liguori

> Thanks,
> Yehuda
>
Yehuda Sadeh Weinraub - Oct. 7, 2010, 8:47 p.m.
On Thu, Oct 7, 2010 at 12:51 PM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 10/07/2010 01:41 PM, Yehuda Sadeh Weinraub wrote:
>>
>> On Thu, Oct 7, 2010 at 11:38 AM, Anthony Liguori<anthony@codemonkey.ws>
>>  wrote:
>>
>>>
>>> On 10/07/2010 01:08 PM, Yehuda Sadeh Weinraub wrote:
>>>
>>>> On Thu, Oct 7, 2010 at 7:12 AM, Anthony Liguori<anthony@codemonkey.ws>
>>>
...
>>> There's no fd at all?   How do you get notifications about an
>>> asynchronous
>>> event completion?
>>>
>>> Regards,
>>>
>>> Anthony Liguori
>>>
>>>
>>
>> (resending to list, sorry)
>>
>> The fd is hidden deep under in librados. We get callback notifications
>> for events completion.
>>
>
> How is that possible?  Are the callbacks delivered in the context of a
> different thread?  If so, don't you need locking?

Not sure I'm completely following you. The callbacks are delivered in
the context of a different thread, but won't run concurrently. Do you
see any specific concurrency issue? We can add some mutex protection
around at the aio callback, so that if librados turns multithreaded at
this point we're covered.


Thanks,
Yehuda
Anthony Liguori - Oct. 7, 2010, 9:04 p.m.
On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
>> How is that possible?  Are the callbacks delivered in the context of a
>> different thread?  If so, don't you need locking?
>>      
> Not sure I'm completely following you. The callbacks are delivered in
> the context of a different thread, but won't run concurrently.

Concurrently to what?  How do you prevent them from running concurrently 
with qemu?

If you saw lock ups, I bet that's what it was from.

Regards,

Anthony Liguori

>   Do you
> see any specific concurrency issue? We can add some mutex protection
> around at the aio callback, so that if librados turns multithreaded at
> this point we're covered.
>
>
> Thanks,
> Yehuda
>
>
Yehuda Sadeh Weinraub - Oct. 7, 2010, 9:49 p.m.
On Thu, Oct 7, 2010 at 2:04 PM, Anthony Liguori <anthony@codemonkey.ws> wrote:
> On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
>>>
>>> How is that possible?  Are the callbacks delivered in the context of a
>>> different thread?  If so, don't you need locking?
>>>
>>
>> Not sure I'm completely following you. The callbacks are delivered in
>> the context of a different thread, but won't run concurrently.
>
> Concurrently to what?  How do you prevent them from running concurrently
> with qemu?

There are two types of callbacks. The first is for rados aio
completions, and the second one is the one added later for the fd glue
layer.

The first callback, called by librados whenever aio completes, runs in
the context of a single librados thread:

+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;
rcb is per a single aio. Was created  before and will be destroyed
here, whereas acb is shared between a few aios, however, it was
generated before the first aio was created.

+    int64_t r;
+    uint64_t buf = 1;
+    int i;
+
+    acb->aiocnt--;

acb->aiocnt has been set before initiating all the aios, so it's ok to
touch it now. Same goes to all acb fields.

+    r = rados_aio_get_return_value(c);
+    rados_aio_release(c);
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret += rcb->segsize;
+        }
+    } else {
+        if (r == -ENOENT) {
+            memset(rcb->buf, 0, rcb->segsize);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < rcb->segsize) {
+            memset(rcb->buf + r, 0, rcb->segsize - r);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (!acb->error) {
+            acb->ret += r;
+        }
+    }
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
This will wake up the io_read()

+        error_report("failed writing to acb->s->efd\n");
+    qemu_free(rcb);
+    i = 0;
+    if (!acb->aiocnt && acb->bh) {
+        qemu_bh_schedule(acb->bh);
This is the only qemu related call in here, seems safe to call it.

+    }
+}

The scheduled bh function will be called only after all aios that
relate to this specific aio set are done, so the following seems ok,
as there's no more acb references.
+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+    uint64_t buf = 1;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
+        error_report("failed writing to acb->s->efd\n");
+    qemu_aio_release(acb);
+}

Now, the second ones are the io_read(), in which we have our glue fd.
We send uint64 per each completed io

+static void rbd_aio_completion_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    uint64_t val;
+    ssize_t ret;
+
+    do {
+        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
+            s->qemu_aio_count -= val;
There is an issue here with s->qemu_aio_count which needs to be
protected by a mutex. Other than that, it just reads from s->efd.

+       }
+    } while (ret < 0 && errno == EINTR);
+
+    return;
+}
+
+static int rbd_aio_flush_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
Same here as with the previous one, needs a mutex around s->qemu_aio_count.

+}

>
> If you saw lock ups, I bet that's what it was from.
>
As I explained before, before introducing the fd glue layer, the lack
of fd associated with our block device caused that there was no way
for qemu to check whether all aios were flushed or not, which didn't
work well when doing migration/savevm.

Thanks,
Yehuda
Anthony Liguori - Oct. 7, 2010, 9:55 p.m.
On 10/07/2010 04:49 PM, Yehuda Sadeh Weinraub wrote:
> On Thu, Oct 7, 2010 at 2:04 PM, Anthony Liguori<anthony@codemonkey.ws>  wrote:
>    
>> On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
>>      
>>>> How is that possible?  Are the callbacks delivered in the context of a
>>>> different thread?  If so, don't you need locking?
>>>>
>>>>          
>>> Not sure I'm completely following you. The callbacks are delivered in
>>> the context of a different thread, but won't run concurrently.
>>>        
>> Concurrently to what?  How do you prevent them from running concurrently
>> with qemu?
>>      
> There are two types of callbacks. The first is for rados aio
> completions, and the second one is the one added later for the fd glue
> layer.
>    

This is a bad architecture for something like qemu.  You could create a 
pipe and use the pipe to signal to qemu.  Same principle as eventfd.  
Ideally, you would do this in the library itself.

Regards,

Anthony Liguori

> The first callback, called by librados whenever aio completes, runs in
> the context of a single librados thread:
>
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    RBDAIOCB *acb = rcb->acb;
> rcb is per a single aio. Was created  before and will be destroyed
> here, whereas acb is shared between a few aios, however, it was
> generated before the first aio was created.
>
> +    int64_t r;
> +    uint64_t buf = 1;
> +    int i;
> +
> +    acb->aiocnt--;
>
> acb->aiocnt has been set before initiating all the aios, so it's ok to
> touch it now. Same goes to all acb fields.
>
> +    r = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (acb->write) {
> +        if (r<  0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (!acb->error) {
> +            acb->ret += rcb->segsize;
> +        }
> +    } else {
> +        if (r == -ENOENT) {
> +            memset(rcb->buf, 0, rcb->segsize);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (r<  0) {
> +            acb->ret = r;
> +            acb->error = 1;
> +        } else if (r<  rcb->segsize) {
> +            memset(rcb->buf + r, 0, rcb->segsize - r);
> +            if (!acb->error) {
> +                acb->ret += rcb->segsize;
> +            }
> +        } else if (!acb->error) {
> +            acb->ret += r;
> +        }
> +    }
> +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> This will wake up the io_read()
>
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_free(rcb);
> +    i = 0;
> +    if (!acb->aiocnt&&  acb->bh) {
> +        qemu_bh_schedule(acb->bh);
> This is the only qemu related call in here, seems safe to call it.
>
> +    }
> +}
>
> The scheduled bh function will be called only after all aios that
> relate to this specific aio set are done, so the following seems ok,
> as there's no more acb references.
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +    uint64_t buf = 1;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret>  0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +
> +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> +        error_report("failed writing to acb->s->efd\n");
> +    qemu_aio_release(acb);
> +}
>
> Now, the second ones are the io_read(), in which we have our glue fd.
> We send uint64 per each completed io
>
> +static void rbd_aio_completion_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    uint64_t val;
> +    ssize_t ret;
> +
> +    do {
> +        if ((ret = read(s->efd,&val, sizeof(val)))>  0) {
> +            s->qemu_aio_count -= val;
> There is an issue here with s->qemu_aio_count which needs to be
> protected by a mutex. Other than that, it just reads from s->efd.
>
> +       }
> +    } while (ret<  0&&  errno == EINTR);
> +
> +    return;
> +}
> +
> +static int rbd_aio_flush_cb(void *opaque)
> +{
> +    BDRVRBDState *s = opaque;
> +
> +    return (s->qemu_aio_count>  0);
> Same here as with the previous one, needs a mutex around s->qemu_aio_count.
>
> +}
>
>    
>> If you saw lock ups, I bet that's what it was from.
>>
>>      
> As I explained before, before introducing the fd glue layer, the lack
> of fd associated with our block device caused that there was no way
> for qemu to check whether all aios were flushed or not, which didn't
> work well when doing migration/savevm.
>
> Thanks,
> Yehuda
>
Sage Weil - Oct. 7, 2010, 10:45 p.m.
On Thu, 7 Oct 2010, Anthony Liguori wrote:

> On 10/07/2010 04:49 PM, Yehuda Sadeh Weinraub wrote:
> > On Thu, Oct 7, 2010 at 2:04 PM, Anthony Liguori<anthony@codemonkey.ws>
> > wrote:
> >    
> > > On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
> > >      
> > > > > How is that possible?  Are the callbacks delivered in the context of a
> > > > > different thread?  If so, don't you need locking?
> > > > > 
> > > > >          
> > > > Not sure I'm completely following you. The callbacks are delivered in
> > > > the context of a different thread, but won't run concurrently.
> > > >        
> > > Concurrently to what?  How do you prevent them from running concurrently
> > > with qemu?
> > >      
> > There are two types of callbacks. The first is for rados aio
> > completions, and the second one is the one added later for the fd glue
> > layer.
> >    
> 
> This is a bad architecture for something like qemu.  You could create a 
> pipe and use the pipe to signal to qemu.  Same principle as eventfd.  
> Ideally, you would do this in the library itself.

I'm sorry, I'm having a hard time understanding what it is you're 
objecting to, or what you would prefer, as there are two different things 
we're talking about here (callbacks and fd glue/pipes).  (Please bear with 
me as I am not a qemu expert!)

The first is the aio completion.  You said a few messages back:

> It looks like you just use the eventfd to signal aio completion 
> callbacks.  A better way to do this would be to schedule a bottom half.

This is what we're doing.  The librados makes a callback to rbd.c's 
rbd_finish_aiocb(), which updates some internal rbd accounting and then 
calls qemu_bh_schedule().  Is that part right?


The second part is an fd (currently created via eventfd(), but I don't 
think it matters where it comes from) that was later added because 
qemu_aio_flush() wouldn't trigger when our aio's completed (and scheduled 
the bottom halves).  This was proposed by Simone Gotti, who had problems 
with live migration:

	http://www.mail-archive.com/qemu-devel@nongnu.org/msg35516.html

Apparently calling the bottom half isn't sufficient to wake up a blocked 
qemu_aio_flush()?  His solution was to create an eventfd() fd, write a 
word to it in the aio completion callback (before we schedule the bh), and 
add the necessary callbacks to make qemu_aio_flush() behave.

Is the problem simply that we should be using pipe(2) instead of 
eventfd(2)?

So far I've heard that we should be scheduling the bottom halves (we are), 
and we should be using a pipe to signal qemu (we're using an fd created by 
eventfd(2)).

Thanks,
sage




> 
> Regards,
> 
> Anthony Liguori
> 
> > The first callback, called by librados whenever aio completes, runs in
> > the context of a single librados thread:
> > 
> > +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> > +{
> > +    RBDAIOCB *acb = rcb->acb;
> > rcb is per a single aio. Was created  before and will be destroyed
> > here, whereas acb is shared between a few aios, however, it was
> > generated before the first aio was created.
> > 
> > +    int64_t r;
> > +    uint64_t buf = 1;
> > +    int i;
> > +
> > +    acb->aiocnt--;
> > 
> > acb->aiocnt has been set before initiating all the aios, so it's ok to
> > touch it now. Same goes to all acb fields.
> > 
> > +    r = rados_aio_get_return_value(c);
> > +    rados_aio_release(c);
> > +    if (acb->write) {
> > +        if (r<  0) {
> > +            acb->ret = r;
> > +            acb->error = 1;
> > +        } else if (!acb->error) {
> > +            acb->ret += rcb->segsize;
> > +        }
> > +    } else {
> > +        if (r == -ENOENT) {
> > +            memset(rcb->buf, 0, rcb->segsize);
> > +            if (!acb->error) {
> > +                acb->ret += rcb->segsize;
> > +            }
> > +        } else if (r<  0) {
> > +            acb->ret = r;
> > +            acb->error = 1;
> > +        } else if (r<  rcb->segsize) {
> > +            memset(rcb->buf + r, 0, rcb->segsize - r);
> > +            if (!acb->error) {
> > +                acb->ret += rcb->segsize;
> > +            }
> > +        } else if (!acb->error) {
> > +            acb->ret += r;
> > +        }
> > +    }
> > +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> > This will wake up the io_read()
> > 
> > +        error_report("failed writing to acb->s->efd\n");
> > +    qemu_free(rcb);
> > +    i = 0;
> > +    if (!acb->aiocnt&&  acb->bh) {
> > +        qemu_bh_schedule(acb->bh);
> > This is the only qemu related call in here, seems safe to call it.
> > 
> > +    }
> > +}
> > 
> > The scheduled bh function will be called only after all aios that
> > relate to this specific aio set are done, so the following seems ok,
> > as there's no more acb references.
> > +static void rbd_aio_bh_cb(void *opaque)
> > +{
> > +    RBDAIOCB *acb = opaque;
> > +    uint64_t buf = 1;
> > +
> > +    if (!acb->write) {
> > +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> > +    }
> > +    qemu_vfree(acb->bounce);
> > +    acb->common.cb(acb->common.opaque, (acb->ret>  0 ? 0 : acb->ret));
> > +    qemu_bh_delete(acb->bh);
> > +    acb->bh = NULL;
> > +
> > +    if (write(acb->s->efd,&buf, sizeof(buf))<  0)
> > +        error_report("failed writing to acb->s->efd\n");
> > +    qemu_aio_release(acb);
> > +}
> > 
> > Now, the second ones are the io_read(), in which we have our glue fd.
> > We send uint64 per each completed io
> > 
> > +static void rbd_aio_completion_cb(void *opaque)
> > +{
> > +    BDRVRBDState *s = opaque;
> > +
> > +    uint64_t val;
> > +    ssize_t ret;
> > +
> > +    do {
> > +        if ((ret = read(s->efd,&val, sizeof(val)))>  0) {
> > +            s->qemu_aio_count -= val;
> > There is an issue here with s->qemu_aio_count which needs to be
> > protected by a mutex. Other than that, it just reads from s->efd.
> > 
> > +       }
> > +    } while (ret<  0&&  errno == EINTR);
> > +
> > +    return;
> > +}
> > +
> > +static int rbd_aio_flush_cb(void *opaque)
> > +{
> > +    BDRVRBDState *s = opaque;
> > +
> > +    return (s->qemu_aio_count>  0);
> > Same here as with the previous one, needs a mutex around s->qemu_aio_count.
> > 
> > +}
> > 
> >    
> > > If you saw lock ups, I bet that's what it was from.
> > > 
> > >      
> > As I explained before, before introducing the fd glue layer, the lack
> > of fd associated with our block device caused that there was no way
> > for qemu to check whether all aios were flushed or not, which didn't
> > work well when doing migration/savevm.
> > 
> > Thanks,
> > Yehuda
> >    
> 
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 
>
Anthony Liguori - Oct. 8, 2010, 2:06 p.m.
On 10/07/2010 05:45 PM, Sage Weil wrote:
> On Thu, 7 Oct 2010, Anthony Liguori wrote:
>
>    
>> On 10/07/2010 04:49 PM, Yehuda Sadeh Weinraub wrote:
>>      
>>> On Thu, Oct 7, 2010 at 2:04 PM, Anthony Liguori<anthony@codemonkey.ws>
>>> wrote:
>>>
>>>        
>>>> On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
>>>>
>>>>          
>>>>>> How is that possible?  Are the callbacks delivered in the context of a
>>>>>> different thread?  If so, don't you need locking?
>>>>>>
>>>>>>
>>>>>>              
>>>>> Not sure I'm completely following you. The callbacks are delivered in
>>>>> the context of a different thread, but won't run concurrently.
>>>>>
>>>>>            
>>>> Concurrently to what?  How do you prevent them from running concurrently
>>>> with qemu?
>>>>
>>>>          
>>> There are two types of callbacks. The first is for rados aio
>>> completions, and the second one is the one added later for the fd glue
>>> layer.
>>>
>>>        
>> This is a bad architecture for something like qemu.  You could create a
>> pipe and use the pipe to signal to qemu.  Same principle as eventfd.
>> Ideally, you would do this in the library itself.
>>      
> I'm sorry, I'm having a hard time understanding what it is you're
> objecting to, or what you would prefer, as there are two different things
> we're talking about here (callbacks and fd glue/pipes).  (Please bear with
> me as I am not a qemu expert!)
>
> The first is the aio completion.  You said a few messages back:
>
>    
>> It looks like you just use the eventfd to signal aio completion
>> callbacks.  A better way to do this would be to schedule a bottom half.
>>      
> This is what we're doing.  The librados makes a callback to rbd.c's
> rbd_finish_aiocb(), which updates some internal rbd accounting and then
> calls qemu_bh_schedule().  Is that part right?
>    

No.  You're calling qemu_bh_schedule() in a separate thread in parallel 
to other operations.

That's absolutely not safe.

> The second part is an fd (currently created via eventfd(), but I don't
> think it matters where it comes from) that was later added because
> qemu_aio_flush() wouldn't trigger when our aio's completed (and scheduled
> the bottom halves).  This was proposed by Simone Gotti, who had problems
> with live migration:
>
> 	http://www.mail-archive.com/qemu-devel@nongnu.org/msg35516.html
>
> Apparently calling the bottom half isn't sufficient to wake up a blocked
> qemu_aio_flush()?  His solution was to create an eventfd() fd, write a
> word to it in the aio completion callback (before we schedule the bh), and
> add the necessary callbacks to make qemu_aio_flush() behave.
>
> Is the problem simply that we should be using pipe(2) instead of
> eventfd(2)?
>
> So far I've heard that we should be scheduling the bottom halves (we are),
> and we should be using a pipe to signal qemu (we're using an fd created by
> eventfd(2)).
>    

Your fundamental problem is your use of threads.  QEMU is single 
threaded.  You cannot call into QEMU code from another thread without 
introducing locking.  Any other solution is going to be intrinsically 
broken.

There are two possibilities to fix this:

1) You can change your library interface so that it doesn't generate 
callbacks via threads.  That would be my preference because I think it's 
a bad interface but it's your library so it's not really my choice :-)

2) You can limit the callbacks to doing nothing other than writing to a 
file descriptor.  You then read the file descriptor somewhere else in 
the normal QEMU code and you can use the file descriptor to get 
signals.  If you're passing data to callbacks, it's much harder because 
you're going to have to store that data somewhere and inevitably require 
locking.

The complexity of (2) is why I think thread-based callbacks is such a 
bad interface.

Regards,

Anthony Liguori

> Thanks,
> sage
>
>
>
>
>    
>> Regards,
>>
>> Anthony Liguori
>>
>>      
>>> The first callback, called by librados whenever aio completes, runs in
>>> the context of a single librados thread:
>>>
>>> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
>>> +{
>>> +    RBDAIOCB *acb = rcb->acb;
>>> rcb is per a single aio. Was created  before and will be destroyed
>>> here, whereas acb is shared between a few aios, however, it was
>>> generated before the first aio was created.
>>>
>>> +    int64_t r;
>>> +    uint64_t buf = 1;
>>> +    int i;
>>> +
>>> +    acb->aiocnt--;
>>>
>>> acb->aiocnt has been set before initiating all the aios, so it's ok to
>>> touch it now. Same goes to all acb fields.
>>>
>>> +    r = rados_aio_get_return_value(c);
>>> +    rados_aio_release(c);
>>> +    if (acb->write) {
>>> +        if (r<   0) {
>>> +            acb->ret = r;
>>> +            acb->error = 1;
>>> +        } else if (!acb->error) {
>>> +            acb->ret += rcb->segsize;
>>> +        }
>>> +    } else {
>>> +        if (r == -ENOENT) {
>>> +            memset(rcb->buf, 0, rcb->segsize);
>>> +            if (!acb->error) {
>>> +                acb->ret += rcb->segsize;
>>> +            }
>>> +        } else if (r<   0) {
>>> +            acb->ret = r;
>>> +            acb->error = 1;
>>> +        } else if (r<   rcb->segsize) {
>>> +            memset(rcb->buf + r, 0, rcb->segsize - r);
>>> +            if (!acb->error) {
>>> +                acb->ret += rcb->segsize;
>>> +            }
>>> +        } else if (!acb->error) {
>>> +            acb->ret += r;
>>> +        }
>>> +    }
>>> +    if (write(acb->s->efd,&buf, sizeof(buf))<   0)
>>> This will wake up the io_read()
>>>
>>> +        error_report("failed writing to acb->s->efd\n");
>>> +    qemu_free(rcb);
>>> +    i = 0;
>>> +    if (!acb->aiocnt&&   acb->bh) {
>>> +        qemu_bh_schedule(acb->bh);
>>> This is the only qemu related call in here, seems safe to call it.
>>>
>>> +    }
>>> +}
>>>
>>> The scheduled bh function will be called only after all aios that
>>> relate to this specific aio set are done, so the following seems ok,
>>> as there's no more acb references.
>>> +static void rbd_aio_bh_cb(void *opaque)
>>> +{
>>> +    RBDAIOCB *acb = opaque;
>>> +    uint64_t buf = 1;
>>> +
>>> +    if (!acb->write) {
>>> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
>>> +    }
>>> +    qemu_vfree(acb->bounce);
>>> +    acb->common.cb(acb->common.opaque, (acb->ret>   0 ? 0 : acb->ret));
>>> +    qemu_bh_delete(acb->bh);
>>> +    acb->bh = NULL;
>>> +
>>> +    if (write(acb->s->efd,&buf, sizeof(buf))<   0)
>>> +        error_report("failed writing to acb->s->efd\n");
>>> +    qemu_aio_release(acb);
>>> +}
>>>
>>> Now, the second ones are the io_read(), in which we have our glue fd.
>>> We send uint64 per each completed io
>>>
>>> +static void rbd_aio_completion_cb(void *opaque)
>>> +{
>>> +    BDRVRBDState *s = opaque;
>>> +
>>> +    uint64_t val;
>>> +    ssize_t ret;
>>> +
>>> +    do {
>>> +        if ((ret = read(s->efd,&val, sizeof(val)))>   0) {
>>> +            s->qemu_aio_count -= val;
>>> There is an issue here with s->qemu_aio_count which needs to be
>>> protected by a mutex. Other than that, it just reads from s->efd.
>>>
>>> +       }
>>> +    } while (ret<   0&&   errno == EINTR);
>>> +
>>> +    return;
>>> +}
>>> +
>>> +static int rbd_aio_flush_cb(void *opaque)
>>> +{
>>> +    BDRVRBDState *s = opaque;
>>> +
>>> +    return (s->qemu_aio_count>   0);
>>> Same here as with the previous one, needs a mutex around s->qemu_aio_count.
>>>
>>> +}
>>>
>>>
>>>        
>>>> If you saw lock ups, I bet that's what it was from.
>>>>
>>>>
>>>>          
>>> As I explained before, before introducing the fd glue layer, the lack
>>> of fd associated with our block device caused that there was no way
>>> for qemu to check whether all aios were flushed or not, which didn't
>>> work well when doing migration/savevm.
>>>
>>> Thanks,
>>> Yehuda
>>>
>>>        
>> --
>> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
>> the body of a message to majordomo@vger.kernel.org
>> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>>
>>
>>
Anthony Liguori - Oct. 8, 2010, 4:05 p.m.
On 10/08/2010 10:50 AM, Yehuda Sadeh Weinraub wrote:
> Oh, that makes it more clean. Considering that we did it for kvm, and
> looking at the kvm qemu_bh_schedule() implementation, it does look
> thread safe (there might be an issue though with canceling the bh
> though, haven't looked at it, not really relevant).

It's definitely not thread safe.  Even though you can set the flag 
atomically (not guaranteed, but assume you can), we rely on the fact 
that we can check for pending BHs before entering sleep without having 
to worry about new BHs being scheduled in between the sleep and the 
check.  If you schedule a BH in a thread then you open yourself up to 
the race.

Regards,

Anthony Liguori

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 4a1eaa1..bf45142 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -18,6 +18,7 @@  block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
+block-nested-$(CONFIG_RBD) += rbd.o
 
 block-obj-y +=  $(addprefix block/, $(block-nested-y))
 
diff --git a/block/rbd.c b/block/rbd.c
new file mode 100644
index 0000000..0e6b2a5
--- /dev/null
+++ b/block/rbd.c
@@ -0,0 +1,907 @@ 
+/*
+ * QEMU Block driver for RADOS (Ceph)
+ *
+ * Copyright (C) 2010 Christian Brunner <chb@muc.de>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include <sys/types.h>
+#include <stdbool.h>
+
+#include <qemu-common.h>
+
+#include "rbd_types.h"
+#include "module.h"
+#include "block_int.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <rados/librados.h>
+
+#include <signal.h>
+
+
+int eventfd(unsigned int initval, int flags);
+
+
+/*
+ * When specifying the image filename use:
+ *
+ * rbd:poolname/devicename
+ *
+ * poolname must be the name of an existing rados pool
+ *
+ * devicename is the basename for all objects used to
+ * emulate the raw device.
+ *
+ * Metadata information (image size, ...) is stored in an
+ * object with the name "devicename.rbd".
+ *
+ * The raw device is split into 4MB sized objects by default.
+ * The sequencenumber is encoded in a 12 byte long hex-string,
+ * and is attached to the devicename, separated by a dot.
+ * e.g. "devicename.1234567890ab"
+ *
+ */
+
+#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+
+typedef struct RBDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    int ret;
+    QEMUIOVector *qiov;
+    char *bounce;
+    int write;
+    int64_t sector_num;
+    int aiocnt;
+    int error;
+    struct BDRVRBDState *s;
+} RBDAIOCB;
+
+typedef struct RADOSCB {
+    int rcbid;
+    RBDAIOCB *acb;
+    int done;
+    int64_t segsize;
+    char *buf;
+} RADOSCB;
+
+typedef struct BDRVRBDState {
+    int efd;
+    rados_pool_t pool;
+    rados_pool_t header_pool;
+    char name[RBD_MAX_OBJ_NAME_SIZE];
+    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
+    uint64_t size;
+    uint64_t objsize;
+    int qemu_aio_count;
+    int read_only;
+} BDRVRBDState;
+
+typedef struct rbd_obj_header_ondisk RbdHeader1;
+
+static int rbd_parsename(const char *filename, char *pool, char **snap,
+                         char *name)
+{
+    const char *rbdname;
+    char *p;
+    int l;
+
+    if (!strstart(filename, "rbd:", &rbdname)) {
+        return -EINVAL;
+    }
+
+    pstrcpy(pool, 2 * RBD_MAX_SEG_NAME_SIZE, rbdname);
+    p = strchr(pool, '/');
+    if (p == NULL) {
+        return -EINVAL;
+    }
+
+    *p = '\0';
+
+    l = strlen(pool);
+    if(l >= RBD_MAX_SEG_NAME_SIZE) {
+        error_report("pool name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("pool name to short");
+        return -EINVAL;
+    }
+
+    l = strlen(++p);
+    if (l >= RBD_MAX_OBJ_NAME_SIZE) {
+        error_report("object name to long");
+        return -EINVAL;
+    } else if (l <= 0) {
+        error_report("object name to short");
+        return -EINVAL;
+    }
+
+    strcpy(name, p);
+
+    *snap = strchr(name, '@');
+    if (*snap) {
+        *(*snap) = '\0';
+        (*snap)++;
+        if (!*snap) *snap = NULL;
+    }
+
+    return l;
+}
+
+static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
+{
+    uint32_t len = strlen(name);
+    /* total_len = encoding op + name + empty buffer */
+    uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t);
+    char *desc = NULL;
+
+    desc = qemu_malloc(total_len);
+
+    *tmap_desc = desc;
+
+    *desc = op;
+    desc++;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+    memcpy(desc, name, len);
+    desc += len;
+    len = 0;
+    memcpy(desc, &len, sizeof(len));
+    desc += sizeof(len);
+
+    return desc - *tmap_desc;
+}
+
+static void free_tmap_op(char *tmap_desc)
+{
+    qemu_free(tmap_desc);
+}
+
+static int rbd_register_image(rados_pool_t pool, const char *name)
+{
+    char *tmap_desc;
+    const char *dir = RBD_DIRECTORY;
+    int ret;
+
+    ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc);
+    if (ret < 0) {
+        return ret;
+    }
+
+    ret = rados_tmap_update(pool, dir, tmap_desc, ret);
+    free_tmap_op(tmap_desc);
+
+    return ret;
+}
+
+static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
+{
+    int r = rados_write(pool, info_oid, 0, NULL, 0);
+    if (r < 0) {
+        return r;
+    }
+    return 0;
+}
+
+static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
+{
+    uint64_t out[1];
+    const char *info_oid = RBD_INFO;
+
+    *id = 0;
+
+    int r = touch_rbd_info(pool, info_oid);
+    if (r < 0) {
+        return r;
+    }
+
+    r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL,
+                   0, (char *)out, sizeof(out));
+    if (r < 0) {
+        return r;
+    }
+
+    *id = out[0];
+    le64_to_cpus(out);
+
+    return 0;
+}
+
+static int rbd_create(const char *filename, QEMUOptionParameter *options)
+{
+    int64_t bytes = 0;
+    int64_t objsize;
+    uint64_t size;
+    time_t mtime;
+    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char name[RBD_MAX_SEG_NAME_SIZE];
+    char *snap;
+    RbdHeader1 header;
+    rados_pool_t p;
+    uint64_t bid;
+    uint32_t hi, lo;
+    int ret;
+
+    if (rbd_parsename(filename, pool, &snap, name) < 0) {
+        return -EINVAL;
+    }
+
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", name, RBD_SUFFIX);
+
+    /* Read out options */
+    while (options && options->name) {
+        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+            bytes = options->value.n;
+        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
+            if (options->value.n) {
+                objsize = options->value.n;
+                if ((objsize - 1) & objsize) {    /* not a power of 2? */
+                    error_report("obj size needs to be power of 2");
+                    return -EINVAL;
+                }
+                if (objsize < 4096) {
+                    error_report("obj size too small");
+                    return -EINVAL;
+                }
+
+                for (obj_order = 0; obj_order < 64; obj_order++) {
+                    if (objsize == 1) {
+                        break;
+                    }
+                    objsize >>= 1;
+                }
+            }
+        }
+        options++;
+    }
+
+    memset(&header, 0, sizeof(header));
+    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
+    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
+    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
+    header.image_size = bytes;
+    cpu_to_le64s((uint64_t *) & header.image_size);
+    header.options.order = obj_order;
+    header.options.crypt_type = RBD_CRYPT_NONE;
+    header.options.comp_type = RBD_COMP_NONE;
+    header.snap_seq = 0;
+    header.snap_count = 0;
+    cpu_to_le32s(&header.snap_count);
+
+    if (rados_initialize(0, NULL) < 0) {
+        error_report("error initializing");
+        return -EIO;
+    }
+
+    if (rados_open_pool(pool, &p)) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return -EIO;
+    }
+
+    /* check for existing rbd header file */
+    ret = rados_stat(p, n, &size, &mtime);
+    if (ret == 0) {
+        ret=-EEXIST;
+        goto done;
+    }
+
+    ret = rbd_assign_bid(p, &bid);
+    if (ret < 0) {
+        error_report("failed assigning block id");
+        rados_deinitialize();
+        return -EIO;
+    }
+    hi = bid >> 32;
+    lo = bid & 0xFFFFFFFF;
+    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);
+
+    /* create header file */
+    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
+    if (ret < 0) {
+        goto done;
+    }
+
+    ret = rbd_register_image(p, name);
+done:
+    rados_close_pool(p);
+    rados_deinitialize();
+
+    return ret;
+}
+
+static void rbd_aio_completion_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    uint64_t val;
+    ssize_t ret;
+
+    do {
+        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
+            s->qemu_aio_count -= val;
+        }
+    } while (ret < 0 && errno == EINTR);
+
+    return;
+}
+
+static int rbd_aio_flush_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    return (s->qemu_aio_count > 0);
+}
+
+
+static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
+{
+    uint32_t snap_count = header->snap_count;
+    rados_snap_t *snaps = NULL;
+    rados_snap_t seq;
+    uint32_t i;
+    uint64_t snap_names_len = header->snap_names_len;
+    int r;
+    rados_snap_t snapid = 0;
+
+    cpu_to_le32s(&snap_count);
+    cpu_to_le64s(&snap_names_len);
+    if (snap_count) {
+        const char *header_snap = (const char *)&header->snaps[snap_count];
+        const char *end = header_snap + snap_names_len;
+        snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count);
+
+        for (i=0; i < snap_count; i++) {
+            snaps[i] = (uint64_t)header->snaps[i].id;
+            cpu_to_le64s(&snaps[i]);
+
+            if (snap && strcmp(snap, header_snap) == 0) {
+                snapid = snaps[i];
+            }
+
+            header_snap += strlen(header_snap) + 1;
+            if (header_snap > end)
+                error_report("bad header, snapshot list broken");
+        }
+    }
+
+    if (snap && !snapid) {
+        error_report("snapshot not found");
+        return -ENOENT;
+    }
+    seq = header->snap_seq;
+    cpu_to_le32s((uint32_t *)&seq);
+
+    r = rados_set_snap_context(pool, seq, snaps, snap_count);
+
+    rados_set_snap(pool, snapid);
+
+    qemu_free(snaps);
+
+    return r;
+}
+
+#define BUF_READ_START_LEN    4096
+
+static int rbd_read_header(BDRVRBDState *s, char **hbuf)
+{
+    char *buf = NULL;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    uint64_t len = BUF_READ_START_LEN;
+    int r;
+
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
+
+    buf = qemu_malloc(len);
+
+    r = rados_read(s->header_pool, n, 0, buf, len);
+    if (r < 0)
+        goto failed;
+
+    if (r < len)
+        goto done;
+
+    qemu_free(buf);
+    buf = qemu_malloc(len);
+
+    r = rados_stat(s->header_pool, n, &len, NULL);
+    if (r < 0)
+        goto failed;
+
+    r = rados_read(s->header_pool, n, 0, buf, len);
+    if (r < 0)
+        goto failed;
+
+done:
+    *hbuf = buf;
+    return 0;
+
+failed:
+    qemu_free(buf);
+    return r;
+}
+
+static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
+{
+    BDRVRBDState *s = bs->opaque;
+    char pool[RBD_MAX_SEG_NAME_SIZE];
+    char *snap;
+    char *hbuf = NULL;
+    int r;
+
+    if (rbd_parsename(filename, pool, &snap, s->name) < 0) {
+        return -EINVAL;
+    }
+
+    if ((r = rados_initialize(0, NULL)) < 0) {
+        error_report("error initializing");
+        return r;
+    }
+
+    if ((r = rados_open_pool(pool, &s->pool))) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return r;
+    }
+
+    if ((r = rados_open_pool(pool, &s->header_pool))) {
+        error_report("error opening pool %s", pool);
+        rados_deinitialize();
+        return r;
+    }
+
+   if ((r = rbd_read_header(s, &hbuf)) < 0) {
+        error_report("error reading header from %s", s->name);
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
+        error_report("Invalid header signature %s", hbuf + 64);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    if (strncmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
+        error_report("Unknown image version %s", hbuf + 68);
+        r = -EMEDIUMTYPE;
+        goto failed;
+    }
+
+    RbdHeader1 *header;
+
+    header = (RbdHeader1 *) hbuf;
+    le64_to_cpus((uint64_t *) & header->image_size);
+    s->size = header->image_size;
+    s->objsize = 1 << header->options.order;
+    memcpy(s->block_name, header->block_name, sizeof(header->block_name));
+
+    r = rbd_set_snapc(s->pool, snap, header);
+    if (r < 0) {
+        error_report("failed setting snap context: %s", strerror(-r));
+        goto failed;
+    }
+
+    s->read_only = (snap != NULL);
+
+    s->efd = eventfd(0, 0);
+    if (s->efd < 0) {
+        error_report("error opening eventfd");
+        goto failed;
+    }
+    fcntl(s->efd, F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->efd, rbd_aio_completion_cb, NULL,
+        rbd_aio_flush_cb, NULL, s);
+
+    qemu_free(hbuf);
+
+    return 0;
+
+failed:
+    if (hbuf)
+        qemu_free(hbuf);
+
+    rados_close_pool(s->header_pool);
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+    return r;
+}
+
+static void rbd_close(BlockDriverState *bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    close(s->efd);
+    qemu_aio_set_fd_handler(s->efd, NULL , NULL, NULL, NULL, NULL);
+
+    rados_close_pool(s->header_pool);
+    rados_close_pool(s->pool);
+    rados_deinitialize();
+}
+
+static int rbd_rw(BlockDriverState *bs, int64_t sector_num,
+                  uint8_t *buf, int nb_sectors, int write)
+{
+    BDRVRBDState *s = bs->opaque;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+
+    int64_t segnr, segoffs, segsize, r;
+    int64_t off, size;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name, segnr);
+
+        if (write) {
+            if (s->read_only)
+                return -EROFS;
+            if ((r = rados_write(s->pool, n, segoffs, (const char *)buf,
+                segsize)) < 0) {
+                return r;
+            }
+        } else {
+            r = rados_read(s->pool, n, segoffs, (char *)buf, segsize);
+            if (r == -ENOENT) {
+                memset(buf, 0, segsize);
+            } else if (r < 0) {
+                return r;
+            } else if (r < segsize) {
+                memset(buf + r, 0, segsize - r);
+            }
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return 0;
+}
+
+static int rbd_read(BlockDriverState *bs, int64_t sector_num,
+                    uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, buf, nb_sectors, 0);
+}
+
+static int rbd_write(BlockDriverState *bs, int64_t sector_num,
+                     const uint8_t *buf, int nb_sectors)
+{
+    return rbd_rw(bs, sector_num, (uint8_t *) buf, nb_sectors, 1);
+}
+
+static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+    qemu_aio_release(acb);
+}
+
+static AIOPool rbd_aio_pool = {
+    .aiocb_size = sizeof(RBDAIOCB),
+    .cancel = rbd_aio_cancel,
+};
+
+/* This is the callback function for rados_aio_read and _write */
+
+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;
+    int64_t r;
+    uint64_t buf = 1;
+    int i;
+
+    acb->aiocnt--;
+    r = rados_aio_get_return_value(c);
+    rados_aio_release(c);
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret += rcb->segsize;
+        }
+    } else {
+        if (r == -ENOENT) {
+            memset(rcb->buf, 0, rcb->segsize);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < rcb->segsize) {
+            memset(rcb->buf + r, 0, rcb->segsize - r);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (!acb->error) {
+            acb->ret += r;
+        }
+    }
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
+        error_report("failed writing to acb->s->efd\n");
+    qemu_free(rcb);
+    i = 0;
+    if (!acb->aiocnt && acb->bh) {
+        qemu_bh_schedule(acb->bh);
+    }
+}
+
+/* Callback when all queued rados_aio requests are complete */
+
+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+    uint64_t buf = 1;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
+        error_report("failed writing to acb->s->efd\n");
+    qemu_aio_release(acb);
+}
+
+static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc *cb,
+                                           void *opaque, int write)
+{
+    RBDAIOCB *acb;
+    RADOSCB *rcb;
+    rados_completion_t c;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    int64_t segnr, segoffs, segsize, last_segnr;
+    int64_t off, size;
+    char *buf;
+
+    BDRVRBDState *s = bs->opaque;
+
+    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
+    acb->write = write;
+    acb->qiov = qiov;
+    acb->bounce = qemu_blockalign(bs, qiov->size);
+    acb->aiocnt = 0;
+    acb->ret = 0;
+    acb->error = 0;
+    acb->s = s;
+
+    if (!acb->bh) {
+        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
+    }
+
+    if (write) {
+        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    }
+
+    buf = acb->bounce;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    segnr = off / s->objsize;
+    segoffs = off % s->objsize;
+    segsize = s->objsize - segoffs;
+
+    last_segnr = ((off + size - 1) / s->objsize);
+    acb->aiocnt = (last_segnr - segnr) + 1;
+
+    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the related RBDAIOCB */
+
+    if (write && s->read_only) {
+        acb->ret = -EROFS;
+        return NULL;
+    }
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s.%012" PRIx64, s->block_name,
+                 segnr);
+
+        rcb = qemu_malloc(sizeof(RADOSCB));
+        rcb->done = 0;
+        rcb->acb = acb;
+        rcb->segsize = segsize;
+        rcb->buf = buf;
+
+        if (write) {
+            rados_aio_create_completion(rcb, NULL,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        &c);
+            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
+        } else {
+            rados_aio_create_completion(rcb,
+                                        (rados_callback_t) rbd_finish_aiocb,
+                                        NULL, &c);
+            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+
+    return &acb->common;
+}
+
+static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
+                                       int64_t sector_num, QEMUIOVector * qiov,
+                                       int nb_sectors,
+                                       BlockDriverCompletionFunc * cb,
+                                       void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+}
+
+static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
+                                        int64_t sector_num, QEMUIOVector * qiov,
+                                        int nb_sectors,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque)
+{
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+}
+
+static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
+{
+    BDRVRBDState *s = bs->opaque;
+    bdi->cluster_size = s->objsize;
+    return 0;
+}
+
+static int64_t rbd_getlength(BlockDriverState * bs)
+{
+    BDRVRBDState *s = bs->opaque;
+
+    return s->size;
+}
+
+static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
+{
+    BDRVRBDState *s = bs->opaque;
+    char inbuf[512], outbuf[128];
+    uint64_t snap_id;
+    int r;
+    char *p = inbuf;
+    char *end = inbuf + sizeof(inbuf);
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    char *hbuf = NULL;
+
+    if (sn_info->name[0] == '\0')
+        return -EINVAL; /* we need a name for rbd snapshots */
+
+    /*
+     * rbd snapshots are using the name as the user controlled unique identifier
+     * we can't use the rbd snapid for that purpose, as it can't be set
+     */
+    if (sn_info->id_str[0] != '\0' &&
+        strcmp(sn_info->id_str, sn_info->name) != 0)
+        return -EINVAL;
+
+    if (strlen(sn_info->name) >= sizeof(sn_info->id_str))
+        return -ERANGE;
+
+    r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
+    if (r < 0) {
+        error_report("failed to create snap id: %s", strerror(-r));
+        return r;
+    }
+
+    *(uint32_t *)p = strlen(sn_info->name);
+    cpu_to_le32s((uint32_t *)p);
+    p += sizeof(uint32_t);
+    strncpy(p, sn_info->name, end - p);
+    p += strlen(p);
+    if (p + sizeof(snap_id) > end) {
+        error_report("invalid input parameter");
+        return -EINVAL;
+    }
+
+    *(uint64_t *)p = snap_id;
+    cpu_to_le64s((uint64_t *)p);
+
+    snprintf(n, RBD_MAX_SEG_NAME_SIZE, "%s%s", s->name, RBD_SUFFIX);
+
+    r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
+                   sizeof(inbuf), outbuf, sizeof(outbuf));
+    if (r < 0) {
+        error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
+        return r;
+    }
+
+    sprintf(sn_info->id_str, "%s", sn_info->name);
+
+    r = rbd_read_header(s, &hbuf);
+    if (r < 0) {
+        error_report("failed reading header: %s", strerror(-r));
+        return r;
+    }
+
+    RbdHeader1 *header;
+
+    header = (RbdHeader1 *) hbuf;
+    r = rbd_set_snapc(s->pool, sn_info->name, header);
+    if (r < 0) {
+        error_report("failed setting snap context: %s", strerror(-r));
+        goto failed;
+    }
+
+    return 0;
+
+failed:
+    if (hbuf)
+        qemu_free(header);
+    return r;
+}
+
+static QEMUOptionParameter rbd_create_options[] = {
+    {
+     .name = BLOCK_OPT_SIZE,
+     .type = OPT_SIZE,
+     .help = "Virtual disk size"
+    },
+    {
+     .name = BLOCK_OPT_CLUSTER_SIZE,
+     .type = OPT_SIZE,
+     .help = "RBD object size"
+    },
+    {NULL}
+};
+
+static BlockDriver bdrv_rbd = {
+    .format_name        = "rbd",
+    .instance_size      = sizeof(BDRVRBDState),
+    .bdrv_file_open     = rbd_open,
+    .bdrv_read          = rbd_read,
+    .bdrv_write         = rbd_write,
+    .bdrv_close         = rbd_close,
+    .bdrv_create        = rbd_create,
+    .bdrv_get_info      = rbd_getinfo,
+    .create_options     = rbd_create_options,
+    .bdrv_getlength     = rbd_getlength,
+    .protocol_name      = "rbd",
+
+    .bdrv_aio_readv     = rbd_aio_readv,
+    .bdrv_aio_writev    = rbd_aio_writev,
+
+    .bdrv_snapshot_create = rbd_snap_create,
+};
+
+static void bdrv_rbd_init(void)
+{
+    bdrv_register(&bdrv_rbd);
+}
+
+block_init(bdrv_rbd_init);
diff --git a/block/rbd_types.h b/block/rbd_types.h
new file mode 100644
index 0000000..c35d840
--- /dev/null
+++ b/block/rbd_types.h
@@ -0,0 +1,71 @@ 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2010 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RBD_TYPES_H
+#define CEPH_RBD_TYPES_H
+
+
+/*
+ * rbd image 'foo' consists of objects
+ *   foo.rbd      - image metadata
+ *   foo.00000000
+ *   foo.00000001
+ *   ...          - data
+ */
+
+#define RBD_SUFFIX              ".rbd"
+#define RBD_DIRECTORY           "rbd_directory"
+#define RBD_INFO                "rbd_info"
+
+#define RBD_DEFAULT_OBJ_ORDER   22   /* 4MB */
+
+#define RBD_MAX_OBJ_NAME_SIZE   96
+#define RBD_MAX_BLOCK_NAME_SIZE 24
+#define RBD_MAX_SEG_NAME_SIZE   128
+
+#define RBD_COMP_NONE           0
+#define RBD_CRYPT_NONE          0
+
+#define RBD_HEADER_TEXT         "<<< Rados Block Device Image >>>\n"
+#define RBD_HEADER_SIGNATURE    "RBD"
+#define RBD_HEADER_VERSION      "001.005"
+
+struct rbd_info {
+    uint64_t max_id;
+} __attribute__ ((packed));
+
+struct rbd_obj_snap_ondisk {
+    uint64_t id;
+    uint64_t image_size;
+} __attribute__((packed));
+
+struct rbd_obj_header_ondisk {
+    char text[40];
+    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
+    char signature[4];
+    char version[8];
+    struct {
+        uint8_t order;
+        uint8_t crypt_type;
+        uint8_t comp_type;
+        uint8_t unused;
+    } __attribute__((packed)) options;
+    uint64_t image_size;
+    uint64_t snap_seq;
+    uint32_t snap_count;
+    uint32_t reserved;
+    uint64_t snap_names_len;
+    struct rbd_obj_snap_ondisk snaps[0];
+} __attribute__((packed));
+
+
+#endif
diff --git a/configure b/configure
index a20371c..3fb7c90 100755
--- a/configure
+++ b/configure
@@ -315,6 +315,7 @@  pkgversion=""
 check_utests="no"
 user_pie="no"
 zero_malloc=""
+rbd=""
 
 # OS specific
 if check_define __linux__ ; then
@@ -709,6 +710,10 @@  for opt do
   ;;
   --*dir)
   ;;
+  --disable-rbd) rbd="no"
+  ;;
+  --enable-rbd) rbd="yes"
+  ;;
   *) echo "ERROR: unknown option $opt"; show_help="yes"
   ;;
   esac
@@ -895,6 +900,7 @@  echo "  --enable-docs            enable documentation build"
 echo "  --disable-docs           disable documentation build"
 echo "  --disable-vhost-net      disable vhost-net acceleration support"
 echo "  --enable-vhost-net       enable vhost-net acceleration support"
+echo "  --enable-rbd             enable building the rados block device (rbd)"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -1701,6 +1707,27 @@  if test "$mingw32" != yes -a "$pthread" = no; then
 fi
 
 ##########################################
+# rbd probe
+if test "$rbd" != "no" ; then
+  cat > $TMPC <<EOF
+#include <stdio.h>
+#include <rados/librados.h>
+int main(void) { rados_initialize(0, NULL); return 0; }
+EOF
+  rbd_libs="-lrados -lcrypto"
+  if compile_prog "" "$rbd_libs" ; then
+    rbd=yes
+    libs_tools="$rbd_libs $libs_tools"
+    libs_softmmu="$rbd_libs $libs_softmmu"
+  else
+    if test "$rbd" = "yes" ; then
+      feature_not_found "rados block device"
+    fi
+    rbd=no
+  fi
+fi
+
+##########################################
 # linux-aio probe
 
 if test "$linux_aio" != "no" ; then
@@ -2187,6 +2214,7 @@  echo "preadv support    $preadv"
 echo "fdatasync         $fdatasync"
 echo "uuid support      $uuid"
 echo "vhost-net support $vhost_net"
+echo "rbd support       $rbd"
 
 if test $sdl_too_old = "yes"; then
 echo "-> Your SDL version is too old - please upgrade to have SDL support"
@@ -2434,6 +2462,9 @@  echo "CONFIG_UNAME_RELEASE=\"$uname_release\"" >> $config_host_mak
 if test "$zero_malloc" = "yes" ; then
   echo "CONFIG_ZERO_MALLOC=y" >> $config_host_mak
 fi
+if test "$rbd" = "yes" ; then
+  echo "CONFIG_RBD=y" >> $config_host_mak
+fi
 
 # USB host support
 case "$usb" in