
[v4,1/3] block: Support Archipelago as a QEMU block backend

Message ID 1403189328-18457-2-git-send-email-cnanakos@grnet.gr
State New

Commit Message

Chrysostomos Nanakos June 19, 2014, 2:48 p.m. UTC
A VM image on an Archipelago volume is specified like this:

file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>]]

or

file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
file.vport=<vlmcd_port>]]

'archipelago' is the protocol.

'mport' is the port number on which mapperd is listening. This is optional;
if not specified, QEMU will let Archipelago use the default port.

'vport' is the port number on which vlmcd is listening. This is optional;
if not specified, QEMU will let Archipelago use the default port.

Examples:

file=archipelago:my_vm_volume
file=archipelago:my_vm_volume/mport=123
file=archipelago:my_vm_volume/mport=123:vport=1234

or

file.driver=archipelago,file.volume=my_vm_volume
file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
file.vport=1234

Signed-off-by: Chrysostomos Nanakos <cnanakos@grnet.gr>
---
 MAINTAINERS         |    6 +
 block/Makefile.objs |    2 +
 block/archipelago.c | 1046 +++++++++++++++++++++++++++++++++++++++++++++++++++
 configure           |   40 ++
 4 files changed, 1094 insertions(+)
 create mode 100644 block/archipelago.c

Comments

Stefan Hajnoczi June 20, 2014, 2:33 p.m. UTC | #1
On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
> +typedef struct BDRVArchipelagoState {
> +    int fds[2];
> +    int qemu_aio_count;

This field is never used.  It's incremented and decremented but nothing
ever checks the value.  It can be dropped.

> +    int event_reader_pos;
> +    ArchipelagoAIOCB *event_acb;
> +    const char *volname;
> +    uint64_t size;
> +    /* Archipelago specific */
> +    struct xseg *xseg;
> +    struct xseg_port *port;
> +    xport srcport;
> +    xport sport;
> +    xport mportno;
> +    xport vportno;
> +    QemuMutex archip_mutex;
> +    QemuCond archip_cond;
> +    bool is_signaled;
> +    /* Request handler specific */
> +    QemuThread request_th;
> +    QemuCond request_cond;
> +    QemuMutex request_mutex;
> +    bool th_is_signaled;
> +    bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> +    size_t count;
> +    size_t total;
> +    int ref;
> +    int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> +    const char *volname;
> +    off_t offset;
> +    size_t size;
> +    uint64_t bufidx;
> +    int ret;
> +    int op;
> +    ArchipelagoAIOCB *aio_cb;
> +    ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> +    if (xseg && (sport != srcport)) {
> +        xseg_init_local_signal(xseg, srcport);
> +        sport = srcport;
> +    }
> +}

QEMU should clean up by calling xseg_quit_local_signal().

> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> +    int ret;
> +    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
> +    if (ret < 0) {
> +        error_report("archipelago_finish_aiocb(): failed writing"
> +                     " aio_cb->s->fds");
> +    }
> +    g_free(reqdata);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
> +                      struct xseg_request *expected_req)
> +{
> +    struct xseg_request *req;
> +    xseg_prepare_wait(xseg, srcport);
> +    void *psd = xseg_get_signal_desc(xseg, port);
> +    while (1) {
> +        req = xseg_receive(xseg, srcport, 0);
> +        if (req) {
> +            if (req != expected_req) {
> +                archipelagolog("Unknown received request\n");
> +                xseg_put_request(xseg, req, srcport);
> +            } else if (!(req->state & XS_SERVED)) {
> +                archipelagolog("Failed req\n");
> +                return -1;
> +            } else {
> +                break;
> +            }
> +        }
> +        xseg_wait_signal(xseg, psd, 100000UL);
> +    }
> +    xseg_cancel_wait(xseg, srcport);
> +    return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{

This thread is only necessary because you're not integrating xseg into
the QEMU event loop.  If you got the pipe fds from xseg and used
aio_set_fd_handler() you could eliminate this thread.  The advantage is
that you can skip the archipelago_finish_aiocb() and get slightly better
performance due to one less context switch between threads.
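
For example, if xseg can hand out the fd it signals on, the completion
handling could run directly in the event loop, along these lines (untested
sketch; xseg_get_signal_fd() and archipelago_process_reply() are made-up
names standing in for whatever the real API provides):

    /* Sketch only: drain completed xseg requests from the event loop
     * instead of from a dedicated thread. */
    static void qemu_archipelago_xseg_event(void *opaque)
    {
        BDRVArchipelagoState *s = opaque;
        struct xseg_request *req;

        while ((req = xseg_receive(s->xseg, s->srcport, 0)) != NULL) {
            /* same per-request handling as xseg_request_handler() */
            archipelago_process_reply(s, req);   /* hypothetical helper */
        }
    }

    /* Registered once in qemu_archipelago_open() instead of spawning
     * the request thread: */
    qemu_aio_set_fd_handler(xseg_get_signal_fd(s->xseg),  /* hypothetical */
                            qemu_archipelago_xseg_event, NULL, s);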

> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
> +    qemu_mutex_lock(&s->request_mutex);
> +
> +    while (!s->stopping) {
> +        struct xseg_request *req;
> +        char *data;
> +        xseg_prepare_wait(s->xseg, s->srcport);
> +        req = xseg_receive(s->xseg, s->srcport, 0);
> +        if (req) {
> +            AIORequestData *reqdata;
> +            ArchipelagoSegmentedRequest *segreq;
> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> +            if (!(req->state & XS_SERVED)) {
> +                    segreq = reqdata->segreq;
> +                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
> +            }
> +
> +            switch (reqdata->op) {
> +            case ARCHIP_OP_READ:
> +                    data = xseg_get_data(s->xseg, req);
> +                    segreq = reqdata->segreq;
> +                    segreq->count += req->serviced;
> +
> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
> +                            data,
> +                            req->serviced);
> +
> +                    xseg_put_request(s->xseg, req, s->srcport);
> +
> +                    __sync_add_and_fetch(&segreq->ref, -1);
> +
> +                    if (segreq->ref == 0) {

Not sure about the value of __sync_add_and_fetch() since the if
statement fetches segreq->ref again.  But I'm not reviewing the details
of the shared memory accesses.  I'm assuming this stuff is correct,
secure, etc.

> +                        if (!segreq->failed) {
> +                            reqdata->aio_cb->ret = segreq->count;
> +                            archipelago_finish_aiocb(reqdata);
> +                        }

What does segreq->failed mean?  We should always finish the I/O request,
otherwise the upper layers will run out of resources as we leak
failed requests.

> +static void parse_filename_opts(const char *filename, Error **errp,
> +                                char **volume, xport *mport, xport *vport)
> +{
> +    const char *start;
> +    char *tokens[3], *ds;
> +    int idx;
> +    xport lmport = NoPort, lvport = NoPort;
> +
> +    strstart(filename, "archipelago:", &start);
> +
> +    ds = g_strdup(start);
> +    tokens[0] = strtok(ds, "/");
> +    tokens[1] = strtok(NULL, ":");
> +    tokens[2] = strtok(NULL, "\0");
> +
> +    if (!strlen(tokens[0])) {
> +        error_setg(errp, "volume name must be specified first");
> +        return;

ds is leaked.

> +    }
> +
> +    for (idx = 1; idx < 3; idx++) {
> +        if (tokens[idx] != NULL) {
> +            if (strstart(tokens[idx], "mport=", NULL)) {
> +                xseg_find_port(tokens[idx], "mport=", &lmport);
> +            }
> +            if (strstart(tokens[idx], "vport=", NULL)) {
> +                xseg_find_port(tokens[idx], "vport=", &lvport);
> +            }
> +        }
> +    }
> +
> +    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
> +        error_setg(errp, "Usage: file=archipelago:"
> +                   "<volumename>[/mport=<mapperd_port>"
> +                   "[:vport=<vlmcd_port>]]");

ds is leaked.

> +        return;
> +    }
> +    *volume = g_strdup(tokens[0]);
> +    *mport = lmport;
> +    *vport = lvport;
> +    g_free(ds);
> +}
> +
> +static void archipelago_parse_filename(const char *filename, QDict *options,
> +                                       Error **errp)
> +{
> +    const char *start;
> +    char *volume = NULL;
> +    xport mport = NoPort, vport = NoPort;
> +
> +    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
> +        error_setg(errp, "volume/mport/vport and a file name may not be "
> +                         "specified at the same time");
> +        return;
> +    }
> +
> +    if (!strstart(filename, "archipelago:", &start)) {
> +        error_setg(errp, "File name must start with 'archipelago:'");
> +        return;
> +    }
> +
> +    if (!strlen(start) || strstart(start, "/", NULL)) {
> +        error_setg(errp, "volume name must be specified");
> +        return;
> +    }
> +
> +    parse_filename_opts(filename, errp, &volume, &mport, &vport);
> +
> +    if (volume) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
> +        g_free(volume);
> +    }
> +    if (mport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
> +    }
> +    if (vport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
> +    }
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> +    .name = "archipelago",
> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = ARCHIPELAGO_OPT_VOLUME,
> +            .type = QEMU_OPT_STRING,
> +            .help = "Name of the volume image",
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_MPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago mapperd port number"
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_VPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago vlmcd port number"
> +
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> +                                 QDict *options,
> +                                 int bdrv_flags,
> +                                 Error **errp)
> +{
> +    int ret = 0;
> +    const char *volume;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +
> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> +    if (volume == NULL) {
> +        error_setg(errp, "archipelago block driver requires an 'volume'"
> +                   " options");

"archipelago block driver requires the 'volume' option"

> +        error_propagate(errp, local_err);

This line is unnecessary since the error message was already put into
errp.

> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +    s->volname = g_strdup(volume);
> +
> +    /* Initialize XSEG, join shared memory segment */
> +    ret = qemu_archipelago_init(s);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot initialize XSEG and join shared "
> +                   "memory segment");
> +        goto err_exit;
> +    }
> +
> +    s->event_reader_pos = 0;
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot create pipe");
> +        goto err_exit;

Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
memory mapped, and memory leaks?

> +    }
> +
> +    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
> +    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
> +                            qemu_archipelago_aio_event_reader, NULL,
> +                            s);
> +
> +    qemu_opts_del(opts);
> +    return 0;
> +
> +err_exit:
> +    qemu_opts_del(opts);
> +    return ret;

s->volname is leaked

> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> +    int r, targetlen;
> +    char *target;
> +    struct xseg_request *req;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
> +    close(s->fds[0]);
> +    close(s->fds[1]);
> +
> +    s->stopping = true;
> +
> +    qemu_mutex_lock(&s->request_mutex);
> +    while (!s->th_is_signaled) {
> +        qemu_cond_wait(&s->request_cond,
> +                       &s->request_mutex);
> +    }
> +    qemu_mutex_unlock(&s->request_mutex);
> +    qemu_cond_destroy(&s->request_cond);
> +    qemu_mutex_destroy(&s->request_mutex);

It's not safe to qemu_mutex_destroy() because the other thread may still
be inside qemu_mutex_unlock(&s->request_mutex) and may still access
s->request_mutex memory.

Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.

(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)
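
Roughly (untested, and it assumes the thread is created with
QEMU_THREAD_JOINABLE instead of QEMU_THREAD_DETACHED):

    /* Stop the request thread, join it, and only then destroy the
     * synchronization objects it was using. */
    s->stopping = true;
    qemu_thread_join(&s->request_th);
    qemu_cond_destroy(&s->request_cond);
    qemu_mutex_destroy(&s->request_mutex);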

> +
> +    qemu_cond_destroy(&s->archip_cond);
> +    qemu_mutex_destroy(&s->archip_mutex);
> +
> +    targetlen = strlen(s->volname);
> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit;
> +    }
> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
> +    if (r < 0) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot prepare XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    target = xseg_get_target(s->xseg, req);
> +    strncpy(target, s->volname, targetlen);

Using strncpy() hints that target is a string when in fact it's not.  I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of the string.

Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?
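
For reference, the copy I have in mind is simply (same length, no NUL
terminator appended):

    /* target is a targetlen-byte name buffer inside the xseg request,
     * not a NUL-terminated C string, so copy the bytes verbatim */
    memcpy(target, s->volname, targetlen);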

> +    req->size = req->datalen;
> +    req->offset = 0;
> +    req->op = X_CLOSE;
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot submit XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    xseg_signal(s->xseg, p);
> +    r = wait_reply(s->xseg, s->srcport, s->port, req);
> +    if (r < 0) {
> +        archipelagolog("wait_reply() error\n");
> +    }
> +    if (!(req->state & XS_SERVED)) {
> +        archipelagolog("Could no close map for volume '%s'\n", s->volname);
> +    }
> +
> +    xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> +    xseg_leave_dynport(s->xseg, s->port);
> +    xseg_leave(s->xseg);

s->volname is leaked.

> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> +    aio_cb->cancelled = true;
> +    while (aio_cb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +    qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
> +    .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
> +{
> +    int ret = 0;
> +    while (1) {
> +        fd_set wfd;
> +        int fd = aio_cb->s->fds[1];
> +
> +        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
> +        if (ret > 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }
> +        FD_ZERO(&wfd);
> +        FD_SET(fd, &wfd);
> +        do {
> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> +        } while (ret < 0 && errno == EINTR);
> +    }
> +    return ret;
> +}

A newer signalling approach is available and will let you drop the pipe
code.  QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop.  Scheduling the QEMUBH is thread-safe, so
you can perform it from any thread.

See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.
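
Roughly, archipelago_finish_aiocb() could then become something like this
(untested sketch; it assumes a new 'bh' field is added to ArchipelagoAIOCB):

    static void qemu_archipelago_complete_bh(void *opaque)
    {
        ArchipelagoAIOCB *aio_cb = opaque;

        qemu_bh_delete(aio_cb->bh);        /* 'bh' is an assumed new field */
        qemu_archipelago_complete_aio(aio_cb);
    }

    static void archipelago_finish_aiocb(AIORequestData *reqdata)
    {
        ArchipelagoAIOCB *aio_cb = reqdata->aio_cb;

        /* Safe to call from the request thread; the bottom half runs
         * later in the QEMU event loop. */
        aio_cb->bh = qemu_bh_new(qemu_archipelago_complete_bh, aio_cb);
        qemu_bh_schedule(aio_cb->bh);
        g_free(reqdata);
    }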

> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> +    uint64_t size;
> +    int ret, targetlen;
> +    struct xseg_request *req;
> +    struct xseg_reply_info *xinfo;
> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> +    if (!reqdata) {
> +        archipelagolog("Cannot allocate reqdata\n");
> +        return -1;

g_malloc() never returns NULL, this if statement can be dropped.
Chrysostomos Nanakos June 23, 2014, 8:17 a.m. UTC | #2
On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote:
> On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
>> +typedef struct BDRVArchipelagoState {
>> +    int fds[2];
>> +    int qemu_aio_count;
> This field is never used.  It's incremented and decremented but nothing
> ever checks the value.  It can be dropped.
>
>> +    int event_reader_pos;
>> +    ArchipelagoAIOCB *event_acb;
>> +    const char *volname;
>> +    uint64_t size;
>> +    /* Archipelago specific */
>> +    struct xseg *xseg;
>> +    struct xseg_port *port;
>> +    xport srcport;
>> +    xport sport;
>> +    xport mportno;
>> +    xport vportno;
>> +    QemuMutex archip_mutex;
>> +    QemuCond archip_cond;
>> +    bool is_signaled;
>> +    /* Request handler specific */
>> +    QemuThread request_th;
>> +    QemuCond request_cond;
>> +    QemuMutex request_mutex;
>> +    bool th_is_signaled;
>> +    bool stopping;
>> +} BDRVArchipelagoState;
>> +
>> +typedef struct ArchipelagoSegmentedRequest {
>> +    size_t count;
>> +    size_t total;
>> +    int ref;
>> +    int failed;
>> +} ArchipelagoSegmentedRequest;
>> +
>> +typedef struct AIORequestData {
>> +    const char *volname;
>> +    off_t offset;
>> +    size_t size;
>> +    uint64_t bufidx;
>> +    int ret;
>> +    int op;
>> +    ArchipelagoAIOCB *aio_cb;
>> +    ArchipelagoSegmentedRequest *segreq;
>> +} AIORequestData;
>> +
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
>> +
>> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
>> +{
>> +    if (xseg && (sport != srcport)) {
>> +        xseg_init_local_signal(xseg, srcport);
>> +        sport = srcport;
>> +    }
>> +}
> QEMU should clean up by calling xseg_quit_local_signal().
>
>> +
>> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
>> +{
>> +    int ret;
>> +    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
>> +    if (ret < 0) {
>> +        error_report("archipelago_finish_aiocb(): failed writing"
>> +                     " aio_cb->s->fds");
>> +    }
>> +    g_free(reqdata);
>> +}
>> +
>> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
>> +                      struct xseg_request *expected_req)
>> +{
>> +    struct xseg_request *req;
>> +    xseg_prepare_wait(xseg, srcport);
>> +    void *psd = xseg_get_signal_desc(xseg, port);
>> +    while (1) {
>> +        req = xseg_receive(xseg, srcport, 0);
>> +        if (req) {
>> +            if (req != expected_req) {
>> +                archipelagolog("Unknown received request\n");
>> +                xseg_put_request(xseg, req, srcport);
>> +            } else if (!(req->state & XS_SERVED)) {
>> +                archipelagolog("Failed req\n");
>> +                return -1;
>> +            } else {
>> +                break;
>> +            }
>> +        }
>> +        xseg_wait_signal(xseg, psd, 100000UL);
>> +    }
>> +    xseg_cancel_wait(xseg, srcport);
>> +    return 0;
>> +}
>> +
>> +static void xseg_request_handler(void *state)
>> +{
> This thread is only necessary because you're not integrating xseg into
> the QEMU event loop.  If you got the pipe fds from xseg and used
> aio_set_fd_handler() you could eliminate this thread.  The advantage is
> that you can skip the archipelago_finish_aiocb() and get slightly better
> performance due to one less context switch between threads.
>
>> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
>> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
>> +    qemu_mutex_lock(&s->request_mutex);
>> +
>> +    while (!s->stopping) {
>> +        struct xseg_request *req;
>> +        char *data;
>> +        xseg_prepare_wait(s->xseg, s->srcport);
>> +        req = xseg_receive(s->xseg, s->srcport, 0);
>> +        if (req) {
>> +            AIORequestData *reqdata;
>> +            ArchipelagoSegmentedRequest *segreq;
>> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
>> +
>> +            if (!(req->state & XS_SERVED)) {
>> +                    segreq = reqdata->segreq;
>> +                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
>> +            }
>> +
>> +            switch (reqdata->op) {
>> +            case ARCHIP_OP_READ:
>> +                    data = xseg_get_data(s->xseg, req);
>> +                    segreq = reqdata->segreq;
>> +                    segreq->count += req->serviced;
>> +
>> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
>> +                            data,
>> +                            req->serviced);
>> +
>> +                    xseg_put_request(s->xseg, req, s->srcport);
>> +
>> +                    __sync_add_and_fetch(&segreq->ref, -1);
>> +
>> +                    if (segreq->ref == 0) {
> Not sure about the value of __sync_add_and_fetch() since the if
> statement fetches segreq->ref again.  But I'm not reviewing the details
> of the shared memory accesses.  I'm assuming this stuff is correct,
> secure, etc.

Yes, you are right. IMHO a better and safer approach is:

if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
...

>
>> +                        if (!segreq->failed) {
>> +                            reqdata->aio_cb->ret = segreq->count;
>> +                            archipelago_finish_aiocb(reqdata);
>> +                        }
> What does segreq->failed mean?  We should always finish the I/O request,
> otherwise the upper layers will run out of resources as we leak
> failed requests.

Yes, you are right.
If a request fails while being submitted to Archipelago,
archipelago_aio_segmented_rw() will return -EIO to
qemu_archipelago_aio_rw(), which will return NULL to
.bdrv_aio_readv/_write(). If all requests were submitted to Archipelago
successfully but one or more of them were not fully serviced (partial
read/write), archipelago_finish_aiocb() will fail the request. The
latter was not implemented in this patch; the v5 series has the
appropriate changes.
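
Something along these lines (a simplified sketch of the behaviour described
above, not the exact v5 code):

    /* Always deliver the completion; report -EIO when any segment
     * failed or was only partially serviced. */
    static void archipelago_segmented_request_complete(AIORequestData *reqdata)
    {
        ArchipelagoSegmentedRequest *segreq = reqdata->segreq;
        ArchipelagoAIOCB *aio_cb = reqdata->aio_cb;

        if (segreq->failed || segreq->count < segreq->total) {
            aio_cb->ret = -EIO;
        } else {
            aio_cb->ret = segreq->count;
        }
        archipelago_finish_aiocb(reqdata);
    }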

Is this a proper and acceptable approach, together with removing the
pipe code and scheduling a QEMU "bottom half" in
archipelago_finish_aiocb()?



>
>> +static void parse_filename_opts(const char *filename, Error **errp,
>> +                                char **volume, xport *mport, xport *vport)
>> +{
>> +    const char *start;
>> +    char *tokens[3], *ds;
>> +    int idx;
>> +    xport lmport = NoPort, lvport = NoPort;
>> +
>> +    strstart(filename, "archipelago:", &start);
>> +
>> +    ds = g_strdup(start);
>> +    tokens[0] = strtok(ds, "/");
>> +    tokens[1] = strtok(NULL, ":");
>> +    tokens[2] = strtok(NULL, "\0");
>> +
>> +    if (!strlen(tokens[0])) {
>> +        error_setg(errp, "volume name must be specified first");
>> +        return;
> ds is leaked.
>
>> +    }
>> +
>> +    for (idx = 1; idx < 3; idx++) {
>> +        if (tokens[idx] != NULL) {
>> +            if (strstart(tokens[idx], "mport=", NULL)) {
>> +                xseg_find_port(tokens[idx], "mport=", &lmport);
>> +            }
>> +            if (strstart(tokens[idx], "vport=", NULL)) {
>> +                xseg_find_port(tokens[idx], "vport=", &lvport);
>> +            }
>> +        }
>> +    }
>> +
>> +    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
>> +        error_setg(errp, "Usage: file=archipelago:"
>> +                   "<volumename>[/mport=<mapperd_port>"
>> +                   "[:vport=<vlmcd_port>]]");
> ds is leaked.
>
>> +        return;
>> +    }
>> +    *volume = g_strdup(tokens[0]);
>> +    *mport = lmport;
>> +    *vport = lvport;
>> +    g_free(ds);
>> +}
>> +
>> +static void archipelago_parse_filename(const char *filename, QDict *options,
>> +                                       Error **errp)
>> +{
>> +    const char *start;
>> +    char *volume = NULL;
>> +    xport mport = NoPort, vport = NoPort;
>> +
>> +    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
>> +            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
>> +            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
>> +        error_setg(errp, "volume/mport/vport and a file name may not be "
>> +                         "specified at the same time");
>> +        return;
>> +    }
>> +
>> +    if (!strstart(filename, "archipelago:", &start)) {
>> +        error_setg(errp, "File name must start with 'archipelago:'");
>> +        return;
>> +    }
>> +
>> +    if (!strlen(start) || strstart(start, "/", NULL)) {
>> +        error_setg(errp, "volume name must be specified");
>> +        return;
>> +    }
>> +
>> +    parse_filename_opts(filename, errp, &volume, &mport, &vport);
>> +
>> +    if (volume) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
>> +        g_free(volume);
>> +    }
>> +    if (mport != NoPort) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
>> +    }
>> +    if (vport != NoPort) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
>> +    }
>> +}
>> +
>> +static QemuOptsList archipelago_runtime_opts = {
>> +    .name = "archipelago",
>> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
>> +    .desc = {
>> +        {
>> +            .name = ARCHIPELAGO_OPT_VOLUME,
>> +            .type = QEMU_OPT_STRING,
>> +            .help = "Name of the volume image",
>> +        },
>> +        {
>> +            .name = ARCHIPELAGO_OPT_MPORT,
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "Archipelago mapperd port number"
>> +        },
>> +        {
>> +            .name = ARCHIPELAGO_OPT_VPORT,
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "Archipelago vlmcd port number"
>> +
>> +        },
>> +        { /* end of list */ }
>> +    },
>> +};
>> +
>> +static int qemu_archipelago_open(BlockDriverState *bs,
>> +                                 QDict *options,
>> +                                 int bdrv_flags,
>> +                                 Error **errp)
>> +{
>> +    int ret = 0;
>> +    const char *volume;
>> +    QemuOpts *opts;
>> +    Error *local_err = NULL;
>> +    BDRVArchipelagoState *s = bs->opaque;
>> +
>> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
>> +    qemu_opts_absorb_qdict(opts, options, &local_err);
>> +    if (local_err) {
>> +        error_propagate(errp, local_err);
>> +        qemu_opts_del(opts);
>> +        return -EINVAL;
>> +    }
>> +
>> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
>> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
>> +
>> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
>> +    if (volume == NULL) {
>> +        error_setg(errp, "archipelago block driver requires an 'volume'"
>> +                   " options");
> "archipelago block driver requires the 'volume' option"
>
>> +        error_propagate(errp, local_err);
> This line is unnecessary since the error message was already put into
> errp.
>
>> +        qemu_opts_del(opts);
>> +        return -EINVAL;
>> +    }
>> +    s->volname = g_strdup(volume);
>> +
>> +    /* Initialize XSEG, join shared memory segment */
>> +    ret = qemu_archipelago_init(s);
>> +    if (ret < 0) {
>> +        error_setg(errp, "cannot initialize XSEG and join shared "
>> +                   "memory segment");
>> +        goto err_exit;
>> +    }
>> +
>> +    s->event_reader_pos = 0;
>> +    ret = qemu_pipe(s->fds);
>> +    if (ret < 0) {
>> +        error_setg(errp, "cannot create pipe");
>> +        goto err_exit;
> Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
> memory mapped, and memory leaks?

Removed the qemu_pipe() call, so we do not need to call xseg_leave() anymore.

>
>> +    }
>> +
>> +    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
>> +    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
>> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
>> +                            qemu_archipelago_aio_event_reader, NULL,
>> +                            s);
>> +
>> +    qemu_opts_del(opts);
>> +    return 0;
>> +
>> +err_exit:
>> +    qemu_opts_del(opts);
>> +    return ret;
> s->volname is leaked
>
>> +}
>> +
>> +static void qemu_archipelago_close(BlockDriverState *bs)
>> +{
>> +    int r, targetlen;
>> +    char *target;
>> +    struct xseg_request *req;
>> +    BDRVArchipelagoState *s = bs->opaque;
>> +
>> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
>> +    close(s->fds[0]);
>> +    close(s->fds[1]);
>> +
>> +    s->stopping = true;
>> +
>> +    qemu_mutex_lock(&s->request_mutex);
>> +    while (!s->th_is_signaled) {
>> +        qemu_cond_wait(&s->request_cond,
>> +                       &s->request_mutex);
>> +    }
>> +    qemu_mutex_unlock(&s->request_mutex);
>> +    qemu_cond_destroy(&s->request_cond);
>> +    qemu_mutex_destroy(&s->request_mutex);
> It's not safe to qemu_mutex_destroy() because the other thread may still
> be inside qemu_mutex_unlock(&s->request_mutex) and may still access
> s->request_mutex memory.
>
> Use qemu_thread_join() before destroying request_cond and request_mutex.
> That way you can be sure there is no race condition.
>
> (I recently did the same thing and Paolo Bonzini pointed out the bug.
> After checking the glibc implementation I was convinced that it's not
> safe.)

Got it! Paolo was absolutely right! Thanks for sharing!

>> +
>> +    qemu_cond_destroy(&s->archip_cond);
>> +    qemu_mutex_destroy(&s->archip_mutex);
>> +
>> +    targetlen = strlen(s->volname);
>> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
>> +    if (!req) {
>> +        archipelagolog("Cannot get XSEG request\n");
>> +        goto err_exit;
>> +    }
>> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
>> +    if (r < 0) {
>> +        xseg_put_request(s->xseg, req, s->srcport);
>> +        archipelagolog("Cannot prepare XSEG close request\n");
>> +        goto err_exit;
>> +    }
>> +
>> +    target = xseg_get_target(s->xseg, req);
>> +    strncpy(target, s->volname, targetlen);
> Using strncpy() hints that target is a string when in fact it's not.  I
> think memcpy() would be clearer here since you don't want a '\0' byte at
> the end of the string.
>
> Or maybe I'm wrong and there is some guarantee that there will be a '\0'
> byte after target?

No, you are not wrong; memcpy() is clearer. Fixed for the v5 series.

>> +    req->size = req->datalen;
>> +    req->offset = 0;
>> +    req->op = X_CLOSE;
>> +
>> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>> +    if (p == NoPort) {
>> +        xseg_put_request(s->xseg, req, s->srcport);
>> +        archipelagolog("Cannot submit XSEG close request\n");
>> +        goto err_exit;
>> +    }
>> +
>> +    xseg_signal(s->xseg, p);
>> +    r = wait_reply(s->xseg, s->srcport, s->port, req);
>> +    if (r < 0) {
>> +        archipelagolog("wait_reply() error\n");
>> +    }
>> +    if (!(req->state & XS_SERVED)) {
>> +        archipelagolog("Could no close map for volume '%s'\n", s->volname);
>> +    }
>> +
>> +    xseg_put_request(s->xseg, req, s->srcport);
>> +
>> +err_exit:
>> +    xseg_leave_dynport(s->xseg, s->port);
>> +    xseg_leave(s->xseg);
> s->volname is leaked.
>
>> +}
>> +
>> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
>> +    aio_cb->cancelled = true;
>> +    while (aio_cb->status == -EINPROGRESS) {
>> +        qemu_aio_wait();
>> +    }
>> +    qemu_aio_release(aio_cb);
>> +}
>> +
>> +static const AIOCBInfo archipelago_aiocb_info = {
>> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
>> +    .cancel = qemu_archipelago_aio_cancel,
>> +};
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
>> +{
>> +    int ret = 0;
>> +    while (1) {
>> +        fd_set wfd;
>> +        int fd = aio_cb->s->fds[1];
>> +
>> +        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
>> +        if (ret > 0) {
>> +            break;
>> +        }
>> +        if (errno == EINTR) {
>> +            continue;
>> +        }
>> +        if (errno != EAGAIN) {
>> +            break;
>> +        }
>> +        FD_ZERO(&wfd);
>> +        FD_SET(fd, &wfd);
>> +        do {
>> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
>> +        } while (ret < 0 && errno == EINTR);
>> +    }
>> +    return ret;
>> +}
> A newer signalling approach is available and will let you drop the pipe
> code.  QEMUBH is a "bottom half" or deferred function call that can be
> scheduled in an event loop.  Scheduling the QEMUBH is thread-safe, so
> you can perform it from any thread.

>
> See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.
>
>> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
>> +{
>> +    uint64_t size;
>> +    int ret, targetlen;
>> +    struct xseg_request *req;
>> +    struct xseg_reply_info *xinfo;
>> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
>> +
>> +    if (!reqdata) {
>> +        archipelagolog("Cannot allocate reqdata\n");
>> +        return -1;
> g_malloc() never returns NULL, this if statement can be dropped.
Stefan Hajnoczi June 23, 2014, 8:31 a.m. UTC | #3
On Mon, Jun 23, 2014 at 11:17:16AM +0300, Chrysostomos Nanakos wrote:
> On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote:
> >On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
> >
> >>+                        if (!segreq->failed) {
> >>+                            reqdata->aio_cb->ret = segreq->count;
> >>+                            archipelago_finish_aiocb(reqdata);
> >>+                        }
> >What does segreq->failed mean?  We should always finish the I/O request,
> >otherwise the upper layers will run out of resources as we leak
> >failed requests.
> 
> Yes, you are right.
> If a request fails while being submitted to Archipelago,
> archipelago_aio_segmented_rw() will return -EIO to
> qemu_archipelago_aio_rw(), which will return NULL to
> .bdrv_aio_readv/_write(). If all requests were submitted to Archipelago
> successfully but one or more of them were not fully serviced (partial
> read/write), archipelago_finish_aiocb() will fail the request. The
> latter was not implemented in this patch; the v5 series has the
> appropriate changes.
> 
> Is this a proper and acceptable approach, together with removing the
> pipe code and scheduling a QEMU "bottom half" in
> archipelago_finish_aiocb()?

Sounds good.

Please also take a look at
http://wiki.qemu.org/Documentation/QemuIoTests

This is the test suite we use for QEMU block drivers.  It includes many
basic I/O tests and also some specialized tests for advanced features
like snapshotting.

For an example of how to add support, take a look at the commit which
added NFS support:
http://git.qemu.org/?p=qemu.git;a=commitdiff;h=170632dbc9f75217861dd8bf2e6da3c269a1ba18

Please do something similar so you can run the test suite like this:

  cd tests/qemu-iotests
  ./check -archipelago

Stefan

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index d1a3405..db3c16e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -994,3 +994,9 @@  SSH
 M: Richard W.M. Jones <rjones@redhat.com>
 S: Supported
 F: block/ssh.c
+
+ARCHIPELAGO
+M: Chrysostomos Nanakos <cnanakos@grnet.gr>
+M: Chrysostomos Nanakos <chris@include.gr>
+S: Maintained
+F: block/archipelago.c
diff --git a/block/Makefile.objs b/block/Makefile.objs
index fd88c03..858d2b3 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -17,6 +17,7 @@  block-obj-$(CONFIG_LIBNFS) += nfs.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
 block-obj-$(CONFIG_GLUSTERFS) += gluster.o
+block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
 block-obj-$(CONFIG_LIBSSH2) += ssh.o
 endif
 
@@ -35,5 +36,6 @@  gluster.o-cflags   := $(GLUSTERFS_CFLAGS)
 gluster.o-libs     := $(GLUSTERFS_LIBS)
 ssh.o-cflags       := $(LIBSSH2_CFLAGS)
 ssh.o-libs         := $(LIBSSH2_LIBS)
+archipelago.o-libs := $(ARCHIPELAGO_LIBS)
 qcow.o-libs        := -lz
 linux-aio.o-libs   := -laio
diff --git a/block/archipelago.c b/block/archipelago.c
new file mode 100644
index 0000000..3da7a1c
--- /dev/null
+++ b/block/archipelago.c
@@ -0,0 +1,1046 @@ 
+/*
+ * Copyright 2014 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+#include "block/block_int.h"
+#include "qemu/error-report.h"
+#include "qemu/thread.h"
+#include "qapi/qmp/qint.h"
+#include "qapi/qmp/qstring.h"
+#include "qapi/qmp/qjson.h"
+
+#include <inttypes.h>
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+
+#define ARCHIP_FD_READ      0
+#define ARCHIP_FD_WRITE     1
+#define MAX_REQUEST_SIZE    524288
+
+#define ARCHIPELAGO_OPT_VOLUME      "volume"
+#define ARCHIPELAGO_OPT_MPORT       "mport"
+#define ARCHIPELAGO_OPT_VPORT       "vport"
+
+#define archipelagolog(fmt, ...) \
+    do {                         \
+        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
+    } while (0)
+
+typedef enum {
+    ARCHIP_OP_READ,
+    ARCHIP_OP_WRITE,
+    ARCHIP_OP_FLUSH,
+    ARCHIP_OP_VOLINFO,
+} ARCHIPCmd;
+
+typedef struct ArchipelagoAIOCB {
+    BlockDriverAIOCB common;
+    struct BDRVArchipelagoState *s;
+    QEMUIOVector *qiov;
+    void *buffer;
+    ARCHIPCmd cmd;
+    bool cancelled;
+    int status;
+    int64_t size;
+    int64_t ret;
+} ArchipelagoAIOCB;
+
+typedef struct BDRVArchipelagoState {
+    int fds[2];
+    int qemu_aio_count;
+    int event_reader_pos;
+    ArchipelagoAIOCB *event_acb;
+    const char *volname;
+    uint64_t size;
+    /* Archipelago specific */
+    struct xseg *xseg;
+    struct xseg_port *port;
+    xport srcport;
+    xport sport;
+    xport mportno;
+    xport vportno;
+    QemuMutex archip_mutex;
+    QemuCond archip_cond;
+    bool is_signaled;
+    /* Request handler specific */
+    QemuThread request_th;
+    QemuCond request_cond;
+    QemuMutex request_mutex;
+    bool th_is_signaled;
+    bool stopping;
+} BDRVArchipelagoState;
+
+typedef struct ArchipelagoSegmentedRequest {
+    size_t count;
+    size_t total;
+    int ref;
+    int failed;
+} ArchipelagoSegmentedRequest;
+
+typedef struct AIORequestData {
+    const char *volname;
+    off_t offset;
+    size_t size;
+    uint64_t bufidx;
+    int ret;
+    int op;
+    ArchipelagoAIOCB *aio_cb;
+    ArchipelagoSegmentedRequest *segreq;
+} AIORequestData;
+
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
+
+static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
+{
+    if (xseg && (sport != srcport)) {
+        xseg_init_local_signal(xseg, srcport);
+        sport = srcport;
+    }
+}
+
+static void archipelago_finish_aiocb(AIORequestData *reqdata)
+{
+    int ret;
+    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
+    if (ret < 0) {
+        error_report("archipelago_finish_aiocb(): failed writing"
+                     " aio_cb->s->fds");
+    }
+    g_free(reqdata);
+}
+
+static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
+                      struct xseg_request *expected_req)
+{
+    struct xseg_request *req;
+    xseg_prepare_wait(xseg, srcport);
+    void *psd = xseg_get_signal_desc(xseg, port);
+    while (1) {
+        req = xseg_receive(xseg, srcport, 0);
+        if (req) {
+            if (req != expected_req) {
+                archipelagolog("Unknown received request\n");
+                xseg_put_request(xseg, req, srcport);
+            } else if (!(req->state & XS_SERVED)) {
+                archipelagolog("Failed req\n");
+                return -1;
+            } else {
+                break;
+            }
+        }
+        xseg_wait_signal(xseg, psd, 100000UL);
+    }
+    xseg_cancel_wait(xseg, srcport);
+    return 0;
+}
+
+static void xseg_request_handler(void *state)
+{
+    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
+    void *psd = xseg_get_signal_desc(s->xseg, s->port);
+    qemu_mutex_lock(&s->request_mutex);
+
+    while (!s->stopping) {
+        struct xseg_request *req;
+        char *data;
+        xseg_prepare_wait(s->xseg, s->srcport);
+        req = xseg_receive(s->xseg, s->srcport, 0);
+        if (req) {
+            AIORequestData *reqdata;
+            ArchipelagoSegmentedRequest *segreq;
+            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
+
+            if (!(req->state & XS_SERVED)) {
+                    segreq = reqdata->segreq;
+                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
+            }
+
+            switch (reqdata->op) {
+            case ARCHIP_OP_READ:
+                    data = xseg_get_data(s->xseg, req);
+                    segreq = reqdata->segreq;
+                    segreq->count += req->serviced;
+
+                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
+                            data,
+                            req->serviced);
+
+                    xseg_put_request(s->xseg, req, s->srcport);
+
+                    __sync_add_and_fetch(&segreq->ref, -1);
+
+                    if (segreq->ref == 0) {
+                        if (!segreq->failed) {
+                            reqdata->aio_cb->ret = segreq->count;
+                            archipelago_finish_aiocb(reqdata);
+                        }
+                        g_free(segreq);
+                    }
+                    if (segreq->failed) {
+                        g_free(reqdata);
+                    }
+                    break;
+            case ARCHIP_OP_WRITE:
+                    segreq = reqdata->segreq;
+                    segreq->count += req->serviced;
+                    xseg_put_request(s->xseg, req, s->srcport);
+
+                    __sync_add_and_fetch(&segreq->ref, -1);
+
+                    if (segreq->ref == 0) {
+                        if (!segreq->failed) {
+                            reqdata->aio_cb->ret = segreq->count;
+                            archipelago_finish_aiocb(reqdata);
+                        }
+                        g_free(segreq);
+                    }
+                    if (segreq->failed) {
+                        g_free(reqdata);
+                    }
+                    break;
+            case ARCHIP_OP_VOLINFO:
+                    s->is_signaled = true;
+                    qemu_cond_signal(&s->archip_cond);
+                    break;
+            }
+        } else {
+            xseg_wait_signal(s->xseg, psd, 100000UL);
+        }
+        xseg_cancel_wait(s->xseg, s->srcport);
+    }
+
+    s->th_is_signaled = true;
+    qemu_cond_signal(&s->request_cond);
+    qemu_mutex_unlock(&s->request_mutex);
+    qemu_thread_exit(NULL);
+}
+
+static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
+{
+    if (xseg_initialize()) {
+        archipelagolog("Cannot initialize XSEG\n");
+        goto err_exit;
+    }
+
+    s->xseg = xseg_join((char *)"posix", (char *)"archipelago",
+                        (char *)"posixfd", NULL);
+    if (!s->xseg) {
+        archipelagolog("Cannot join XSEG shared memory segment\n");
+        goto err_exit;
+    }
+    s->port = xseg_bind_dynport(s->xseg);
+    s->srcport = s->port->portno;
+    init_local_signal(s->xseg, s->sport, s->srcport);
+    return 0;
+
+err_exit:
+    return -1;
+}
+
+static int qemu_archipelago_init(BDRVArchipelagoState *s)
+{
+    int ret;
+
+    ret = qemu_archipelago_xseg_init(s);
+    if (ret < 0) {
+        error_report("Cannot initialize XSEG. Aborting...\n");
+        goto err_exit;
+    }
+
+    qemu_cond_init(&s->archip_cond);
+    qemu_mutex_init(&s->archip_mutex);
+    qemu_cond_init(&s->request_cond);
+    qemu_mutex_init(&s->request_mutex);
+    s->th_is_signaled = false;
+    qemu_thread_create(&s->request_th, "xseg_io_th",
+                       (void *) xseg_request_handler,
+                       (void *) s, QEMU_THREAD_DETACHED);
+
+err_exit:
+    return ret;
+}
+
+static void qemu_archipelago_complete_aio(ArchipelagoAIOCB *aio_cb)
+{
+    qemu_vfree(aio_cb->buffer);
+    aio_cb->common.cb(aio_cb->common.opaque,
+                        (aio_cb->ret > 0 ? 0 : aio_cb->ret));
+    aio_cb->status = 0;
+
+    if (!aio_cb->cancelled) {
+        qemu_aio_release(aio_cb);
+    }
+}
+
+static void qemu_archipelago_aio_event_reader(void *opaque)
+{
+    BDRVArchipelagoState *s = opaque;
+    ssize_t ret;
+
+    do {
+        char *p = (char *)&s->event_acb;
+
+        ret = read(s->fds[ARCHIP_FD_READ], p + s->event_reader_pos,
+                   sizeof(s->event_acb) - s->event_reader_pos);
+        if (ret > 0) {
+            s->event_reader_pos += ret;
+            if (s->event_reader_pos == sizeof(s->event_acb)) {
+                s->event_reader_pos = 0;
+                qemu_archipelago_complete_aio(s->event_acb);
+                s->qemu_aio_count--;
+            }
+        }
+    } while (ret < 0 && errno == EINTR);
+}
+
+static void xseg_find_port(char *pstr, const char *needle, xport *aport)
+{
+    const char *a;
+    char *endptr = NULL;
+    unsigned long port;
+    if (strstart(pstr, needle, &a)) {
+        port = strtoul(a, &endptr, 10);
+        if (strlen(endptr)) {
+            *aport = (xport) -2;
+            return;
+        }
+        *aport = (xport) port;
+    }
+}
+
+static void parse_filename_opts(const char *filename, Error **errp,
+                                char **volume, xport *mport, xport *vport)
+{
+    const char *start;
+    char *tokens[3], *ds;
+    int idx;
+    xport lmport = NoPort, lvport = NoPort;
+
+    strstart(filename, "archipelago:", &start);
+
+    ds = g_strdup(start);
+    tokens[0] = strtok(ds, "/");
+    tokens[1] = strtok(NULL, ":");
+    tokens[2] = strtok(NULL, "\0");
+
+    if (!strlen(tokens[0])) {
+        error_setg(errp, "volume name must be specified first");
+        return;
+    }
+
+    for (idx = 1; idx < 3; idx++) {
+        if (tokens[idx] != NULL) {
+            if (strstart(tokens[idx], "mport=", NULL)) {
+                xseg_find_port(tokens[idx], "mport=", &lmport);
+            }
+            if (strstart(tokens[idx], "vport=", NULL)) {
+                xseg_find_port(tokens[idx], "vport=", &lvport);
+            }
+        }
+    }
+
+    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
+        error_setg(errp, "Usage: file=archipelago:"
+                   "<volumename>[/mport=<mapperd_port>"
+                   "[:vport=<vlmcd_port>]]");
+        return;
+    }
+    *volume = g_strdup(tokens[0]);
+    *mport = lmport;
+    *vport = lvport;
+    g_free(ds);
+}
+
+static void archipelago_parse_filename(const char *filename, QDict *options,
+                                       Error **errp)
+{
+    const char *start;
+    char *volume = NULL;
+    xport mport = NoPort, vport = NoPort;
+
+    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
+        error_setg(errp, "volume/mport/vport and a file name may not be "
+                         "specified at the same time");
+        return;
+    }
+
+    if (!strstart(filename, "archipelago:", &start)) {
+        error_setg(errp, "File name must start with 'archipelago:'");
+        return;
+    }
+
+    if (!strlen(start) || strstart(start, "/", NULL)) {
+        error_setg(errp, "volume name must be specified");
+        return;
+    }
+
+    parse_filename_opts(filename, errp, &volume, &mport, &vport);
+
+    if (volume) {
+        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
+        g_free(volume);
+    }
+    if (mport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
+    }
+    if (vport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
+    }
+}
+
+static QemuOptsList archipelago_runtime_opts = {
+    .name = "archipelago",
+    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
+    .desc = {
+        {
+            .name = ARCHIPELAGO_OPT_VOLUME,
+            .type = QEMU_OPT_STRING,
+            .help = "Name of the volume image",
+        },
+        {
+            .name = ARCHIPELAGO_OPT_MPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago mapperd port number"
+        },
+        {
+            .name = ARCHIPELAGO_OPT_VPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago vlmcd port number"
+
+        },
+        { /* end of list */ }
+    },
+};
+
+static int qemu_archipelago_open(BlockDriverState *bs,
+                                 QDict *options,
+                                 int bdrv_flags,
+                                 Error **errp)
+{
+    int ret = 0;
+    const char *volume;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        qemu_opts_del(opts);
+        return -EINVAL;
+    }
+
+    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
+    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
+
+    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
+    if (volume == NULL) {
+        error_setg(errp, "archipelago block driver requires an 'volume'"
+                   " options");
+        error_propagate(errp, local_err);
+        qemu_opts_del(opts);
+        return -EINVAL;
+    }
+    s->volname = g_strdup(volume);
+
+    /* Initialize XSEG, join shared memory segment */
+    ret = qemu_archipelago_init(s);
+    if (ret < 0) {
+        error_setg(errp, "cannot initialize XSEG and join shared "
+                   "memory segment");
+        goto err_exit;
+    }
+
+    s->event_reader_pos = 0;
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        error_setg(errp, "cannot create pipe");
+        goto err_exit;
+    }
+
+    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
+                            qemu_archipelago_aio_event_reader, NULL,
+                            s);
+
+    qemu_opts_del(opts);
+    return 0;
+
+err_exit:
+    qemu_opts_del(opts);
+    return ret;
+}
+
+static void qemu_archipelago_close(BlockDriverState *bs)
+{
+    int r, targetlen;
+    char *target;
+    struct xseg_request *req;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
+    close(s->fds[0]);
+    close(s->fds[1]);
+
+    s->stopping = true;
+
+    qemu_mutex_lock(&s->request_mutex);
+    while (!s->th_is_signaled) {
+        qemu_cond_wait(&s->request_cond,
+                       &s->request_mutex);
+    }
+    qemu_mutex_unlock(&s->request_mutex);
+    qemu_cond_destroy(&s->request_cond);
+    qemu_mutex_destroy(&s->request_mutex);
+
+    qemu_cond_destroy(&s->archip_cond);
+    qemu_mutex_destroy(&s->archip_mutex);
+
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit;
+    }
+    r = xseg_prep_request(s->xseg, req, targetlen, 0);
+    if (r < 0) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot prepare XSEG close request\n");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(s->xseg, req);
+    strncpy(target, s->volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_CLOSE;
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot submit XSEG close request\n");
+        goto err_exit;
+    }
+
+    xseg_signal(s->xseg, p);
+    r = wait_reply(s->xseg, s->srcport, s->port, req);
+    if (r < 0) {
+        archipelagolog("wait_reply() error\n");
+    }
+    if (!(req->state & XS_SERVED)) {
+        archipelagolog("Could no close map for volume '%s'\n", s->volname);
+    }
+
+    xseg_put_request(s->xseg, req, s->srcport);
+
+err_exit:
+    xseg_leave_dynport(s->xseg, s->port);
+    xseg_leave(s->xseg);
+}
+
+static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
+    aio_cb->cancelled = true;
+    while (aio_cb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+    qemu_aio_release(aio_cb);
+}
+
+static const AIOCBInfo archipelago_aiocb_info = {
+    .aiocb_size = sizeof(ArchipelagoAIOCB),
+    .cancel = qemu_archipelago_aio_cancel,
+};
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = aio_cb->s->fds[1];
+
+        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
+        if (ret > 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
+
+static int archipelago_aio_read(BDRVArchipelagoState *s,
+                                uint64_t bufidx,
+                                size_t count,
+                                off_t offset,
+                                ArchipelagoAIOCB *aio_cb,
+                                ArchipelagoSegmentedRequest *segreq)
+{
+    int ret, targetlen;
+    char *target;
+    struct xseg_request *req;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    if (!reqdata) {
+        archipelagolog("Cannot allocate reqdata\n");
+        return -1;
+    }
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(s->xseg, req, targetlen, count);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    strncpy(target, s->volname, targetlen);
+    req->size = count;
+    req->offset = offset;
+    req->op = X_READ;
+
+    reqdata->volname = s->volname;
+    reqdata->offset = offset;
+    reqdata->size = count;
+    reqdata->bufidx = bufidx;
+    reqdata->aio_cb = aio_cb;
+    reqdata->op = ARCHIP_OP_READ;
+    reqdata->segreq = segreq;
+
+    xseg_set_req_data(s->xseg, req, reqdata);
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit XSEG request\n");
+        goto err_exit;
+    }
+    xseg_signal(s->xseg, p);
+
+    return 0;
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(s->xseg, req, s->srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
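+/*
+ * Build and submit one X_WRITE request for a single segment of a guest
+ * request.  The payload is copied from the AIOCB bounce buffer into the
+ * shared-memory segment before submission.
+ */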
+static int archipelago_aio_write(BDRVArchipelagoState *s,
+                                 uint64_t bufidx,
+                                 size_t count,
+                                 off_t offset,
+                                 ArchipelagoAIOCB *aio_cb,
+                                 ArchipelagoSegmentedRequest *segreq)
+{
+    char *data = NULL;
+    struct xseg_request *req;
+    int ret, targetlen;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    if (!reqdata) {
+        archipelagolog("Cannot allocate reqdata\n");
+        return -1;
+    }
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(s->xseg, req, targetlen, count);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    strncpy(target, s->volname, targetlen);
+    req->size = count;
+    req->offset = offset;
+    req->op = X_WRITE;
+
+    reqdata->volname = s->volname;
+    reqdata->offset = offset;
+    reqdata->size = count;
+    reqdata->bufidx = bufidx;
+    reqdata->aio_cb = aio_cb;
+    reqdata->op = ARCHIP_OP_WRITE;
+    reqdata->segreq = segreq;
+
+    xseg_set_req_data(s->xseg, req, reqdata);
+
+    data = xseg_get_data(s->xseg, req);
+    if (!data) {
+        archipelagolog("Cannot get XSEG data\n");
+        goto err_exit;
+    }
+    memcpy(data, aio_cb->buffer + bufidx, count);
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit XSEG request\n");
+        goto err_exit;
+    }
+    xseg_signal(s->xseg, p);
+    return 0;
+
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(s->xseg, req, s->srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
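+/*
+ * Split a request into MAX_REQUEST_SIZE chunks and submit each one
+ * separately.  All chunks share one ArchipelagoSegmentedRequest whose
+ * 'ref' field counts the outstanding chunks, so the completion path can
+ * tell when the whole guest request has finished.
+ */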
+static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
+                                        size_t count,
+                                        off_t offset,
+                                        ArchipelagoAIOCB *aio_cb,
+                                        int op)
+{
+    int i, ret, segments_nr, last_segment_size;
+    ArchipelagoSegmentedRequest *segreq;
+
+    segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));
+
+    if (!segreq) {
+        archipelagolog("Cannot allocate segreq\n");
+        return -1;
+    }
+    if (op == ARCHIP_OP_FLUSH) {
+        segments_nr = 1;
+        segreq->ref = segments_nr;
+        segreq->total = count;
+        segreq->count = 0;
+        segreq->failed = 0;
+        ret = archipelago_aio_write(s, 0, count, offset, aio_cb,
+                                    segreq);
+        if (ret < 0) {
+            goto err_exit;
+        }
+        return 0;
+    }
+
+    segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
+                  ((count % MAX_REQUEST_SIZE) ? 1 : 0);
+    last_segment_size = (int)(count % MAX_REQUEST_SIZE);
+
+    segreq->ref = segments_nr;
+    segreq->total = count;
+    segreq->count = 0;
+    segreq->failed = 0;
+
+    for (i = 0; i < segments_nr - 1; i++) {
+        switch (op) {
+        case ARCHIP_OP_READ:
+            ret = archipelago_aio_read(s, i * MAX_REQUEST_SIZE,
+                                       MAX_REQUEST_SIZE,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+            break;
+        case ARCHIP_OP_WRITE:
+            ret = archipelago_aio_write(s, i * MAX_REQUEST_SIZE,
+                                        MAX_REQUEST_SIZE,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+            break;
+        }
+
+        if (ret < 0) {
+            goto err_exit;
+        }
+    }
+
+    if ((segments_nr > 1) && last_segment_size) {
+        switch (op) {
+        case ARCHIP_OP_READ:
+            ret = archipelago_aio_read(s, i * MAX_REQUEST_SIZE,
+                                       last_segment_size,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+            break;
+        case ARCHIP_OP_WRITE:
+            ret = archipelago_aio_write(s, i * MAX_REQUEST_SIZE,
+                                        last_segment_size,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+            break;
+        }
+    } else if ((segments_nr > 1) && !last_segment_size) {
+        switch (op) {
+        case ARCHIP_OP_READ:
+            ret = archipelago_aio_read(s, i * MAX_REQUEST_SIZE,
+                                       MAX_REQUEST_SIZE,
+                                       offset + i * MAX_REQUEST_SIZE,
+                                       aio_cb, segreq);
+            break;
+        case ARCHIP_OP_WRITE:
+            ret = archipelago_aio_write(s, i * MAX_REQUEST_SIZE,
+                                        MAX_REQUEST_SIZE,
+                                        offset + i * MAX_REQUEST_SIZE,
+                                        aio_cb, segreq);
+            break;
+        }
+    } else if (segments_nr == 1) {
+        switch (op) {
+        case ARCHIP_OP_READ:
+            ret = archipelago_aio_read(s, 0, count, offset, aio_cb,
+                                       segreq);
+            break;
+        case ARCHIP_OP_WRITE:
+            ret = archipelago_aio_write(s, 0, count, offset, aio_cb,
+                                        segreq);
+            break;
+        }
+    }
+
+    if (ret < 0) {
+        goto err_exit;
+    }
+
+    return 0;
+
+err_exit:
+    __sync_add_and_fetch(&segreq->failed, 1);
+    if (segments_nr == 1) {
+        __sync_add_and_fetch(&segreq->ref, -1);
+    } else {
+        __sync_add_and_fetch(&segreq->ref, -segments_nr + i);
+    }
+
+    if (!segreq->ref) {
+        g_free(segreq);
+    }
+
+    return ret;
+}
+
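+/*
+ * Common entry point for AIO read/write/flush.  Guest data is staged in a
+ * bounce buffer because XSEG requests carry their payload in the shared
+ * memory segment rather than in the guest's scatter/gather list.
+ */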
+static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
+                                                 int64_t sector_num,
+                                                 QEMUIOVector *qiov,
+                                                 int nb_sectors,
+                                                 BlockDriverCompletionFunc *cb,
+                                                 void *opaque,
+                                                 int op)
+{
+    ArchipelagoAIOCB *aio_cb;
+    BDRVArchipelagoState *s = bs->opaque;
+    int64_t size, off;
+    int ret;
+
+    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
+    aio_cb->cmd = op;
+    aio_cb->qiov = qiov;
+
+    if (op != ARCHIP_OP_FLUSH) {
+        aio_cb->buffer = qemu_blockalign(bs, qiov->size);
+    } else {
+        aio_cb->buffer = NULL;
+    }
+
+    aio_cb->ret = 0;
+    aio_cb->s = s;
+    aio_cb->cancelled = false;
+    aio_cb->status = -EINPROGRESS;
+
+    if (op == ARCHIP_OP_WRITE) {
+        qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, qiov->size);
+    }
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    aio_cb->size = size;
+
+    s->qemu_aio_count++;
+
+    ret = archipelago_aio_segmented_rw(s, size, off,
+                                       aio_cb, op);
+    if (ret < 0) {
+        goto err_exit;
+    }
+    return &aio_cb->common;
+
+err_exit:
+    error_report("qemu_archipelago_aio_rw(): I/O error");
+    s->qemu_aio_count--;
+    qemu_aio_release(aio_cb);
+    return NULL;
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_READ);
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_WRITE);
+}
+
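+/*
+ * Query the volume size with a synchronous X_INFO request to the mapper
+ * and cache the result in s->size.
+ */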
+static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
+{
+    uint64_t size;
+    int ret, targetlen;
+    struct xseg_request *req;
+    struct xseg_reply_info *xinfo;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    if (!reqdata) {
+        archipelagolog("Cannot allocate reqdata\n");
+        return -1;
+    }
+    const char *volname = s->volname;
+    targetlen = strlen(volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(s->xseg, req, targetlen,
+                            sizeof(struct xseg_reply_info));
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    strncpy(target, volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_INFO;
+
+    reqdata->op = ARCHIP_OP_VOLINFO;
+    reqdata->volname = volname;
+    xseg_set_req_data(s->xseg, req, reqdata);
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Cannot submit XSEG request\n");
+        goto err_exit;
+    }
+    xseg_signal(s->xseg, p);
+    qemu_mutex_lock(&s->archip_mutex);
+    while (!s->is_signaled) {
+        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
+    }
+    s->is_signaled = false;
+    qemu_mutex_unlock(&s->archip_mutex);
+
+    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
+    size = xinfo->size;
+    xseg_put_request(s->xseg, req, s->srcport);
+    g_free(reqdata);
+    s->size = size;
+    return size;
+
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(s->xseg, req, s->srcport);
+    return -1;
+err_exit2:
+    g_free(reqdata);
+    return -1;
+}
+
+static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
+{
+    int64_t ret;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    ret = archipelago_volume_info(s);
+    return ret;
+}
+
+static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
+                                   ARCHIP_OP_FLUSH);
+}
+
+static BlockDriver bdrv_archipelago = {
+    .format_name         = "archipelago",
+    .protocol_name       = "archipelago",
+    .instance_size       = sizeof(BDRVArchipelagoState),
+    .bdrv_parse_filename = archipelago_parse_filename,
+    .bdrv_file_open      = qemu_archipelago_open,
+    .bdrv_close          = qemu_archipelago_close,
+    .bdrv_getlength      = qemu_archipelago_getlength,
+    .bdrv_aio_readv      = qemu_archipelago_aio_readv,
+    .bdrv_aio_writev     = qemu_archipelago_aio_writev,
+    .bdrv_aio_flush      = qemu_archipelago_aio_flush,
+    .bdrv_has_zero_init  = bdrv_has_zero_init_1,
+};
+
+static void bdrv_archipelago_init(void)
+{
+    bdrv_register(&bdrv_archipelago);
+}
+
+block_init(bdrv_archipelago_init);
diff --git a/configure b/configure
index 27d84d9..c0bd2b3 100755
--- a/configure
+++ b/configure
@@ -326,6 +326,7 @@  seccomp=""
 glusterfs=""
 glusterfs_discard="no"
 glusterfs_zerofill="no"
+archipelago=""
 virtio_blk_data_plane=""
 gtk=""
 gtkabi=""
@@ -1086,6 +1087,10 @@  for opt do
   ;;
   --enable-glusterfs) glusterfs="yes"
   ;;
+  --disable-archipelago) archipelago="no"
+  ;;
+  --enable-archipelago) archipelago="yes"
+  ;;
   --disable-virtio-blk-data-plane) virtio_blk_data_plane="no"
   ;;
   --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes"
@@ -1375,6 +1380,8 @@  Advanced options (experts only):
   --enable-coroutine-pool  enable coroutine freelist (better performance)
   --enable-glusterfs       enable GlusterFS backend
   --disable-glusterfs      disable GlusterFS backend
+  --enable-archipelago     enable Archipelago backend
+  --disable-archipelago    disable Archipelago backend
   --enable-gcov            enable test coverage analysis with gcov
   --gcov=GCOV              use specified gcov [$gcov_tool]
   --enable-tpm             enable TPM support
@@ -3041,6 +3048,33 @@  EOF
   fi
 fi
 
+
+##########################################
+# archipelago probe
+if test "$archipelago" != "no" ; then
+    cat > $TMPC <<EOF
+#include <stdio.h>
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+int main(void) {
+    xseg_initialize();
+    return 0;
+}
+EOF
+    archipelago_libs=-lxseg
+    if compile_prog "" "$archipelago_libs"; then
+        archipelago="yes"
+        libs_tools="$archipelago_libs $libs_tools"
+        libs_softmmu="$archipelago_libs $libs_softmmu"
+    else
+      if test "$archipelago" = "yes" ; then
+        feature_not_found "Archipelago backend support" "Install libxseg devel"
+      fi
+      archipelago="no"
+    fi
+fi
+
+
 ##########################################
 # glusterfs probe
 if test "$glusterfs" != "no" ; then
@@ -4230,6 +4264,7 @@  echo "seccomp support   $seccomp"
 echo "coroutine backend $coroutine"
 echo "coroutine pool    $coroutine_pool"
 echo "GlusterFS support $glusterfs"
+echo "Archipelago support $archipelago"
 echo "virtio-blk-data-plane $virtio_blk_data_plane"
 echo "gcov              $gcov_tool"
 echo "gcov enabled      $gcov"
@@ -4664,6 +4699,11 @@  if test "$glusterfs_zerofill" = "yes" ; then
   echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak
 fi
 
+if test "$archipelago" = "yes" ; then
+  echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak
+  echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak
+fi
+
 if test "$libssh2" = "yes" ; then
   echo "CONFIG_LIBSSH2=m" >> $config_host_mak
   echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak