Patchwork [v5,4/5] qed: Read/write support

Submitter Stefan Hajnoczi
Date Nov. 24, 2010, 11:11 a.m.
Message ID <1290597076-18952-5-git-send-email-stefanha@linux.vnet.ibm.com>
Permalink /patch/72833/
State New

Comments

Stefan Hajnoczi - Nov. 24, 2010, 11:11 a.m.
This patch implements the read/write state machine.  Operations are
fully asynchronous and multiple operations may be active at any time.

Allocating writes lock tables to ensure metadata updates do not
interfere with each other.  If two allocating writes need to update the
same L2 table they will run sequentially.  If two allocating writes need
to update different L2 tables they will run in parallel.
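
As a sketch of the locking behaviour (using the qed_lock()/qed_unlock()
API added in this series; the variables and offsets are invented for the
example):

    /* Same L2 table: the second writer queues behind the first */
    qed_lock(&s->lock, l2_offset, acb1);    /* true: acb1 proceeds */
    qed_lock(&s->lock, l2_offset, acb2);    /* false: acb2 sleeps */
    qed_unlock(&s->lock, acb1);             /* wakeup_fn restarts acb2 */

    /* Different L2 tables: different keys, so no contention */
    qed_lock(&s->lock, l2_offset_a, acb1);  /* true */
    qed_lock(&s->lock, l2_offset_b, acb2);  /* true: runs in parallel */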

Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 Makefile.objs    |    1 +
 block/qed-lock.c |  124 ++++++++++
 block/qed.c      |  667 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 block/qed.h      |   43 ++++
 trace-events     |   10 +
 5 files changed, 843 insertions(+), 2 deletions(-)
 create mode 100644 block/qed-lock.c
Kevin Wolf - Dec. 3, 2010, 4:06 p.m.
On 24.11.2010 12:11, Stefan Hajnoczi wrote:
> This patch implements the read/write state machine.  Operations are
> fully asynchronous and multiple operations may be active at any time.
> 
> Allocating writes lock tables to ensure metadata updates do not
> interfere with each other.  If two allocating writes need to update the
> same L2 table they will run sequentially.  If two allocating writes need
> to update different L2 tables they will run in parallel.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  Makefile.objs    |    1 +
>  block/qed-lock.c |  124 ++++++++++
>  block/qed.c      |  667 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  block/qed.h      |   43 ++++
>  trace-events     |   10 +
>  5 files changed, 843 insertions(+), 2 deletions(-)
>  create mode 100644 block/qed-lock.c

> diff --git a/block/qed.c b/block/qed.c
> index cd1bead..1513763 100644
> --- a/block/qed.c
> +++ b/block/qed.c
> @@ -12,8 +12,26 @@
>   *
>   */
>  
> +#include "trace.h"
>  #include "qed.h"
>  
> +static void qed_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    QEDAIOCB *acb = (QEDAIOCB *)blockacb;
> +    bool finished = false;
> +
> +    /* Wait for the request to finish */
> +    acb->finished = &finished;
> +    while (!finished) {
> +        qemu_aio_wait();
> +    }
> +}

Hm, you don't even try to cancel? I wonder how useful the individual
bdrv_aio_cancel implementations actually are when nobody implements
cancellation. It seems to be pretty hard to do it right.

Maybe we should consider providing a default bdrv_aio_cancel
implementation in block.c that waits for completion?
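
Such a default might look roughly like the QED implementation above,
generalized (a sketch only; it assumes BlockDriverAIOCB grows a
"finished" field for the completion path to signal, which block.c does
not have today):

    /* Hypothetical default in block.c: wait for the request to complete */
    static void bdrv_aio_cancel_default(BlockDriverAIOCB *acb)
    {
        bool finished = false;

        acb->finished = &finished;  /* assumed field, set on completion */
        while (!finished) {
            qemu_aio_wait();
        }
    }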

> +/**
> + * Construct an iovec array for a given length
> + *
> + * @acb:        I/O request
> + * @len:        Maximum number of bytes
> + *
> + * This function can be called several times to build subset iovec arrays of
> + * acb->qiov.  For example:
> + *
> + *   acb->qiov->iov[] = {{0x100000, 1024},
> + *                       {0x200000, 1024}}
> + *
> + *   qed_acb_build_qiov(acb, 512) =>
> + *                      {{0x100000, 512}}
> + *
> + *   qed_acb_build_qiov(acb, 1024) =>
> + *                      {{0x100200, 512},
> + *                       {0x200000, 512}}
> + *
> + *   qed_acb_build_qiov(acb, 512) =>
> + *                      {{0x200200, 512}}
> + */
> +static void qed_acb_build_qiov(QEDAIOCB *acb, size_t len)
> +{
> +    struct iovec *iov_end = &acb->qiov->iov[acb->qiov->niov];
> +    size_t iov_offset = acb->cur_iov_offset;
> +    struct iovec *iov = acb->cur_iov;
> +
> +    while (iov != iov_end && len > 0) {
> +        size_t nbytes = MIN(iov->iov_len - iov_offset, len);
> +
> +        qemu_iovec_add(&acb->cur_qiov, iov->iov_base + iov_offset, nbytes);
> +        iov_offset += nbytes;
> +        len -= nbytes;
> +
> +        if (iov_offset >= iov->iov_len) {
> +            iov_offset = 0;
> +            iov++;
> +        }
> +    }
> +
> +    /* Stash state for next time */
> +    acb->cur_iov = iov;
> +    acb->cur_iov_offset = iov_offset;
> +}

Wouldn't it be much simpler to just store the offset into acb->qiov and
to use qemu_iovec_copy to get the subset qiov?
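
A sketch of that simplification (assuming a qemu_iovec_copy(dst, src,
skip, size) helper and a hypothetical acb->qiov_offset byte counter in
place of cur_iov/cur_iov_offset):

    static void qed_acb_build_qiov(QEDAIOCB *acb, size_t len)
    {
        /* Copy the next len bytes of acb->qiov into acb->cur_qiov */
        qemu_iovec_copy(&acb->cur_qiov, acb->qiov, acb->qiov_offset, len);
        acb->qiov_offset += len;
    }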

> +/**
> + * Write data to the image file
> + */
> +static void qed_aio_write_main(void *opaque, int ret)
> +{
> +    QEDAIOCB *acb = opaque;
> +    BDRVQEDState *s = acb_to_s(acb);
> +    uint64_t offset = acb->cur_cluster;
> +    BlockDriverCompletionFunc *next_fn;
> +    BlockDriverAIOCB *file_acb;
> +
> +    trace_qed_aio_write_main(s, acb, ret, offset, acb->cur_qiov.size);

Why does the trace use a different offset...

> +
> +    if (ret) {
> +        qed_aio_complete(acb, ret);
> +        return;
> +    }
> +
> +    if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
> +        next_fn = qed_aio_next_io;
> +    } else {
> +        if (s->bs->backing_hd) {
> +            next_fn = qed_aio_write_flush_before_l2_update;
> +        } else {
> +            next_fn = qed_aio_write_l2_update;
> +        }
> +    }
> +
> +    offset += qed_offset_into_cluster(s, acb->cur_pos);

...than the write uses?

I missed this line at first because offset is initialized above, so I
didn't expect that this was only half of its initialization.
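
One way to avoid the split initialization is to compute the offset in a
single expression, so the trace and the write necessarily agree (sketch
using the patch's own helpers):

    uint64_t offset = acb->cur_cluster +
                      qed_offset_into_cluster(s, acb->cur_pos);

    trace_qed_aio_write_main(s, acb, ret, offset, acb->cur_qiov.size);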

> +    BLKDBG_EVENT(s->bs->file, BLKDBG_WRITE_AIO);
> +    file_acb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE,
> +                               &acb->cur_qiov,
> +                               acb->cur_qiov.size / BDRV_SECTOR_SIZE,
> +                               next_fn, acb);
> +    if (!file_acb) {
> +        qed_aio_complete(acb, -EIO);
> +    }
> +}
> +
> +/**
> + * Populate back untouched region of new data cluster
> + */
> +static void qed_aio_write_postfill(void *opaque, int ret)
> +{
> +    QEDAIOCB *acb = opaque;
> +    BDRVQEDState *s = acb_to_s(acb);
> +    uint64_t start = acb->cur_pos + acb->cur_qiov.size;
> +    uint64_t len = qed_start_of_cluster(s, start + s->header.cluster_size - 1) - start;
> +    uint64_t offset = acb->cur_cluster + qed_offset_into_cluster(s, acb->cur_pos) + acb->cur_qiov.size;

This looks like more than 80 characters.

Kevin
Stefan Hajnoczi - Dec. 3, 2010, 8:29 p.m.
On Fri, Dec 3, 2010 at 4:06 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> On 24.11.2010 12:11, Stefan Hajnoczi wrote:
>> +static void qed_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    QEDAIOCB *acb = (QEDAIOCB *)blockacb;
>> +    bool finished = false;
>> +
>> +    /* Wait for the request to finish */
>> +    acb->finished = &finished;
>> +    while (!finished) {
>> +        qemu_aio_wait();
>> +    }
>> +}
>
> Hm, you don't even try to cancel? I wonder how useful the individual
> bdrv_aio_cancel implementations actually are when nobody implements
> cancellation. It seems to be pretty hard to do it right.
>
> Maybe we should consider implementing a default bdrv_aio_cancel
> implementation in block.c that waits for completion?

Cancellation is difficult to implement correctly.  Even
qemu_aio_wait() is bad because IDE/SCSI will block until the request
completes.  We really should have something better.

Also, from what I can tell, only USB gadgetfs implements it in Linux
native AIO!  In posix-aio-compat.c we can only cancel requests that
haven't been issued yet.

So in summary, cancel is a nasty interface :).
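
The posix-aio-compat.c behaviour mentioned above boils down to roughly
the following (a simplified sketch; the field and list names only
approximate the real file):

    int qemu_paio_cancel_sketch(struct qemu_paiocb *aiocb)
    {
        int ret;

        mutex_lock(&lock);
        if (!aiocb->active) {
            /* Not picked up by a worker thread yet: safe to drop */
            QTAILQ_REMOVE(&request_list, aiocb, node);
            aiocb->ret = -ECANCELED;
            ret = QEMU_PAIO_CANCELED;
        } else if (aiocb->ret == -EINPROGRESS) {
            /* Already running: cannot be cancelled any more */
            ret = QEMU_PAIO_NOTCANCELED;
        } else {
            ret = QEMU_PAIO_ALLDONE;
        }
        mutex_unlock(&lock);

        return ret;
    }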

>> +/**
>> + * Construct an iovec array for a given length
>> + *
>> + * @acb:        I/O request
>> + * @len:        Maximum number of bytes
>> + *
>> + * This function can be called several times to build subset iovec arrays of
>> + * acb->qiov.  For example:
>> + *
>> + *   acb->qiov->iov[] = {{0x100000, 1024},
>> + *                       {0x200000, 1024}}
>> + *
>> + *   qed_acb_build_qiov(acb, 512) =>
>> + *                      {{0x100000, 512}}
>> + *
>> + *   qed_acb_build_qiov(acb, 1024) =>
>> + *                      {{0x100200, 512},
>> + *                       {0x200000, 512}}
>> + *
>> + *   qed_acb_build_qiov(acb, 512) =>
>> + *                      {{0x200200, 512}}
>> + */
>> +static void qed_acb_build_qiov(QEDAIOCB *acb, size_t len)
>> +{
>> +    struct iovec *iov_end = &acb->qiov->iov[acb->qiov->niov];
>> +    size_t iov_offset = acb->cur_iov_offset;
>> +    struct iovec *iov = acb->cur_iov;
>> +
>> +    while (iov != iov_end && len > 0) {
>> +        size_t nbytes = MIN(iov->iov_len - iov_offset, len);
>> +
>> +        qemu_iovec_add(&acb->cur_qiov, iov->iov_base + iov_offset, nbytes);
>> +        iov_offset += nbytes;
>> +        len -= nbytes;
>> +
>> +        if (iov_offset >= iov->iov_len) {
>> +            iov_offset = 0;
>> +            iov++;
>> +        }
>> +    }
>> +
>> +    /* Stash state for next time */
>> +    acb->cur_iov = iov;
>> +    acb->cur_iov_offset = iov_offset;
>> +}
>
> Wouldn't it be much simpler to just store the offset into acb->qiov and
> to use qemu_iovec_copy to get the subset qiov?

Good point, fixed in v6.

>> +/**
>> + * Write data to the image file
>> + */
>> +static void qed_aio_write_main(void *opaque, int ret)
>> +{
>> +    QEDAIOCB *acb = opaque;
>> +    BDRVQEDState *s = acb_to_s(acb);
>> +    uint64_t offset = acb->cur_cluster;
>> +    BlockDriverCompletionFunc *next_fn;
>> +    BlockDriverAIOCB *file_acb;
>> +
>> +    trace_qed_aio_write_main(s, acb, ret, offset, acb->cur_qiov.size);
>
> Why does the trace use a different offset...
>
>> +
>> +    if (ret) {
>> +        qed_aio_complete(acb, ret);
>> +        return;
>> +    }
>> +
>> +    if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
>> +        next_fn = qed_aio_next_io;
>> +    } else {
>> +        if (s->bs->backing_hd) {
>> +            next_fn = qed_aio_write_flush_before_l2_update;
>> +        } else {
>> +            next_fn = qed_aio_write_l2_update;
>> +        }
>> +    }
>> +
>> +    offset += qed_offset_into_cluster(s, acb->cur_pos);
>
> ...than the write uses?
>
> I missed this line at first because offset is initialized above, so I
> didn't expect that this was only half of its initialization.

Fixed in v6.

>> +/**
>> + * Populate back untouched region of new data cluster
>> + */
>> +static void qed_aio_write_postfill(void *opaque, int ret)
>> +{
>> +    QEDAIOCB *acb = opaque;
>> +    BDRVQEDState *s = acb_to_s(acb);
>> +    uint64_t start = acb->cur_pos + acb->cur_qiov.size;
>> +    uint64_t len = qed_start_of_cluster(s, start + s->header.cluster_size - 1) - start;
>> +    uint64_t offset = acb->cur_cluster + qed_offset_into_cluster(s, acb->cur_pos) + acb->cur_qiov.size;
>
> This looks like more than 80 characters.

Fixed in v6.

Stefan

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 46c2ced..6703bc6 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -21,6 +21,7 @@  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
+block-nested-y += qed-lock.o
 block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
diff --git a/block/qed-lock.c b/block/qed-lock.c
new file mode 100644
index 0000000..1b7a763
--- /dev/null
+++ b/block/qed-lock.c
@@ -0,0 +1,124 @@ 
+/*
+ * QEMU Enhanced Disk Format Lock
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ *
+ */
+
+/*
+ * Table locking works as follows:
+ *
+ * Reads and non-allocating writes do not acquire locks because they do not
+ * modify tables and only see committed L2 cache entries.
+ *
+ * An allocating write request that needs to update an existing L2 table
+ * acquires a lock on the table.  This serializes requests that touch the same
+ * L2 table.
+ *
+ * An allocating write request that needs to create a new L2 table and update
+ * the L1 table acquires a lock on the L1 table.  This serializes requests that
+ * create new L2 tables.
+ *
+ * When a request is unable to acquire a lock, it is put to sleep and must
+ * return.  When the lock it attempted to acquire becomes available, a wakeup
+ * function is invoked to activate it again.
+ *
+ * A request must retry its cluster lookup after waking up because the tables
+ * have changed.  For example, an allocating write may no longer need to
+ * allocate if the previous request already allocated the cluster.
+ */
+
+#include "qed.h"
+
+struct QEDLockEntry {
+    uint64_t key;
+    QSIMPLEQ_HEAD(, QEDAIOCB) reqs;
+    QTAILQ_ENTRY(QEDLockEntry) next;
+};
+
+/**
+ * Initialize a lock
+ *
+ * @lock:           Lock
+ * @wakeup_fn:      Callback to reactivate a sleeping request
+ */
+void qed_lock_init(QEDLock *lock, BlockDriverCompletionFunc *wakeup_fn)
+{
+    QTAILQ_INIT(&lock->entries);
+    lock->wakeup_fn = wakeup_fn;
+}
+
+/**
+ * Acquire a lock on a given key
+ *
+ * @lock:           Lock
+ * @key:            Key to lock on
+ * @acb:            Request
+ * @ret:            true if lock was acquired, false if request needs to sleep
+ *
+ * If the request currently has another lock held, that lock will be released.
+ */
+bool qed_lock(QEDLock *lock, uint64_t key, QEDAIOCB *acb)
+{
+    QEDLockEntry *entry = acb->lock_entry;
+
+    if (entry) {
+        /* Lock already held */
+        if (entry->key == key) {
+            return true;
+        }
+
+        /* Release old lock */
+        qed_unlock(lock, acb);
+    }
+
+    /* Find held lock */
+    QTAILQ_FOREACH(entry, &lock->entries, next) {
+        if (entry->key == key) {
+            QSIMPLEQ_INSERT_TAIL(&entry->reqs, acb, next);
+            acb->lock_entry = entry;
+            return false;
+        }
+    }
+
+    /* Allocate new lock entry */
+    entry = qemu_malloc(sizeof(*entry));
+    entry->key = key;
+    QSIMPLEQ_INIT(&entry->reqs);
+    QSIMPLEQ_INSERT_TAIL(&entry->reqs, acb, next);
+    QTAILQ_INSERT_TAIL(&lock->entries, entry, next);
+    acb->lock_entry = entry;
+    return true;
+}
+
+/**
+ * Release a held lock
+ */
+void qed_unlock(QEDLock *lock, QEDAIOCB *acb)
+{
+    QEDLockEntry *entry = acb->lock_entry;
+
+    if (!entry) {
+        return;
+    }
+
+    acb->lock_entry = NULL;
+    QSIMPLEQ_REMOVE_HEAD(&entry->reqs, next);
+
+    /* Wake up next lock holder */
+    acb = QSIMPLEQ_FIRST(&entry->reqs);
+    if (acb) {
+        lock->wakeup_fn(acb, 0);
+        return;
+    }
+
+    /* Free lock entry */
+    QTAILQ_REMOVE(&lock->entries, entry, next);
+    qemu_free(entry);
+}
diff --git a/block/qed.c b/block/qed.c
index cd1bead..1513763 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -12,8 +12,26 @@ 
  *
  */
 
+#include "trace.h"
 #include "qed.h"
 
+static void qed_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    QEDAIOCB *acb = (QEDAIOCB *)blockacb;
+    bool finished = false;
+
+    /* Wait for the request to finish */
+    acb->finished = &finished;
+    while (!finished) {
+        qemu_aio_wait();
+    }
+}
+
+static AIOPool qed_aio_pool = {
+    .aiocb_size         = sizeof(QEDAIOCB),
+    .cancel             = qed_aio_cancel,
+};
+
 static int bdrv_qed_probe(const uint8_t *buf, int buf_size,
                           const char *filename)
 {
@@ -155,6 +173,24 @@  static int qed_read_string(BlockDriverState *file, uint64_t offset, size_t n,
     return 0;
 }
 
+/**
+ * Allocate new clusters
+ *
+ * @s:          QED state
+ * @n:          Number of contiguous clusters to allocate
+ * @ret:        Offset of first allocated cluster
+ *
+ * This function only produces the offset where the new clusters should be
+ * written.  It updates BDRVQEDState but does not make any changes to the image
+ * file.
+ */
+static uint64_t qed_alloc_clusters(BDRVQEDState *s, unsigned int n)
+{
+    uint64_t offset = s->file_size;
+    s->file_size += n * s->header.cluster_size;
+    return offset;
+}
+
 QEDTable *qed_alloc_table(BDRVQEDState *s)
 {
     /* Honor O_DIRECT memory alignment requirements */
@@ -162,6 +198,23 @@  QEDTable *qed_alloc_table(BDRVQEDState *s)
                            s->header.cluster_size * s->header.table_size);
 }
 
+/**
+ * Allocate a new zeroed L2 table
+ */
+static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
+{
+    CachedL2Table *l2_table = qed_alloc_l2_cache_entry(&s->l2_cache);
+
+    l2_table->table = qed_alloc_table(s);
+    l2_table->offset = qed_alloc_clusters(s, s->header.table_size);
+
+    memset(l2_table->table->offsets, 0,
+           s->header.cluster_size * s->header.table_size);
+    return l2_table;
+}
+
+static void qed_aio_next_io(void *opaque, int ret);
+
 static int bdrv_qed_open(BlockDriverState *bs, int flags)
 {
     BDRVQEDState *s = bs->opaque;
@@ -170,6 +223,7 @@  static int bdrv_qed_open(BlockDriverState *bs, int flags)
     int ret;
 
     s->bs = bs;
+    qed_lock_init(&s->lock, qed_aio_next_io);
 
     ret = bdrv_pread(bs->file, 0, &le_header, sizeof(le_header));
     if (ret < 0) {
@@ -431,13 +485,622 @@  static int bdrv_qed_make_empty(BlockDriverState *bs)
     return -ENOTSUP;
 }
 
+static BDRVQEDState *acb_to_s(QEDAIOCB *acb)
+{
+    return acb->common.bs->opaque;
+}
+
+/**
+ * Read from the backing file or zero-fill if no backing file
+ *
+ * @s:          QED state
+ * @pos:        Byte position in device
+ * @qiov:       Destination I/O vector
+ * @cb:         Completion function
+ * @opaque:     User data for completion function
+ *
+ * This function reads qiov->size bytes starting at pos from the backing file.
+ * If there is no backing file then zeroes are read.
+ */
+static void qed_read_backing_file(BDRVQEDState *s, uint64_t pos,
+                                  QEMUIOVector *qiov,
+                                  BlockDriverCompletionFunc *cb, void *opaque)
+{
+    BlockDriverAIOCB *aiocb;
+    uint64_t backing_length = 0;
+    size_t size;
+
+    /* If there is a backing file, get its length.  Treat the absence of a
+     * backing file like a zero length backing file.
+     */
+    if (s->bs->backing_hd) {
+        int64_t l = bdrv_getlength(s->bs->backing_hd);
+        if (l < 0) {
+            cb(opaque, l);
+            return;
+        }
+        backing_length = l;
+    }
+
+    /* Zero all sectors if reading beyond the end of the backing file */
+    if (pos >= backing_length ||
+        pos + qiov->size > backing_length) {
+        qemu_iovec_memset(qiov, 0, qiov->size);
+    }
+
+    /* Complete now if there are no backing file sectors to read */
+    if (pos >= backing_length) {
+        cb(opaque, 0);
+        return;
+    }
+
+    /* If the read straddles the end of the backing file, shorten it */
+    size = MIN((uint64_t)backing_length - pos, qiov->size);
+
+    BLKDBG_EVENT(s->bs->file, BLKDBG_READ_BACKING);
+    aiocb = bdrv_aio_readv(s->bs->backing_hd, pos / BDRV_SECTOR_SIZE,
+                           qiov, size / BDRV_SECTOR_SIZE, cb, opaque);
+    if (!aiocb) {
+        cb(opaque, -EIO);
+    }
+}
+
+typedef struct {
+    GenericCB gencb;
+    BDRVQEDState *s;
+    QEMUIOVector qiov;
+    struct iovec iov;
+    uint64_t offset;
+} CopyFromBackingFileCB;
+
+static void qed_copy_from_backing_file_cb(void *opaque, int ret)
+{
+    CopyFromBackingFileCB *copy_cb = opaque;
+    qemu_vfree(copy_cb->iov.iov_base);
+    gencb_complete(&copy_cb->gencb, ret);
+}
+
+static void qed_copy_from_backing_file_write(void *opaque, int ret)
+{
+    CopyFromBackingFileCB *copy_cb = opaque;
+    BDRVQEDState *s = copy_cb->s;
+    BlockDriverAIOCB *aiocb;
+
+    if (ret) {
+        qed_copy_from_backing_file_cb(copy_cb, ret);
+        return;
+    }
+
+    BLKDBG_EVENT(s->bs->file, BLKDBG_COW_WRITE);
+    aiocb = bdrv_aio_writev(s->bs->file, copy_cb->offset / BDRV_SECTOR_SIZE,
+                            &copy_cb->qiov,
+                            copy_cb->qiov.size / BDRV_SECTOR_SIZE,
+                            qed_copy_from_backing_file_cb, copy_cb);
+    if (!aiocb) {
+        qed_copy_from_backing_file_cb(copy_cb, -EIO);
+    }
+}
+
+/**
+ * Copy data from backing file into the image
+ *
+ * @s:          QED state
+ * @pos:        Byte position in device
+ * @len:        Number of bytes
+ * @offset:     Byte offset in image file
+ * @cb:         Completion function
+ * @opaque:     User data for completion function
+ */
+static void qed_copy_from_backing_file(BDRVQEDState *s, uint64_t pos,
+                                       uint64_t len, uint64_t offset,
+                                       BlockDriverCompletionFunc *cb,
+                                       void *opaque)
+{
+    CopyFromBackingFileCB *copy_cb;
+
+    /* Skip copy entirely if there is no work to do */
+    if (len == 0) {
+        cb(opaque, 0);
+        return;
+    }
+
+    copy_cb = gencb_alloc(sizeof(*copy_cb), cb, opaque);
+    copy_cb->s = s;
+    copy_cb->offset = offset;
+    copy_cb->iov.iov_base = qemu_blockalign(s->bs, len);
+    copy_cb->iov.iov_len = len;
+    qemu_iovec_init_external(&copy_cb->qiov, &copy_cb->iov, 1);
+
+    qed_read_backing_file(s, pos, &copy_cb->qiov,
+                          qed_copy_from_backing_file_write, copy_cb);
+}
+
+/**
+ * Link one or more contiguous clusters into a table
+ *
+ * @s:              QED state
+ * @table:          L2 table
+ * @index:          First cluster index
+ * @n:              Number of contiguous clusters
+ * @cluster:        First cluster byte offset in image file
+ */
+static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
+                                unsigned int n, uint64_t cluster)
+{
+    int i;
+    for (i = index; i < index + n; i++) {
+        table->offsets[i] = cluster;
+        cluster += s->header.cluster_size;
+    }
+}
+
+static void qed_aio_complete_bh(void *opaque)
+{
+    QEDAIOCB *acb = opaque;
+    BlockDriverCompletionFunc *cb = acb->common.cb;
+    void *user_opaque = acb->common.opaque;
+    int ret = acb->bh_ret;
+    bool *finished = acb->finished;
+
+    qemu_bh_delete(acb->bh);
+    qemu_aio_release(acb);
+
+    /* Invoke callback */
+    cb(user_opaque, ret);
+
+    /* Signal cancel completion */
+    if (finished) {
+        *finished = true;
+    }
+}
+
+static void qed_aio_complete(QEDAIOCB *acb, int ret)
+{
+    BDRVQEDState *s = acb_to_s(acb);
+
+    trace_qed_aio_complete(s, acb, ret);
+
+    /* Free resources */
+    qemu_iovec_destroy(&acb->cur_qiov);
+    qed_unref_l2_cache_entry(acb->request.l2_table);
+    qed_unlock(&s->lock, acb);
+
+    /* Arrange for a bh to invoke the completion function */
+    acb->bh_ret = ret;
+    acb->bh = qemu_bh_new(qed_aio_complete_bh, acb);
+    qemu_bh_schedule(acb->bh);
+}
+
+/**
+ * Construct an iovec array for a given length
+ *
+ * @acb:        I/O request
+ * @len:        Maximum number of bytes
+ *
+ * This function can be called several times to build subset iovec arrays of
+ * acb->qiov.  For example:
+ *
+ *   acb->qiov->iov[] = {{0x100000, 1024},
+ *                       {0x200000, 1024}}
+ *
+ *   qed_acb_build_qiov(acb, 512) =>
+ *                      {{0x100000, 512}}
+ *
+ *   qed_acb_build_qiov(acb, 1024) =>
+ *                      {{0x100200, 512},
+ *                       {0x200000, 512}}
+ *
+ *   qed_acb_build_qiov(acb, 512) =>
+ *                      {{0x200200, 512}}
+ */
+static void qed_acb_build_qiov(QEDAIOCB *acb, size_t len)
+{
+    struct iovec *iov_end = &acb->qiov->iov[acb->qiov->niov];
+    size_t iov_offset = acb->cur_iov_offset;
+    struct iovec *iov = acb->cur_iov;
+
+    while (iov != iov_end && len > 0) {
+        size_t nbytes = MIN(iov->iov_len - iov_offset, len);
+
+        qemu_iovec_add(&acb->cur_qiov, iov->iov_base + iov_offset, nbytes);
+        iov_offset += nbytes;
+        len -= nbytes;
+
+        if (iov_offset >= iov->iov_len) {
+            iov_offset = 0;
+            iov++;
+        }
+    }
+
+    /* Stash state for next time */
+    acb->cur_iov = iov;
+    acb->cur_iov_offset = iov_offset;
+}
+
+/**
+ * Commit the current L2 table to the cache
+ */
+static void qed_commit_l2_update(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    CachedL2Table *l2_table = acb->request.l2_table;
+
+    qed_commit_l2_cache_entry(&s->l2_cache, l2_table);
+
+    /* This is guaranteed to succeed because we just committed the entry to the
+     * cache.
+     */
+    acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache,
+                                                    l2_table->offset);
+    assert(acb->request.l2_table != NULL);
+
+    qed_aio_next_io(opaque, ret);
+}
+
+/**
+ * Update L1 table with new L2 table offset and write it out
+ */
+static void qed_aio_write_l1_update(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    int index;
+
+    if (ret) {
+        qed_aio_complete(acb, ret);
+        return;
+    }
+
+    index = qed_l1_index(s, acb->cur_pos);
+    s->l1_table->offsets[index] = acb->request.l2_table->offset;
+
+    qed_write_l1_table(s, index, 1, qed_commit_l2_update, acb);
+}
+
+/**
+ * Update L2 table with new cluster offsets and write them out
+ */
+static void qed_aio_write_l2_update(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    bool need_alloc = acb->find_cluster_ret == QED_CLUSTER_L1;
+    int index;
+
+    if (ret) {
+        goto err;
+    }
+
+    if (need_alloc) {
+        qed_unref_l2_cache_entry(acb->request.l2_table);
+        acb->request.l2_table = qed_new_l2_table(s);
+    }
+
+    index = qed_l2_index(s, acb->cur_pos);
+    qed_update_l2_table(s, acb->request.l2_table->table, index, acb->cur_nclusters,
+                         acb->cur_cluster);
+
+    if (need_alloc) {
+        /* Write out the whole new L2 table */
+        qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
+                            qed_aio_write_l1_update, acb);
+    } else {
+        /* Write out only the updated part of the L2 table */
+        qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
+                            qed_aio_next_io, acb);
+    }
+    return;
+
+err:
+    qed_aio_complete(acb, ret);
+}
+
+/**
+ * Flush new data clusters before updating the L2 table
+ *
+ * This flush is necessary when a backing file is in use.  A crash during an
+ * allocating write could result in empty clusters in the image.  If the write
+ * only touched a subregion of the cluster, then backing image sectors have
+ * been lost in the untouched region.  The solution is to flush after writing a
+ * new data cluster and before updating the L2 table.
+ */
+static void qed_aio_write_flush_before_l2_update(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+
+    if (!bdrv_aio_flush(s->bs->file, qed_aio_write_l2_update, opaque)) {
+        qed_aio_complete(acb, -EIO);
+    }
+}
+
+/**
+ * Write data to the image file
+ */
+static void qed_aio_write_main(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    uint64_t offset = acb->cur_cluster;
+    BlockDriverCompletionFunc *next_fn;
+    BlockDriverAIOCB *file_acb;
+
+    trace_qed_aio_write_main(s, acb, ret, offset, acb->cur_qiov.size);
+
+    if (ret) {
+        qed_aio_complete(acb, ret);
+        return;
+    }
+
+    if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
+        next_fn = qed_aio_next_io;
+    } else {
+        if (s->bs->backing_hd) {
+            next_fn = qed_aio_write_flush_before_l2_update;
+        } else {
+            next_fn = qed_aio_write_l2_update;
+        }
+    }
+
+    offset += qed_offset_into_cluster(s, acb->cur_pos);
+    BLKDBG_EVENT(s->bs->file, BLKDBG_WRITE_AIO);
+    file_acb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE,
+                               &acb->cur_qiov,
+                               acb->cur_qiov.size / BDRV_SECTOR_SIZE,
+                               next_fn, acb);
+    if (!file_acb) {
+        qed_aio_complete(acb, -EIO);
+    }
+}
+
+/**
+ * Populate back untouched region of new data cluster
+ */
+static void qed_aio_write_postfill(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    uint64_t start = acb->cur_pos + acb->cur_qiov.size;
+    uint64_t len = qed_start_of_cluster(s, start + s->header.cluster_size - 1) - start;
+    uint64_t offset = acb->cur_cluster + qed_offset_into_cluster(s, acb->cur_pos) + acb->cur_qiov.size;
+
+    if (ret) {
+        qed_aio_complete(acb, ret);
+        return;
+    }
+
+    trace_qed_aio_write_postfill(s, acb, start, len, offset);
+    qed_copy_from_backing_file(s, start, len, offset,
+                                qed_aio_write_main, acb);
+}
+
+/**
+ * Populate front untouched region of new data cluster
+ */
+static void qed_aio_write_prefill(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    uint64_t start = qed_start_of_cluster(s, acb->cur_pos);
+    uint64_t len = qed_offset_into_cluster(s, acb->cur_pos);
+
+    trace_qed_aio_write_prefill(s, acb, start, len, acb->cur_cluster);
+    qed_copy_from_backing_file(s, start, len, acb->cur_cluster,
+                                qed_aio_write_postfill, acb);
+}
+
+/**
+ * Write new data cluster
+ *
+ * @acb:        Write request
+ * @len:        Length in bytes
+ *
+ * This path is taken when writing to previously unallocated clusters.
+ */
+static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
+{
+    BDRVQEDState *s = acb_to_s(acb);
+
+    /* If a new L2 table is being allocated, lock the L1 table.  Otherwise
+     * just lock the L2 table.
+     */
+    uint64_t lock_key = acb->find_cluster_ret == QED_CLUSTER_L1 ?
+                        s->header.l1_table_offset :
+                        acb->request.l2_table->offset;
+
+    if (!qed_lock(&s->lock, lock_key, acb)) {
+        return; /* sleep until woken up again */
+    }
+
+    acb->cur_nclusters = qed_bytes_to_clusters(s,
+            qed_offset_into_cluster(s, acb->cur_pos) + len);
+    acb->cur_cluster = qed_alloc_clusters(s, acb->cur_nclusters);
+    qed_acb_build_qiov(acb, len);
+
+    /* Write new cluster */
+    qed_aio_write_prefill(acb, 0);
+}
+
+/**
+ * Write data cluster in place
+ *
+ * @acb:        Write request
+ * @offset:     Cluster offset in bytes
+ * @len:        Length in bytes
+ *
+ * This path is taken when writing to already allocated clusters.
+ */
+static void qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, size_t len)
+{
+    BDRVQEDState *s = acb_to_s(acb);
+
+    /* Release held lock (if any) because in-place writes don't touch tables */
+    qed_unlock(&s->lock, acb);
+
+    /* Calculate the I/O vector */
+    acb->cur_cluster = offset;
+    qed_acb_build_qiov(acb, len);
+
+    /* Do the actual write */
+    qed_aio_write_main(acb, 0);
+}
+
+/**
+ * Write data cluster
+ *
+ * @opaque:     Write request
+ * @ret:        QED_CLUSTER_FOUND, QED_CLUSTER_L2, QED_CLUSTER_L1,
+ *              or -errno
+ * @offset:     Cluster offset in bytes
+ * @len:        Length in bytes
+ *
+ * Callback from qed_find_cluster().
+ */
+static void qed_aio_write_data(void *opaque, int ret,
+                               uint64_t offset, size_t len)
+{
+    QEDAIOCB *acb = opaque;
+
+    trace_qed_aio_write_data(acb_to_s(acb), acb, ret, offset, len);
+
+    acb->find_cluster_ret = ret;
+
+    switch (ret) {
+    case QED_CLUSTER_FOUND:
+        qed_aio_write_inplace(acb, offset, len);
+        break;
+
+    case QED_CLUSTER_L2:
+    case QED_CLUSTER_L1:
+        qed_aio_write_alloc(acb, len);
+        break;
+
+    default:
+        qed_aio_complete(acb, ret);
+        break;
+    }
+}
+
+/**
+ * Read data cluster
+ *
+ * @opaque:     Read request
+ * @ret:        QED_CLUSTER_FOUND, QED_CLUSTER_L2, QED_CLUSTER_L1,
+ *              or -errno
+ * @offset:     Cluster offset in bytes
+ * @len:        Length in bytes
+ *
+ * Callback from qed_find_cluster().
+ */
+static void qed_aio_read_data(void *opaque, int ret,
+                              uint64_t offset, size_t len)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    BlockDriverState *bs = acb->common.bs;
+    BlockDriverAIOCB *file_acb;
+
+    trace_qed_aio_read_data(s, acb, ret, offset, len);
+
+    if (ret < 0) {
+        goto err;
+    }
+
+    qed_acb_build_qiov(acb, len);
+
+    /* Adjust offset into cluster */
+    offset += qed_offset_into_cluster(s, acb->cur_pos);
+
+    /* Handle backing file and unallocated sparse hole reads */
+    if (ret != QED_CLUSTER_FOUND) {
+        qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
+                              qed_aio_next_io, acb);
+        return;
+    }
+
+    BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
+    file_acb = bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
+                              &acb->cur_qiov,
+                              acb->cur_qiov.size / BDRV_SECTOR_SIZE,
+                              qed_aio_next_io, acb);
+    if (!file_acb) {
+        ret = -EIO;
+        goto err;
+    }
+    return;
+
+err:
+    qed_aio_complete(acb, ret);
+}
+
+/**
+ * Begin next I/O or complete the request
+ */
+static void qed_aio_next_io(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
+    QEDFindClusterFunc *io_fn =
+        acb->is_write ? qed_aio_write_data : qed_aio_read_data;
+
+    trace_qed_aio_next_io(s, acb, ret, acb->cur_pos + acb->cur_qiov.size);
+
+    /* Handle I/O error */
+    if (ret) {
+        qed_aio_complete(acb, ret);
+        return;
+    }
+
+    acb->cur_pos += acb->cur_qiov.size;
+    qemu_iovec_reset(&acb->cur_qiov);
+
+    /* Complete request */
+    if (acb->cur_pos >= acb->end_pos) {
+        qed_aio_complete(acb, 0);
+        return;
+    }
+
+    /* Find next cluster and start I/O */
+    qed_find_cluster(s, &acb->request,
+                      acb->cur_pos, acb->end_pos - acb->cur_pos,
+                      io_fn, acb);
+}
+
+static BlockDriverAIOCB *qed_aio_setup(BlockDriverState *bs,
+                                       int64_t sector_num,
+                                       QEMUIOVector *qiov, int nb_sectors,
+                                       BlockDriverCompletionFunc *cb,
+                                       void *opaque, bool is_write)
+{
+    QEDAIOCB *acb = qemu_aio_get(&qed_aio_pool, bs, cb, opaque);
+
+    trace_qed_aio_setup(bs->opaque, acb, sector_num, nb_sectors,
+                         opaque, is_write);
+
+    acb->is_write = is_write;
+    acb->finished = NULL;
+    acb->qiov = qiov;
+    acb->cur_iov = acb->qiov->iov;
+    acb->cur_iov_offset = 0;
+    acb->cur_pos = (uint64_t)sector_num * BDRV_SECTOR_SIZE;
+    acb->end_pos = acb->cur_pos + nb_sectors * BDRV_SECTOR_SIZE;
+    acb->request.l2_table = NULL;
+    acb->lock_entry = NULL;
+    qemu_iovec_init(&acb->cur_qiov, qiov->niov);
+
+    /* Start request */
+    qed_aio_next_io(acb, 0);
+    return &acb->common;
+}
+
 static BlockDriverAIOCB *bdrv_qed_aio_readv(BlockDriverState *bs,
                                             int64_t sector_num,
                                             QEMUIOVector *qiov, int nb_sectors,
                                             BlockDriverCompletionFunc *cb,
                                             void *opaque)
 {
-    return NULL;
+    return qed_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, false);
 }
 
 static BlockDriverAIOCB *bdrv_qed_aio_writev(BlockDriverState *bs,
@@ -446,7 +1109,7 @@  static BlockDriverAIOCB *bdrv_qed_aio_writev(BlockDriverState *bs,
                                              BlockDriverCompletionFunc *cb,
                                              void *opaque)
 {
-    return NULL;
+    return qed_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque, true);
 }
 
 static BlockDriverAIOCB *bdrv_qed_aio_flush(BlockDriverState *bs,
diff --git a/block/qed.h b/block/qed.h
index 6d49a4d..01c7b2e 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -116,6 +116,41 @@  typedef struct QEDRequest {
     CachedL2Table *l2_table;
 } QEDRequest;
 
+typedef struct QEDLockEntry QEDLockEntry;
+
+typedef struct QEDAIOCB {
+    BlockDriverAIOCB common;
+    QEMUBH *bh;
+    int bh_ret;                     /* final return status for completion bh */
+    QSIMPLEQ_ENTRY(QEDAIOCB) next;  /* next request */
+    QEDLockEntry *lock_entry;       /* held lock */
+    bool is_write;                  /* false - read, true - write */
+    bool *finished;                 /* signal for cancel completion */
+    uint64_t end_pos;               /* request end on block device, in bytes */
+
+    /* User scatter-gather list */
+    QEMUIOVector *qiov;
+    struct iovec *cur_iov;          /* current iovec to process */
+    size_t cur_iov_offset;          /* byte count already processed in iovec */
+
+    /* Current cluster scatter-gather list */
+    QEMUIOVector cur_qiov;
+    uint64_t cur_pos;               /* position on block device, in bytes */
+    uint64_t cur_cluster;           /* cluster offset in image file */
+    unsigned int cur_nclusters;     /* number of clusters being accessed */
+    int find_cluster_ret;           /* used for L1/L2 update */
+
+    QEDRequest request;
+} QEDAIOCB;
+
+/**
+ * Lock used to serialize requests touching the same table
+ */
+typedef struct {
+    QTAILQ_HEAD(, QEDLockEntry) entries;
+    BlockDriverCompletionFunc *wakeup_fn;
+} QEDLock;
+
 typedef struct {
     BlockDriverState *bs;           /* device */
     uint64_t file_size;             /* length of image file, in bytes */
@@ -123,6 +158,7 @@  typedef struct {
     QEDHeader header;               /* always cpu-endian */
     QEDTable *l1_table;
     L2TableCache l2_cache;          /* l2 table cache */
+    QEDLock lock;                   /* table lock */
     uint32_t table_nelems;
     uint32_t l1_shift;
     uint32_t l2_shift;
@@ -178,6 +214,13 @@  CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset);
 void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table);
 
 /**
+ * Lock functions
+ */
+void qed_lock_init(QEDLock *lock, BlockDriverCompletionFunc *wakeup_fn);
+bool qed_lock(QEDLock *lock, uint64_t key, QEDAIOCB *acb);
+void qed_unlock(QEDLock *lock, QEDAIOCB *acb);
+
+/**
  * Table I/O functions
  */
 int qed_read_l1_table_sync(BDRVQEDState *s);
diff --git a/trace-events b/trace-events
index 769c198..8b8c1ae 100644
--- a/trace-events
+++ b/trace-events
@@ -203,3 +203,13 @@  disable qed_read_table(void *s, uint64_t offset, void *table) "s %p offset %"PRI
 disable qed_read_table_cb(void *s, void *table, int ret) "s %p table %p ret %d"
 disable qed_write_table(void *s, uint64_t offset, void *table, unsigned int index, unsigned int n) "s %p offset %"PRIu64" table %p index %u n %u"
 disable qed_write_table_cb(void *s, void *table, int ret) "s %p table %p ret %d"
+
+# block/qed.c
+disable qed_aio_complete(void *s, void *acb, int ret) "s %p acb %p ret %d"
+disable qed_aio_setup(void *s, void *acb, int64_t sector_num, int nb_sectors, void *opaque, int is_write) "s %p acb %p sector_num %"PRId64" nb_sectors %d opaque %p is_write %d"
+disable qed_aio_next_io(void *s, void *acb, int ret, uint64_t cur_pos) "s %p acb %p ret %d cur_pos %"PRIu64""
+disable qed_aio_read_data(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu"
+disable qed_aio_write_data(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu"
+disable qed_aio_write_prefill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64""
+disable qed_aio_write_postfill(void *s, void *acb, uint64_t start, size_t len, uint64_t offset) "s %p acb %p start %"PRIu64" len %zu offset %"PRIu64""
+disable qed_aio_write_main(void *s, void *acb, int ret, uint64_t offset, size_t len) "s %p acb %p ret %d offset %"PRIu64" len %zu"