Patchwork [v2,5/7] qed: Table, L2 cache, and cluster functions

login
register
mail settings
Submitter Stefan Hajnoczi
Date Oct. 8, 2010, 3:48 p.m.
Message ID <1286552914-27014-6-git-send-email-stefanha@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/67278/
State New
Headers show

Comments

Stefan Hajnoczi - Oct. 8, 2010, 3:48 p.m.
This patch adds code to look up data cluster offsets in the image via
the L1/L2 tables.  The L2 tables are writethrough cached in memory for
performance (each read/write requires a lookup so it is essential to
cache the tables).

With cluster lookup code in place it is possible to implement
bdrv_is_allocated() to query the number of contiguous
allocated/unallocated clusters.

Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 Makefile.objs        |    2 +-
 block/qed-cluster.c  |  145 +++++++++++++++++++++++
 block/qed-gencb.c    |   32 +++++
 block/qed-l2-cache.c |  132 +++++++++++++++++++++
 block/qed-table.c    |  316 ++++++++++++++++++++++++++++++++++++++++++++++++++
 block/qed.c          |   57 +++++++++-
 block/qed.h          |  108 +++++++++++++++++
 trace-events         |    6 +
 8 files changed, 796 insertions(+), 2 deletions(-)
 create mode 100644 block/qed-cluster.c
 create mode 100644 block/qed-gencb.c
 create mode 100644 block/qed-l2-cache.c
 create mode 100644 block/qed-table.c
Kevin Wolf - Oct. 12, 2010, 2:44 p.m.
Am 08.10.2010 17:48, schrieb Stefan Hajnoczi:
> This patch adds code to look up data cluster offsets in the image via
> the L1/L2 tables.  The L2 tables are writethrough cached in memory for
> performance (each read/write requires a lookup so it is essential to
> cache the tables).
> 
> With cluster lookup code in place it is possible to implement
> bdrv_is_allocated() to query the number of contiguous
> allocated/unallocated clusters.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
> ---
>  Makefile.objs        |    2 +-
>  block/qed-cluster.c  |  145 +++++++++++++++++++++++
>  block/qed-gencb.c    |   32 +++++
>  block/qed-l2-cache.c |  132 +++++++++++++++++++++
>  block/qed-table.c    |  316 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  block/qed.c          |   57 +++++++++-
>  block/qed.h          |  108 +++++++++++++++++
>  trace-events         |    6 +
>  8 files changed, 796 insertions(+), 2 deletions(-)
>  create mode 100644 block/qed-cluster.c
>  create mode 100644 block/qed-gencb.c
>  create mode 100644 block/qed-l2-cache.c
>  create mode 100644 block/qed-table.c
> 
> diff --git a/Makefile.objs b/Makefile.objs
> index ff15795..7b3b19c 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -14,7 +14,7 @@ block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>  
>  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
> -block-nested-y += qed.o
> +block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
>  block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
> diff --git a/block/qed-cluster.c b/block/qed-cluster.c
> new file mode 100644
> index 0000000..af65e5a
> --- /dev/null
> +++ b/block/qed-cluster.c
> @@ -0,0 +1,145 @@
> +/*
> + * QEMU Enhanced Disk Format Cluster functions
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> + *  Anthony Liguori   <aliguori@us.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING file in the top-level directory.

Hm, just noticed it here: COPYING is the text of the GPL, not LGPL. The
same comment applies to all other QED files, too.

> + *
> + */
> +
> +#include "qed.h"
> +
> +/**
> + * Count the number of contiguous data clusters
> + *
> + * @s:              QED state
> + * @table:          L2 table
> + * @index:          First cluster index
> + * @n:              Maximum number of clusters
> + * @offset:         Set to first cluster offset
> + *
> + * This function scans tables for contiguous allocated or free clusters.
> + */
> +static unsigned int qed_count_contiguous_clusters(BDRVQEDState *s,
> +                                                  QEDTable *table,
> +                                                  unsigned int index,
> +                                                  unsigned int n,
> +                                                  uint64_t *offset)
> +{
> +    unsigned int end = MIN(index + n, s->table_nelems);
> +    uint64_t last = table->offsets[index];
> +    unsigned int i;
> +
> +    *offset = last;
> +
> +    for (i = index + 1; i < end; i++) {
> +        if (last == 0) {
> +            /* Counting free clusters */
> +            if (table->offsets[i] != 0) {
> +                break;
> +            }
> +        } else {
> +            /* Counting allocated clusters */
> +            if (table->offsets[i] != last + s->header.cluster_size) {
> +                break;
> +            }
> +            last = table->offsets[i];
> +        }
> +    }
> +    return i - index;
> +}
> +
> +typedef struct {
> +    BDRVQEDState *s;
> +    uint64_t pos;
> +    size_t len;
> +
> +    QEDRequest *request;
> +
> +    /* User callback */
> +    QEDFindClusterFunc *cb;
> +    void *opaque;
> +} QEDFindClusterCB;
> +
> +static void qed_find_cluster_cb(void *opaque, int ret)
> +{
> +    QEDFindClusterCB *find_cluster_cb = opaque;
> +    BDRVQEDState *s = find_cluster_cb->s;
> +    QEDRequest *request = find_cluster_cb->request;
> +    uint64_t offset = 0;
> +    size_t len = 0;
> +    unsigned int index;
> +    unsigned int n;
> +
> +    if (ret) {
> +        ret = QED_CLUSTER_ERROR;

Can ret be anything else here? If so, why would we return a more generic
error value instead of passing down the original one?

[Okay, after having read more code, this is the place where we throw
errno away. We shouldn't do that.]

I also wonder, if reading from the disk failed, is the errno value lost?

> +        goto out;
> +    }
> +
> +    index = qed_l2_index(s, find_cluster_cb->pos);
> +    n = qed_bytes_to_clusters(s,
> +                              qed_offset_into_cluster(s, find_cluster_cb->pos) +
> +                              find_cluster_cb->len);
> +    n = qed_count_contiguous_clusters(s, request->l2_table->table,
> +                                      index, n, &offset);
> +
> +    ret = offset ? QED_CLUSTER_FOUND : QED_CLUSTER_L2;
> +    len = MIN(find_cluster_cb->len, n * s->header.cluster_size -
> +              qed_offset_into_cluster(s, find_cluster_cb->pos));
> +
> +    if (offset && !qed_check_cluster_offset(s, offset)) {
> +        ret = QED_CLUSTER_ERROR;
> +        goto out;
> +    }
> +
> +out:
> +    find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
> +    qemu_free(find_cluster_cb);
> +}
> +
> +/**
> + * Find the offset of a data cluster
> + *
> + * @s:          QED state
> + * @pos:        Byte position in device
> + * @len:        Number of bytes
> + * @cb:         Completion function
> + * @opaque:     User data for completion function
> + */

If we add header comments (which I think we should), we shouldn't do
them only pro forma, but try to make them actually useful, i.e. describe
all inputs and outputs.

I'm reading this code for the first time and all these callbacks are
really confusing. What I know is that in all the state that I pass (s
and request) _something_ changes and in the cb is called with _some_
parameters of which I don't know what they mean.

So a good first step would adding a description of the arguments to cb.
At least in qed_read_l2_table, which actually does directly change the
state, we should additionally state that it returns the new L2 table in
request->l2_table. Things like this are not obvious if you didn't write
the code.

> +void qed_find_cluster(BDRVQEDState *s, QEDRequest *request, uint64_t pos,
> +                      size_t len, QEDFindClusterFunc *cb, void *opaque)
> +{
> +    QEDFindClusterCB *find_cluster_cb;
> +    uint64_t l2_offset;
> +
> +    /* Limit length to L2 boundary.  Requests are broken up at the L2 boundary
> +     * so that a request acts on one L2 table at a time.
> +     */
> +    len = MIN(len, (((pos >> s->l1_shift) + 1) << s->l1_shift) - pos);
> +
> +    l2_offset = s->l1_table->offsets[qed_l1_index(s, pos)];
> +    if (!l2_offset) {
> +        cb(opaque, QED_CLUSTER_L1, 0, len);
> +        return;
> +    }
> +    if (!qed_check_table_offset(s, l2_offset)) {
> +        cb(opaque, QED_CLUSTER_ERROR, 0, 0);
> +        return;
> +    }
> +
> +    find_cluster_cb = qemu_malloc(sizeof(*find_cluster_cb));
> +    find_cluster_cb->s = s;
> +    find_cluster_cb->pos = pos;
> +    find_cluster_cb->len = len;
> +    find_cluster_cb->cb = cb;
> +    find_cluster_cb->opaque = opaque;
> +    find_cluster_cb->request = request;
> +
> +    qed_read_l2_table(s, request, l2_offset,
> +                      qed_find_cluster_cb, find_cluster_cb);
> +}
> diff --git a/block/qed-gencb.c b/block/qed-gencb.c
> new file mode 100644
> index 0000000..d389e12
> --- /dev/null
> +++ b/block/qed-gencb.c
> @@ -0,0 +1,32 @@
> +/*
> + * QEMU Enhanced Disk Format
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qed.h"
> +
> +void *gencb_alloc(size_t len, BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    GenericCB *gencb = qemu_malloc(len);
> +    gencb->cb = cb;
> +    gencb->opaque = opaque;
> +    return gencb;
> +}
> +
> +void gencb_complete(void *opaque, int ret)
> +{
> +    GenericCB *gencb = opaque;
> +    BlockDriverCompletionFunc *cb = gencb->cb;
> +    void *user_opaque = gencb->opaque;
> +
> +    qemu_free(gencb);
> +    cb(user_opaque, ret);
> +}
> diff --git a/block/qed-l2-cache.c b/block/qed-l2-cache.c
> new file mode 100644
> index 0000000..3b2bf6e
> --- /dev/null
> +++ b/block/qed-l2-cache.c
> @@ -0,0 +1,132 @@
> +/*
> + * QEMU Enhanced Disk Format L2 Cache
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori   <aliguori@us.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "qed.h"
> +
> +/* Each L2 holds 2GB so this let's us fully cache a 100GB disk */
> +#define MAX_L2_CACHE_SIZE 50
> +
> +/**
> + * Initialize the L2 cache
> + */
> +void qed_init_l2_cache(L2TableCache *l2_cache,
> +                       L2TableAllocFunc *alloc_l2_table,

What is this function pointer meant for? So far I can only see one call
to qed_init_l2_cache(), so I guess this indirection is just in
preparation for some future extension? Maybe add a comment?

> +                       void *alloc_l2_table_opaque)
> +{
> +    QTAILQ_INIT(&l2_cache->entries);
> +    l2_cache->n_entries = 0;
> +    l2_cache->alloc_l2_table = alloc_l2_table;
> +    l2_cache->alloc_l2_table_opaque = alloc_l2_table_opaque;
> +}
> +
> +/**
> + * Free the L2 cache
> + */
> +void qed_free_l2_cache(L2TableCache *l2_cache)
> +{
> +    CachedL2Table *entry, *next_entry;
> +
> +    QTAILQ_FOREACH_SAFE(entry, &l2_cache->entries, node, next_entry) {
> +        qemu_vfree(entry->table);
> +        qemu_free(entry);
> +    }
> +}
> +
> +/**
> + * Allocate an uninitialized entry from the cache
> + *
> + * The returned entry has a reference count of 1 and is owned by the caller.
> + */
> +CachedL2Table *qed_alloc_l2_cache_entry(L2TableCache *l2_cache)
> +{
> +    CachedL2Table *entry;
> +
> +    entry = qemu_mallocz(sizeof(*entry));
> +    entry->table = l2_cache->alloc_l2_table(l2_cache->alloc_l2_table_opaque);
> +    entry->ref++;
> +
> +    return entry;
> +}

Hm, what references are counted by ref? Do you have more than one L2
cache and an entry can be referenced by multiple of them?

> +
> +/**
> + * Decrease an entry's reference count and free if necessary when the reference
> + * count drops to zero.
> + */
> +void qed_unref_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *entry)
> +{
> +    if (!entry) {
> +        return;
> +    }
> +
> +    entry->ref--;
> +    if (entry->ref == 0) {
> +        qemu_vfree(entry->table);
> +        qemu_free(entry);
> +    }
> +}

The l2_cache arguments looks unused. Do we need it?

> +
> +/**
> + * Find an entry in the L2 cache.  This may return NULL and it's up to the
> + * caller to satisfy the cache miss.
> + *
> + * For a cached entry, this function increases the reference count and returns
> + * the entry.
> + */
> +CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset)
> +{
> +    CachedL2Table *entry;
> +
> +    QTAILQ_FOREACH(entry, &l2_cache->entries, node) {
> +        if (entry->offset == offset) {
> +            entry->ref++;
> +            return entry;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/**
> + * Commit an L2 cache entry into the cache.  This is meant to be used as part of
> + * the process to satisfy a cache miss.  A caller would allocate an entry which
> + * is not actually in the L2 cache and then once the entry was valid and
> + * present on disk, the entry can be committed into the cache.
> + *
> + * Since the cache is write-through, it's important that this function is not
> + * called until the entry is present on disk and the L1 has been updated to
> + * point to the entry.
> + *
> + * N.B. This function steals a reference to the l2_table from the caller so the
> + * caller must obtain a new reference by issuing a call to
> + * qed_find_l2_cache_entry().
> + */
> +void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table)
> +{
> +    CachedL2Table *entry;
> +
> +    entry = qed_find_l2_cache_entry(l2_cache, l2_table->offset);
> +    if (entry) {
> +        qed_unref_l2_cache_entry(l2_cache, entry);

Maybe the qed_find_l2_cache_entry semantics isn't really the right one
if we need to decrease the refcount here just because that function just
increased it and we don't actually want that?

> +        qed_unref_l2_cache_entry(l2_cache, l2_table);
> +        return;
> +    }
> +
> +    if (l2_cache->n_entries >= MAX_L2_CACHE_SIZE) {
> +        entry = QTAILQ_FIRST(&l2_cache->entries);
> +        QTAILQ_REMOVE(&l2_cache->entries, entry, node);
> +        l2_cache->n_entries--;
> +        qed_unref_l2_cache_entry(l2_cache, entry);
> +    }
> +
> +    l2_cache->n_entries++;
> +    QTAILQ_INSERT_TAIL(&l2_cache->entries, l2_table, node);
> +}

Okay, so the table has the right refcount because we steal a refcount
from the caller, and if you don't reuse this, we explicitly unref it. Am
I the only one to find this interface confusing?

> diff --git a/block/qed-table.c b/block/qed-table.c
> new file mode 100644
> index 0000000..ba6faf0
> --- /dev/null
> +++ b/block/qed-table.c
> @@ -0,0 +1,316 @@
> +/*
> + * QEMU Enhanced Disk Format Table I/O
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> + *  Anthony Liguori   <aliguori@us.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include "trace.h"
> +#include "qemu_socket.h" /* for EINPROGRESS on Windows */
> +#include "qed.h"
> +
> +typedef struct {
> +    GenericCB gencb;
> +    BDRVQEDState *s;
> +    QEDTable *table;
> +
> +    struct iovec iov;
> +    QEMUIOVector qiov;
> +} QEDReadTableCB;
> +
> +static void qed_read_table_cb(void *opaque, int ret)
> +{
> +    QEDReadTableCB *read_table_cb = opaque;
> +    QEDTable *table = read_table_cb->table;
> +    int noffsets = read_table_cb->iov.iov_len / sizeof(uint64_t);
> +    int i;
> +
> +    /* Handle I/O error */
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    /* Byteswap offsets */
> +    for (i = 0; i < noffsets; i++) {
> +        table->offsets[i] = le64_to_cpu(table->offsets[i]);
> +    }
> +
> +out:
> +    /* Completion */
> +    trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
> +    gencb_complete(&read_table_cb->gencb, ret);
> +}
> +
> +static void qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
> +                           BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    QEDReadTableCB *read_table_cb = gencb_alloc(sizeof(*read_table_cb),
> +                                                cb, opaque);
> +    QEMUIOVector *qiov = &read_table_cb->qiov;
> +    BlockDriverAIOCB *aiocb;
> +
> +    trace_qed_read_table(s, offset, table);
> +
> +    read_table_cb->s = s;
> +    read_table_cb->table = table;
> +    read_table_cb->iov.iov_base = table->offsets,
> +    read_table_cb->iov.iov_len = s->header.cluster_size * s->header.table_size,
> +
> +    qemu_iovec_init_external(qiov, &read_table_cb->iov, 1);
> +    aiocb = bdrv_aio_readv(s->bs->file, offset / BDRV_SECTOR_SIZE, qiov,
> +                           read_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
> +                           qed_read_table_cb, read_table_cb);
> +    if (!aiocb) {
> +        qed_read_table_cb(read_table_cb, -EIO);
> +    }
> +}
> +
> +typedef struct {
> +    GenericCB gencb;
> +    BDRVQEDState *s;
> +    QEDTable *orig_table;
> +    QEDTable *table;
> +    bool flush;             /* flush after write? */
> +
> +    struct iovec iov;
> +    QEMUIOVector qiov;
> +} QEDWriteTableCB;
> +
> +static void qed_write_table_cb(void *opaque, int ret)
> +{
> +    QEDWriteTableCB *write_table_cb = opaque;
> +
> +    trace_qed_write_table_cb(write_table_cb->s,
> +                              write_table_cb->orig_table, ret);
> +
> +    if (ret) {
> +        goto out;
> +    }
> +
> +    if (write_table_cb->flush) {
> +        /* We still need to flush first */
> +        write_table_cb->flush = false;
> +        bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
> +                       write_table_cb);
> +        return;
> +    }
> +
> +out:
> +    qemu_vfree(write_table_cb->table);
> +    gencb_complete(&write_table_cb->gencb, ret);
> +    return;
> +}
> +
> +/**
> + * Write out an updated part or all of a table
> + *
> + * @s:          QED state
> + * @offset:     Offset of table in image file, in bytes
> + * @table:      Table
> + * @index:      Index of first element
> + * @n:          Number of elements
> + * @flush:      Whether or not to sync to disk
> + * @cb:         Completion function
> + * @opaque:     Argument for completion function
> + */
> +static void qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
> +                            unsigned int index, unsigned int n, bool flush,
> +                            BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    QEDWriteTableCB *write_table_cb;
> +    BlockDriverAIOCB *aiocb;
> +    unsigned int sector_mask = BDRV_SECTOR_SIZE / sizeof(uint64_t) - 1;
> +    unsigned int start, end, i;
> +    size_t len_bytes;
> +
> +    trace_qed_write_table(s, offset, table, index, n);
> +
> +    /* Calculate indices of the first and one after last elements */
> +    start = index & ~sector_mask;
> +    end = (index + n + sector_mask) & ~sector_mask;
> +
> +    len_bytes = (end - start) * sizeof(uint64_t);
> +
> +    write_table_cb = gencb_alloc(sizeof(*write_table_cb), cb, opaque);
> +    write_table_cb->s = s;
> +    write_table_cb->orig_table = table;
> +    write_table_cb->flush = flush;
> +    write_table_cb->table = qemu_blockalign(s->bs, len_bytes);
> +    write_table_cb->iov.iov_base = write_table_cb->table->offsets;
> +    write_table_cb->iov.iov_len = len_bytes;
> +    qemu_iovec_init_external(&write_table_cb->qiov, &write_table_cb->iov, 1);
> +
> +    /* Byteswap table */
> +    for (i = start; i < end; i++) {
> +        uint64_t le_offset = cpu_to_le64(table->offsets[i]);
> +        write_table_cb->table->offsets[i - start] = le_offset;
> +    }
> +
> +    /* Adjust for offset into table */
> +    offset += start * sizeof(uint64_t);
> +
> +    aiocb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE,
> +                            &write_table_cb->qiov,
> +                            write_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
> +                            qed_write_table_cb, write_table_cb);
> +    if (!aiocb) {
> +        qed_write_table_cb(write_table_cb, -EIO);
> +    }
> +}
> +
> +/**
> + * Propagate return value from async callback
> + */
> +static void qed_sync_cb(void *opaque, int ret)
> +{
> +    *(int *)opaque = ret;
> +}
> +
> +int qed_read_l1_table_sync(BDRVQEDState *s)
> +{
> +    int ret = -EINPROGRESS;
> +
> +    async_context_push();
> +
> +    qed_read_table(s, s->header.l1_table_offset,
> +                   s->l1_table, qed_sync_cb, &ret);
> +    while (ret == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +
> +    async_context_pop();
> +
> +    return ret;
> +}
> +
> +void qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n,
> +                        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    BLKDBG_EVENT(s->bs->file, BLKDBG_L1_UPDATE);
> +    qed_write_table(s, s->header.l1_table_offset,
> +                    s->l1_table, index, n, false, cb, opaque);
> +}
> +
> +int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
> +                            unsigned int n)
> +{
> +    int ret = -EINPROGRESS;
> +
> +    async_context_push();
> +
> +    qed_write_l1_table(s, index, n, qed_sync_cb, &ret);
> +    while (ret == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +
> +    async_context_pop();
> +
> +    return ret;
> +}
> +
> +typedef struct {
> +    GenericCB gencb;
> +    BDRVQEDState *s;
> +    uint64_t l2_offset;
> +    QEDRequest *request;
> +} QEDReadL2TableCB;
> +
> +static void qed_read_l2_table_cb(void *opaque, int ret)
> +{
> +    QEDReadL2TableCB *read_l2_table_cb = opaque;
> +    QEDRequest *request = read_l2_table_cb->request;
> +    BDRVQEDState *s = read_l2_table_cb->s;
> +    CachedL2Table *l2_table = request->l2_table;
> +
> +    if (ret) {
> +        /* can't trust loaded L2 table anymore */
> +        qed_unref_l2_cache_entry(&s->l2_cache, l2_table);
> +        request->l2_table = NULL;

Is decreasing the refcount by one and clearing request->l2_table enough?
Didn't we destroy it for all references? Unless, of course, there is at
most one reference, but then the refcount is useless.

Hm, or do we just increase the refcount before the cache entry is
actually used, and we shouldn't do that? Not sure I understand the
purpose of this refcount thing yet.

> +    } else {
> +        l2_table->offset = read_l2_table_cb->l2_offset;
> +
> +        qed_commit_l2_cache_entry(&s->l2_cache, l2_table);
> +
> +        /* This is guaranteed to succeed because we just committed the entry
> +         * to the cache.
> +         */
> +        request->l2_table = qed_find_l2_cache_entry(&s->l2_cache,
> +                                                    l2_table->offset);
> +        assert(request->l2_table != NULL);
> +    }
> +
> +    gencb_complete(&read_l2_table_cb->gencb, ret);
> +}
> +
> +void qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset,
> +                       BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    QEDReadL2TableCB *read_l2_table_cb;
> +
> +    qed_unref_l2_cache_entry(&s->l2_cache, request->l2_table);
> +
> +    /* Check for cached L2 entry */
> +    request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, offset);
> +    if (request->l2_table) {
> +        cb(opaque, 0);
> +        return;
> +    }
> +
> +    request->l2_table = qed_alloc_l2_cache_entry(&s->l2_cache);
> +
> +    read_l2_table_cb = gencb_alloc(sizeof(*read_l2_table_cb), cb, opaque);
> +    read_l2_table_cb->s = s;
> +    read_l2_table_cb->l2_offset = offset;
> +    read_l2_table_cb->request = request;
> +
> +    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_LOAD);
> +    qed_read_table(s, offset, request->l2_table->table,
> +                   qed_read_l2_table_cb, read_l2_table_cb);
> +}
> +
> +int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset)
> +{
> +    int ret = -EINPROGRESS;
> +
> +    async_context_push();
> +
> +    qed_read_l2_table(s, request, offset, qed_sync_cb, &ret);
> +    while (ret == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +
> +    async_context_pop();
> +    return ret;
> +}
> +
> +void qed_write_l2_table(BDRVQEDState *s, QEDRequest *request,
> +                        unsigned int index, unsigned int n, bool flush,
> +                        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_UPDATE);
> +    qed_write_table(s, request->l2_table->offset,
> +                    request->l2_table->table, index, n, flush, cb, opaque);
> +}
> +
> +int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
> +                            unsigned int index, unsigned int n, bool flush)
> +{
> +    int ret = -EINPROGRESS;
> +
> +    async_context_push();
> +
> +    qed_write_l2_table(s, request, index, n, flush, qed_sync_cb, &ret);
> +    while (ret == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +
> +    async_context_pop();
> +    return ret;
> +}
> diff --git a/block/qed.c b/block/qed.c
> index ea03798..6d7f4d7 100644
> --- a/block/qed.c
> +++ b/block/qed.c
> @@ -139,6 +139,15 @@ static int qed_read_string(BlockDriverState *file, uint64_t offset, size_t n,
>      return 0;
>  }
>  
> +static QEDTable *qed_alloc_table(void *opaque)
> +{
> +    BDRVQEDState *s = opaque;
> +
> +    /* Honor O_DIRECT memory alignment requirements */
> +    return qemu_blockalign(s->bs,
> +                           s->header.cluster_size * s->header.table_size);
> +}
> +
>  static int bdrv_qed_open(BlockDriverState *bs, int flags)
>  {
>      BDRVQEDState *s = bs->opaque;
> @@ -207,11 +216,24 @@ static int bdrv_qed_open(BlockDriverState *bs, int flags)
>              }
>          }
>      }
> +
> +    s->l1_table = qed_alloc_table(s);
> +    qed_init_l2_cache(&s->l2_cache, qed_alloc_table, s);
> +
> +    ret = qed_read_l1_table_sync(s);
> +    if (ret) {
> +        qed_free_l2_cache(&s->l2_cache);

Why not initializing the L2 cache only if the read succeeded?

Kevin
Stefan Hajnoczi - Oct. 13, 2010, 1:41 p.m.
On Tue, Oct 12, 2010 at 04:44:34PM +0200, Kevin Wolf wrote:
> Am 08.10.2010 17:48, schrieb Stefan Hajnoczi:
> > diff --git a/block/qed-cluster.c b/block/qed-cluster.c
> > new file mode 100644
> > index 0000000..af65e5a
> > --- /dev/null
> > +++ b/block/qed-cluster.c
> > @@ -0,0 +1,145 @@
> > +/*
> > + * QEMU Enhanced Disk Format Cluster functions
> > + *
> > + * Copyright IBM, Corp. 2010
> > + *
> > + * Authors:
> > + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> > + *  Anthony Liguori   <aliguori@us.ibm.com>
> > + *
> > + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> 
> Hm, just noticed it here: COPYING is the text of the GPL, not LGPL. The
> same comment applies to all other QED files, too.

It should be COPYING.LIB, thanks for pointing this out.

> 
> > + *
> > + */
> > +
> > +#include "qed.h"
> > +
> > +/**
> > + * Count the number of contiguous data clusters
> > + *
> > + * @s:              QED state
> > + * @table:          L2 table
> > + * @index:          First cluster index
> > + * @n:              Maximum number of clusters
> > + * @offset:         Set to first cluster offset
> > + *
> > + * This function scans tables for contiguous allocated or free clusters.
> > + */
> > +static unsigned int qed_count_contiguous_clusters(BDRVQEDState *s,
> > +                                                  QEDTable *table,
> > +                                                  unsigned int index,
> > +                                                  unsigned int n,
> > +                                                  uint64_t *offset)
> > +{
> > +    unsigned int end = MIN(index + n, s->table_nelems);
> > +    uint64_t last = table->offsets[index];
> > +    unsigned int i;
> > +
> > +    *offset = last;
> > +
> > +    for (i = index + 1; i < end; i++) {
> > +        if (last == 0) {
> > +            /* Counting free clusters */
> > +            if (table->offsets[i] != 0) {
> > +                break;
> > +            }
> > +        } else {
> > +            /* Counting allocated clusters */
> > +            if (table->offsets[i] != last + s->header.cluster_size) {
> > +                break;
> > +            }
> > +            last = table->offsets[i];
> > +        }
> > +    }
> > +    return i - index;
> > +}
> > +
> > +typedef struct {
> > +    BDRVQEDState *s;
> > +    uint64_t pos;
> > +    size_t len;
> > +
> > +    QEDRequest *request;
> > +
> > +    /* User callback */
> > +    QEDFindClusterFunc *cb;
> > +    void *opaque;
> > +} QEDFindClusterCB;
> > +
> > +static void qed_find_cluster_cb(void *opaque, int ret)
> > +{
> > +    QEDFindClusterCB *find_cluster_cb = opaque;
> > +    BDRVQEDState *s = find_cluster_cb->s;
> > +    QEDRequest *request = find_cluster_cb->request;
> > +    uint64_t offset = 0;
> > +    size_t len = 0;
> > +    unsigned int index;
> > +    unsigned int n;
> > +
> > +    if (ret) {
> > +        ret = QED_CLUSTER_ERROR;
> 
> Can ret be anything else here? If so, why would we return a more generic
> error value instead of passing down the original one?
> 
> [Okay, after having read more code, this is the place where we throw
> errno away. We shouldn't do that.]
> 
> I also wonder, if reading from the disk failed, is the errno value lost?
> 
> > +        goto out;
> > +    }
> > +
> > +    index = qed_l2_index(s, find_cluster_cb->pos);
> > +    n = qed_bytes_to_clusters(s,
> > +                              qed_offset_into_cluster(s, find_cluster_cb->pos) +
> > +                              find_cluster_cb->len);
> > +    n = qed_count_contiguous_clusters(s, request->l2_table->table,
> > +                                      index, n, &offset);
> > +
> > +    ret = offset ? QED_CLUSTER_FOUND : QED_CLUSTER_L2;
> > +    len = MIN(find_cluster_cb->len, n * s->header.cluster_size -
> > +              qed_offset_into_cluster(s, find_cluster_cb->pos));
> > +
> > +    if (offset && !qed_check_cluster_offset(s, offset)) {
> > +        ret = QED_CLUSTER_ERROR;
> > +        goto out;
> > +    }
> > +
> > +out:
> > +    find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
> > +    qemu_free(find_cluster_cb);
> > +}
> > +
> > +/**
> > + * Find the offset of a data cluster
> > + *
> > + * @s:          QED state
> > + * @pos:        Byte position in device
> > + * @len:        Number of bytes
> > + * @cb:         Completion function
> > + * @opaque:     User data for completion function
> > + */
> 
> If we add header comments (which I think we should), we shouldn't do
> them only pro forma, but try to make them actually useful, i.e. describe
> all inputs and outputs.
> 
> I'm reading this code for the first time and all these callbacks are
> really confusing. What I know is that in all the state that I pass (s
> and request) _something_ changes and in the cb is called with _some_
> parameters of which I don't know what they mean.
> 
> So a good first step would adding a description of the arguments to cb.
> At least in qed_read_l2_table, which actually does directly change the
> state, we should additionally state that it returns the new L2 table in
> request->l2_table. Things like this are not obvious if you didn't write
> the code.

Right, I will improve the doc comments for v3.

> 
> > +void qed_find_cluster(BDRVQEDState *s, QEDRequest *request, uint64_t pos,
> > +                      size_t len, QEDFindClusterFunc *cb, void *opaque)
> > +{
> > +    QEDFindClusterCB *find_cluster_cb;
> > +    uint64_t l2_offset;
> > +
> > +    /* Limit length to L2 boundary.  Requests are broken up at the L2 boundary
> > +     * so that a request acts on one L2 table at a time.
> > +     */
> > +    len = MIN(len, (((pos >> s->l1_shift) + 1) << s->l1_shift) - pos);
> > +
> > +    l2_offset = s->l1_table->offsets[qed_l1_index(s, pos)];
> > +    if (!l2_offset) {
> > +        cb(opaque, QED_CLUSTER_L1, 0, len);
> > +        return;
> > +    }
> > +    if (!qed_check_table_offset(s, l2_offset)) {
> > +        cb(opaque, QED_CLUSTER_ERROR, 0, 0);
> > +        return;
> > +    }
> > +
> > +    find_cluster_cb = qemu_malloc(sizeof(*find_cluster_cb));
> > +    find_cluster_cb->s = s;
> > +    find_cluster_cb->pos = pos;
> > +    find_cluster_cb->len = len;
> > +    find_cluster_cb->cb = cb;
> > +    find_cluster_cb->opaque = opaque;
> > +    find_cluster_cb->request = request;
> > +
> > +    qed_read_l2_table(s, request, l2_offset,
> > +                      qed_find_cluster_cb, find_cluster_cb);
> > +}
> > diff --git a/block/qed-gencb.c b/block/qed-gencb.c
> > new file mode 100644
> > index 0000000..d389e12
> > --- /dev/null
> > +++ b/block/qed-gencb.c
> > @@ -0,0 +1,32 @@
> > +/*
> > + * QEMU Enhanced Disk Format
> > + *
> > + * Copyright IBM, Corp. 2010
> > + *
> > + * Authors:
> > + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> > + *
> > + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + *
> > + */
> > +
> > +#include "qed.h"
> > +
> > +void *gencb_alloc(size_t len, BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    GenericCB *gencb = qemu_malloc(len);
> > +    gencb->cb = cb;
> > +    gencb->opaque = opaque;
> > +    return gencb;
> > +}
> > +
> > +void gencb_complete(void *opaque, int ret)
> > +{
> > +    GenericCB *gencb = opaque;
> > +    BlockDriverCompletionFunc *cb = gencb->cb;
> > +    void *user_opaque = gencb->opaque;
> > +
> > +    qemu_free(gencb);
> > +    cb(user_opaque, ret);
> > +}
> > diff --git a/block/qed-l2-cache.c b/block/qed-l2-cache.c
> > new file mode 100644
> > index 0000000..3b2bf6e
> > --- /dev/null
> > +++ b/block/qed-l2-cache.c
> > @@ -0,0 +1,132 @@
> > +/*
> > + * QEMU Enhanced Disk Format L2 Cache
> > + *
> > + * Copyright IBM, Corp. 2010
> > + *
> > + * Authors:
> > + *  Anthony Liguori   <aliguori@us.ibm.com>
> > + *
> > + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + *
> > + */
> > +
> > +#include "qed.h"
> > +
> > +/* Each L2 holds 2GB so this let's us fully cache a 100GB disk */
> > +#define MAX_L2_CACHE_SIZE 50
> > +
> > +/**
> > + * Initialize the L2 cache
> > + */
> > +void qed_init_l2_cache(L2TableCache *l2_cache,
> > +                       L2TableAllocFunc *alloc_l2_table,
> 
> What is this function pointer meant for? So far I can only see one call
> to qed_init_l2_cache(), so I guess this indirection is just in
> preparation for some future extension? Maybe add a comment?

I will review this function pointer and address with a comment: the
L2TableAllocFunc decouples qed-l2-cache.c from the qemu_blockalign() and
table sizing.  Maybe it's not worth having this if it just complicates
things.

> 
> > +                       void *alloc_l2_table_opaque)
> > +{
> > +    QTAILQ_INIT(&l2_cache->entries);
> > +    l2_cache->n_entries = 0;
> > +    l2_cache->alloc_l2_table = alloc_l2_table;
> > +    l2_cache->alloc_l2_table_opaque = alloc_l2_table_opaque;
> > +}
> > +
> > +/**
> > + * Free the L2 cache
> > + */
> > +void qed_free_l2_cache(L2TableCache *l2_cache)
> > +{
> > +    CachedL2Table *entry, *next_entry;
> > +
> > +    QTAILQ_FOREACH_SAFE(entry, &l2_cache->entries, node, next_entry) {
> > +        qemu_vfree(entry->table);
> > +        qemu_free(entry);
> > +    }
> > +}
> > +
> > +/**
> > + * Allocate an uninitialized entry from the cache
> > + *
> > + * The returned entry has a reference count of 1 and is owned by the caller.
> > + */
> > +CachedL2Table *qed_alloc_l2_cache_entry(L2TableCache *l2_cache)
> > +{
> > +    CachedL2Table *entry;
> > +
> > +    entry = qemu_mallocz(sizeof(*entry));
> > +    entry->table = l2_cache->alloc_l2_table(l2_cache->alloc_l2_table_opaque);
> > +    entry->ref++;
> > +
> > +    return entry;
> > +}
> 
> Hm, what references are counted by ref? Do you have more than one L2
> cache and an entry can be referenced by multiple of them?

I'll add comments describing how the cache works and is used.  I hope
this will make refcounts and caching clearer.

> 
> > +
> > +/**
> > + * Decrease an entry's reference count and free if necessary when the reference
> > + * count drops to zero.
> > + */
> > +void qed_unref_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *entry)
> > +{
> > +    if (!entry) {
> > +        return;
> > +    }
> > +
> > +    entry->ref--;
> > +    if (entry->ref == 0) {
> > +        qemu_vfree(entry->table);
> > +        qemu_free(entry);
> > +    }
> > +}
> 
> The l2_cache arguments looks unused. Do we need it?

It's not needed, will remove.

> 
> > +
> > +/**
> > + * Find an entry in the L2 cache.  This may return NULL and it's up to the
> > + * caller to satisfy the cache miss.
> > + *
> > + * For a cached entry, this function increases the reference count and returns
> > + * the entry.
> > + */
> > +CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset)
> > +{
> > +    CachedL2Table *entry;
> > +
> > +    QTAILQ_FOREACH(entry, &l2_cache->entries, node) {
> > +        if (entry->offset == offset) {
> > +            entry->ref++;
> > +            return entry;
> > +        }
> > +    }
> > +    return NULL;
> > +}
> > +
> > +/**
> > + * Commit an L2 cache entry into the cache.  This is meant to be used as part of
> > + * the process to satisfy a cache miss.  A caller would allocate an entry which
> > + * is not actually in the L2 cache and then once the entry was valid and
> > + * present on disk, the entry can be committed into the cache.
> > + *
> > + * Since the cache is write-through, it's important that this function is not
> > + * called until the entry is present on disk and the L1 has been updated to
> > + * point to the entry.
> > + *
> > + * N.B. This function steals a reference to the l2_table from the caller so the
> > + * caller must obtain a new reference by issuing a call to
> > + * qed_find_l2_cache_entry().
> > + */
> > +void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table)
> > +{
> > +    CachedL2Table *entry;
> > +
> > +    entry = qed_find_l2_cache_entry(l2_cache, l2_table->offset);
> > +    if (entry) {
> > +        qed_unref_l2_cache_entry(l2_cache, entry);
> 
> Maybe the qed_find_l2_cache_entry semantics isn't really the right one
> if we need to decrease the refcount here just because that function just
> increased it and we don't actually want that?
> 
> > +        qed_unref_l2_cache_entry(l2_cache, l2_table);
> > +        return;
> > +    }
> > +
> > +    if (l2_cache->n_entries >= MAX_L2_CACHE_SIZE) {
> > +        entry = QTAILQ_FIRST(&l2_cache->entries);
> > +        QTAILQ_REMOVE(&l2_cache->entries, entry, node);
> > +        l2_cache->n_entries--;
> > +        qed_unref_l2_cache_entry(l2_cache, entry);
> > +    }
> > +
> > +    l2_cache->n_entries++;
> > +    QTAILQ_INSERT_TAIL(&l2_cache->entries, l2_table, node);
> > +}
> 
> Okay, so the table has the right refcount because we steal a refcount
> from the caller, and if you don't reuse this, we explicitly unref it. Am
> I the only one to find this interface confusing?
> 
> > diff --git a/block/qed-table.c b/block/qed-table.c
> > new file mode 100644
> > index 0000000..ba6faf0
> > --- /dev/null
> > +++ b/block/qed-table.c
> > @@ -0,0 +1,316 @@
> > +/*
> > + * QEMU Enhanced Disk Format Table I/O
> > + *
> > + * Copyright IBM, Corp. 2010
> > + *
> > + * Authors:
> > + *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
> > + *  Anthony Liguori   <aliguori@us.ibm.com>
> > + *
> > + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> > + * See the COPYING file in the top-level directory.
> > + *
> > + */
> > +
> > +#include "trace.h"
> > +#include "qemu_socket.h" /* for EINPROGRESS on Windows */
> > +#include "qed.h"
> > +
> > +typedef struct {
> > +    GenericCB gencb;
> > +    BDRVQEDState *s;
> > +    QEDTable *table;
> > +
> > +    struct iovec iov;
> > +    QEMUIOVector qiov;
> > +} QEDReadTableCB;
> > +
> > +static void qed_read_table_cb(void *opaque, int ret)
> > +{
> > +    QEDReadTableCB *read_table_cb = opaque;
> > +    QEDTable *table = read_table_cb->table;
> > +    int noffsets = read_table_cb->iov.iov_len / sizeof(uint64_t);
> > +    int i;
> > +
> > +    /* Handle I/O error */
> > +    if (ret) {
> > +        goto out;
> > +    }
> > +
> > +    /* Byteswap offsets */
> > +    for (i = 0; i < noffsets; i++) {
> > +        table->offsets[i] = le64_to_cpu(table->offsets[i]);
> > +    }
> > +
> > +out:
> > +    /* Completion */
> > +    trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
> > +    gencb_complete(&read_table_cb->gencb, ret);
> > +}
> > +
> > +static void qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
> > +                           BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    QEDReadTableCB *read_table_cb = gencb_alloc(sizeof(*read_table_cb),
> > +                                                cb, opaque);
> > +    QEMUIOVector *qiov = &read_table_cb->qiov;
> > +    BlockDriverAIOCB *aiocb;
> > +
> > +    trace_qed_read_table(s, offset, table);
> > +
> > +    read_table_cb->s = s;
> > +    read_table_cb->table = table;
> > +    read_table_cb->iov.iov_base = table->offsets,
> > +    read_table_cb->iov.iov_len = s->header.cluster_size * s->header.table_size,
> > +
> > +    qemu_iovec_init_external(qiov, &read_table_cb->iov, 1);
> > +    aiocb = bdrv_aio_readv(s->bs->file, offset / BDRV_SECTOR_SIZE, qiov,
> > +                           read_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
> > +                           qed_read_table_cb, read_table_cb);
> > +    if (!aiocb) {
> > +        qed_read_table_cb(read_table_cb, -EIO);
> > +    }
> > +}
> > +
> > +typedef struct {
> > +    GenericCB gencb;
> > +    BDRVQEDState *s;
> > +    QEDTable *orig_table;
> > +    QEDTable *table;
> > +    bool flush;             /* flush after write? */
> > +
> > +    struct iovec iov;
> > +    QEMUIOVector qiov;
> > +} QEDWriteTableCB;
> > +
> > +static void qed_write_table_cb(void *opaque, int ret)
> > +{
> > +    QEDWriteTableCB *write_table_cb = opaque;
> > +
> > +    trace_qed_write_table_cb(write_table_cb->s,
> > +                              write_table_cb->orig_table, ret);
> > +
> > +    if (ret) {
> > +        goto out;
> > +    }
> > +
> > +    if (write_table_cb->flush) {
> > +        /* We still need to flush first */
> > +        write_table_cb->flush = false;
> > +        bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
> > +                       write_table_cb);
> > +        return;
> > +    }
> > +
> > +out:
> > +    qemu_vfree(write_table_cb->table);
> > +    gencb_complete(&write_table_cb->gencb, ret);
> > +    return;
> > +}
> > +
> > +/**
> > + * Write out an updated part or all of a table
> > + *
> > + * @s:          QED state
> > + * @offset:     Offset of table in image file, in bytes
> > + * @table:      Table
> > + * @index:      Index of first element
> > + * @n:          Number of elements
> > + * @flush:      Whether or not to sync to disk
> > + * @cb:         Completion function
> > + * @opaque:     Argument for completion function
> > + */
> > +static void qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
> > +                            unsigned int index, unsigned int n, bool flush,
> > +                            BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    QEDWriteTableCB *write_table_cb;
> > +    BlockDriverAIOCB *aiocb;
> > +    unsigned int sector_mask = BDRV_SECTOR_SIZE / sizeof(uint64_t) - 1;
> > +    unsigned int start, end, i;
> > +    size_t len_bytes;
> > +
> > +    trace_qed_write_table(s, offset, table, index, n);
> > +
> > +    /* Calculate indices of the first and one after last elements */
> > +    start = index & ~sector_mask;
> > +    end = (index + n + sector_mask) & ~sector_mask;
> > +
> > +    len_bytes = (end - start) * sizeof(uint64_t);
> > +
> > +    write_table_cb = gencb_alloc(sizeof(*write_table_cb), cb, opaque);
> > +    write_table_cb->s = s;
> > +    write_table_cb->orig_table = table;
> > +    write_table_cb->flush = flush;
> > +    write_table_cb->table = qemu_blockalign(s->bs, len_bytes);
> > +    write_table_cb->iov.iov_base = write_table_cb->table->offsets;
> > +    write_table_cb->iov.iov_len = len_bytes;
> > +    qemu_iovec_init_external(&write_table_cb->qiov, &write_table_cb->iov, 1);
> > +
> > +    /* Byteswap table */
> > +    for (i = start; i < end; i++) {
> > +        uint64_t le_offset = cpu_to_le64(table->offsets[i]);
> > +        write_table_cb->table->offsets[i - start] = le_offset;
> > +    }
> > +
> > +    /* Adjust for offset into table */
> > +    offset += start * sizeof(uint64_t);
> > +
> > +    aiocb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE,
> > +                            &write_table_cb->qiov,
> > +                            write_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
> > +                            qed_write_table_cb, write_table_cb);
> > +    if (!aiocb) {
> > +        qed_write_table_cb(write_table_cb, -EIO);
> > +    }
> > +}
> > +
> > +/**
> > + * Propagate return value from async callback
> > + */
> > +static void qed_sync_cb(void *opaque, int ret)
> > +{
> > +    *(int *)opaque = ret;
> > +}
> > +
> > +int qed_read_l1_table_sync(BDRVQEDState *s)
> > +{
> > +    int ret = -EINPROGRESS;
> > +
> > +    async_context_push();
> > +
> > +    qed_read_table(s, s->header.l1_table_offset,
> > +                   s->l1_table, qed_sync_cb, &ret);
> > +    while (ret == -EINPROGRESS) {
> > +        qemu_aio_wait();
> > +    }
> > +
> > +    async_context_pop();
> > +
> > +    return ret;
> > +}
> > +
> > +void qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n,
> > +                        BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    BLKDBG_EVENT(s->bs->file, BLKDBG_L1_UPDATE);
> > +    qed_write_table(s, s->header.l1_table_offset,
> > +                    s->l1_table, index, n, false, cb, opaque);
> > +}
> > +
> > +int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
> > +                            unsigned int n)
> > +{
> > +    int ret = -EINPROGRESS;
> > +
> > +    async_context_push();
> > +
> > +    qed_write_l1_table(s, index, n, qed_sync_cb, &ret);
> > +    while (ret == -EINPROGRESS) {
> > +        qemu_aio_wait();
> > +    }
> > +
> > +    async_context_pop();
> > +
> > +    return ret;
> > +}
> > +
> > +typedef struct {
> > +    GenericCB gencb;
> > +    BDRVQEDState *s;
> > +    uint64_t l2_offset;
> > +    QEDRequest *request;
> > +} QEDReadL2TableCB;
> > +
> > +static void qed_read_l2_table_cb(void *opaque, int ret)
> > +{
> > +    QEDReadL2TableCB *read_l2_table_cb = opaque;
> > +    QEDRequest *request = read_l2_table_cb->request;
> > +    BDRVQEDState *s = read_l2_table_cb->s;
> > +    CachedL2Table *l2_table = request->l2_table;
> > +
> > +    if (ret) {
> > +        /* can't trust loaded L2 table anymore */
> > +        qed_unref_l2_cache_entry(&s->l2_cache, l2_table);
> > +        request->l2_table = NULL;
> 
> Is decreasing the refcount by one and clearing request->l2_table enough?
> Didn't we destroy it for all references? Unless, of course, there is at
> most one reference, but then the refcount is useless.
> 
> Hm, or do we just increase the refcount before the cache entry is
> actually used, and we shouldn't do that? Not sure I understand the
> purpose of this refcount thing yet.
> 
> > +    } else {
> > +        l2_table->offset = read_l2_table_cb->l2_offset;
> > +
> > +        qed_commit_l2_cache_entry(&s->l2_cache, l2_table);
> > +
> > +        /* This is guaranteed to succeed because we just committed the entry
> > +         * to the cache.
> > +         */
> > +        request->l2_table = qed_find_l2_cache_entry(&s->l2_cache,
> > +                                                    l2_table->offset);
> > +        assert(request->l2_table != NULL);
> > +    }
> > +
> > +    gencb_complete(&read_l2_table_cb->gencb, ret);
> > +}
> > +
> > +void qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset,
> > +                       BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    QEDReadL2TableCB *read_l2_table_cb;
> > +
> > +    qed_unref_l2_cache_entry(&s->l2_cache, request->l2_table);
> > +
> > +    /* Check for cached L2 entry */
> > +    request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, offset);
> > +    if (request->l2_table) {
> > +        cb(opaque, 0);
> > +        return;
> > +    }
> > +
> > +    request->l2_table = qed_alloc_l2_cache_entry(&s->l2_cache);
> > +
> > +    read_l2_table_cb = gencb_alloc(sizeof(*read_l2_table_cb), cb, opaque);
> > +    read_l2_table_cb->s = s;
> > +    read_l2_table_cb->l2_offset = offset;
> > +    read_l2_table_cb->request = request;
> > +
> > +    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_LOAD);
> > +    qed_read_table(s, offset, request->l2_table->table,
> > +                   qed_read_l2_table_cb, read_l2_table_cb);
> > +}
> > +
> > +int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset)
> > +{
> > +    int ret = -EINPROGRESS;
> > +
> > +    async_context_push();
> > +
> > +    qed_read_l2_table(s, request, offset, qed_sync_cb, &ret);
> > +    while (ret == -EINPROGRESS) {
> > +        qemu_aio_wait();
> > +    }
> > +
> > +    async_context_pop();
> > +    return ret;
> > +}
> > +
> > +void qed_write_l2_table(BDRVQEDState *s, QEDRequest *request,
> > +                        unsigned int index, unsigned int n, bool flush,
> > +                        BlockDriverCompletionFunc *cb, void *opaque)
> > +{
> > +    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_UPDATE);
> > +    qed_write_table(s, request->l2_table->offset,
> > +                    request->l2_table->table, index, n, flush, cb, opaque);
> > +}
> > +
> > +int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
> > +                            unsigned int index, unsigned int n, bool flush)
> > +{
> > +    int ret = -EINPROGRESS;
> > +
> > +    async_context_push();
> > +
> > +    qed_write_l2_table(s, request, index, n, flush, qed_sync_cb, &ret);
> > +    while (ret == -EINPROGRESS) {
> > +        qemu_aio_wait();
> > +    }
> > +
> > +    async_context_pop();
> > +    return ret;
> > +}
> > diff --git a/block/qed.c b/block/qed.c
> > index ea03798..6d7f4d7 100644
> > --- a/block/qed.c
> > +++ b/block/qed.c
> > @@ -139,6 +139,15 @@ static int qed_read_string(BlockDriverState *file, uint64_t offset, size_t n,
> >      return 0;
> >  }
> >  
> > +static QEDTable *qed_alloc_table(void *opaque)
> > +{
> > +    BDRVQEDState *s = opaque;
> > +
> > +    /* Honor O_DIRECT memory alignment requirements */
> > +    return qemu_blockalign(s->bs,
> > +                           s->header.cluster_size * s->header.table_size);
> > +}
> > +
> >  static int bdrv_qed_open(BlockDriverState *bs, int flags)
> >  {
> >      BDRVQEDState *s = bs->opaque;
> > @@ -207,11 +216,24 @@ static int bdrv_qed_open(BlockDriverState *bs, int flags)
> >              }
> >          }
> >      }
> > +
> > +    s->l1_table = qed_alloc_table(s);
> > +    qed_init_l2_cache(&s->l2_cache, qed_alloc_table, s);
> > +
> > +    ret = qed_read_l1_table_sync(s);
> > +    if (ret) {
> > +        qed_free_l2_cache(&s->l2_cache);
> 
> Why not initializing the L2 cache only if the read succeeded?

The consistency check patch adds more code after the call to
qed_read_l1_table_sync() where we really need the L2 cache, so it
will be like this or we can add more goto labels, I don't think it
matters.

Stefan

Patch

diff --git a/Makefile.objs b/Makefile.objs
index ff15795..7b3b19c 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -14,7 +14,7 @@  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
 block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
-block-nested-y += qed.o
+block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
diff --git a/block/qed-cluster.c b/block/qed-cluster.c
new file mode 100644
index 0000000..af65e5a
--- /dev/null
+++ b/block/qed-cluster.c
@@ -0,0 +1,145 @@ 
+/*
+ * QEMU Enhanced Disk Format Cluster functions
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qed.h"
+
+/**
+ * Count the number of contiguous data clusters
+ *
+ * @s:              QED state
+ * @table:          L2 table
+ * @index:          First cluster index
+ * @n:              Maximum number of clusters
+ * @offset:         Set to first cluster offset
+ *
+ * This function scans tables for contiguous allocated or free clusters.
+ */
+static unsigned int qed_count_contiguous_clusters(BDRVQEDState *s,
+                                                  QEDTable *table,
+                                                  unsigned int index,
+                                                  unsigned int n,
+                                                  uint64_t *offset)
+{
+    unsigned int end = MIN(index + n, s->table_nelems);
+    uint64_t last = table->offsets[index];
+    unsigned int i;
+
+    *offset = last;
+
+    for (i = index + 1; i < end; i++) {
+        if (last == 0) {
+            /* Counting free clusters */
+            if (table->offsets[i] != 0) {
+                break;
+            }
+        } else {
+            /* Counting allocated clusters */
+            if (table->offsets[i] != last + s->header.cluster_size) {
+                break;
+            }
+            last = table->offsets[i];
+        }
+    }
+    return i - index;
+}
+
+typedef struct {
+    BDRVQEDState *s;
+    uint64_t pos;
+    size_t len;
+
+    QEDRequest *request;
+
+    /* User callback */
+    QEDFindClusterFunc *cb;
+    void *opaque;
+} QEDFindClusterCB;
+
+static void qed_find_cluster_cb(void *opaque, int ret)
+{
+    QEDFindClusterCB *find_cluster_cb = opaque;
+    BDRVQEDState *s = find_cluster_cb->s;
+    QEDRequest *request = find_cluster_cb->request;
+    uint64_t offset = 0;
+    size_t len = 0;
+    unsigned int index;
+    unsigned int n;
+
+    if (ret) {
+        ret = QED_CLUSTER_ERROR;
+        goto out;
+    }
+
+    index = qed_l2_index(s, find_cluster_cb->pos);
+    n = qed_bytes_to_clusters(s,
+                              qed_offset_into_cluster(s, find_cluster_cb->pos) +
+                              find_cluster_cb->len);
+    n = qed_count_contiguous_clusters(s, request->l2_table->table,
+                                      index, n, &offset);
+
+    ret = offset ? QED_CLUSTER_FOUND : QED_CLUSTER_L2;
+    len = MIN(find_cluster_cb->len, n * s->header.cluster_size -
+              qed_offset_into_cluster(s, find_cluster_cb->pos));
+
+    if (offset && !qed_check_cluster_offset(s, offset)) {
+        ret = QED_CLUSTER_ERROR;
+        goto out;
+    }
+
+out:
+    find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
+    qemu_free(find_cluster_cb);
+}
+
+/**
+ * Find the offset of a data cluster
+ *
+ * @s:          QED state
+ * @pos:        Byte position in device
+ * @len:        Number of bytes
+ * @cb:         Completion function
+ * @opaque:     User data for completion function
+ */
+void qed_find_cluster(BDRVQEDState *s, QEDRequest *request, uint64_t pos,
+                      size_t len, QEDFindClusterFunc *cb, void *opaque)
+{
+    QEDFindClusterCB *find_cluster_cb;
+    uint64_t l2_offset;
+
+    /* Limit length to L2 boundary.  Requests are broken up at the L2 boundary
+     * so that a request acts on one L2 table at a time.
+     */
+    len = MIN(len, (((pos >> s->l1_shift) + 1) << s->l1_shift) - pos);
+
+    l2_offset = s->l1_table->offsets[qed_l1_index(s, pos)];
+    if (!l2_offset) {
+        cb(opaque, QED_CLUSTER_L1, 0, len);
+        return;
+    }
+    if (!qed_check_table_offset(s, l2_offset)) {
+        cb(opaque, QED_CLUSTER_ERROR, 0, 0);
+        return;
+    }
+
+    find_cluster_cb = qemu_malloc(sizeof(*find_cluster_cb));
+    find_cluster_cb->s = s;
+    find_cluster_cb->pos = pos;
+    find_cluster_cb->len = len;
+    find_cluster_cb->cb = cb;
+    find_cluster_cb->opaque = opaque;
+    find_cluster_cb->request = request;
+
+    qed_read_l2_table(s, request, l2_offset,
+                      qed_find_cluster_cb, find_cluster_cb);
+}
diff --git a/block/qed-gencb.c b/block/qed-gencb.c
new file mode 100644
index 0000000..d389e12
--- /dev/null
+++ b/block/qed-gencb.c
@@ -0,0 +1,32 @@ 
+/*
+ * QEMU Enhanced Disk Format
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qed.h"
+
+void *gencb_alloc(size_t len, BlockDriverCompletionFunc *cb, void *opaque)
+{
+    GenericCB *gencb = qemu_malloc(len);
+    gencb->cb = cb;
+    gencb->opaque = opaque;
+    return gencb;
+}
+
+void gencb_complete(void *opaque, int ret)
+{
+    GenericCB *gencb = opaque;
+    BlockDriverCompletionFunc *cb = gencb->cb;
+    void *user_opaque = gencb->opaque;
+
+    qemu_free(gencb);
+    cb(user_opaque, ret);
+}
diff --git a/block/qed-l2-cache.c b/block/qed-l2-cache.c
new file mode 100644
index 0000000..3b2bf6e
--- /dev/null
+++ b/block/qed-l2-cache.c
@@ -0,0 +1,132 @@ 
+/*
+ * QEMU Enhanced Disk Format L2 Cache
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qed.h"
+
+/* Each L2 holds 2GB so this let's us fully cache a 100GB disk */
+#define MAX_L2_CACHE_SIZE 50
+
+/**
+ * Initialize the L2 cache
+ */
+void qed_init_l2_cache(L2TableCache *l2_cache,
+                       L2TableAllocFunc *alloc_l2_table,
+                       void *alloc_l2_table_opaque)
+{
+    QTAILQ_INIT(&l2_cache->entries);
+    l2_cache->n_entries = 0;
+    l2_cache->alloc_l2_table = alloc_l2_table;
+    l2_cache->alloc_l2_table_opaque = alloc_l2_table_opaque;
+}
+
+/**
+ * Free the L2 cache
+ */
+void qed_free_l2_cache(L2TableCache *l2_cache)
+{
+    CachedL2Table *entry, *next_entry;
+
+    QTAILQ_FOREACH_SAFE(entry, &l2_cache->entries, node, next_entry) {
+        qemu_vfree(entry->table);
+        qemu_free(entry);
+    }
+}
+
+/**
+ * Allocate an uninitialized entry from the cache
+ *
+ * The returned entry has a reference count of 1 and is owned by the caller.
+ */
+CachedL2Table *qed_alloc_l2_cache_entry(L2TableCache *l2_cache)
+{
+    CachedL2Table *entry;
+
+    entry = qemu_mallocz(sizeof(*entry));
+    entry->table = l2_cache->alloc_l2_table(l2_cache->alloc_l2_table_opaque);
+    entry->ref++;
+
+    return entry;
+}
+
+/**
+ * Decrease an entry's reference count and free if necessary when the reference
+ * count drops to zero.
+ */
+void qed_unref_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *entry)
+{
+    if (!entry) {
+        return;
+    }
+
+    entry->ref--;
+    if (entry->ref == 0) {
+        qemu_vfree(entry->table);
+        qemu_free(entry);
+    }
+}
+
+/**
+ * Find an entry in the L2 cache.  This may return NULL and it's up to the
+ * caller to satisfy the cache miss.
+ *
+ * For a cached entry, this function increases the reference count and returns
+ * the entry.
+ */
+CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset)
+{
+    CachedL2Table *entry;
+
+    QTAILQ_FOREACH(entry, &l2_cache->entries, node) {
+        if (entry->offset == offset) {
+            entry->ref++;
+            return entry;
+        }
+    }
+    return NULL;
+}
+
+/**
+ * Commit an L2 cache entry into the cache.  This is meant to be used as part of
+ * the process to satisfy a cache miss.  A caller would allocate an entry which
+ * is not actually in the L2 cache and then once the entry was valid and
+ * present on disk, the entry can be committed into the cache.
+ *
+ * Since the cache is write-through, it's important that this function is not
+ * called until the entry is present on disk and the L1 has been updated to
+ * point to the entry.
+ *
+ * N.B. This function steals a reference to the l2_table from the caller so the
+ * caller must obtain a new reference by issuing a call to
+ * qed_find_l2_cache_entry().
+ */
+void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table)
+{
+    CachedL2Table *entry;
+
+    entry = qed_find_l2_cache_entry(l2_cache, l2_table->offset);
+    if (entry) {
+        qed_unref_l2_cache_entry(l2_cache, entry);
+        qed_unref_l2_cache_entry(l2_cache, l2_table);
+        return;
+    }
+
+    if (l2_cache->n_entries >= MAX_L2_CACHE_SIZE) {
+        entry = QTAILQ_FIRST(&l2_cache->entries);
+        QTAILQ_REMOVE(&l2_cache->entries, entry, node);
+        l2_cache->n_entries--;
+        qed_unref_l2_cache_entry(l2_cache, entry);
+    }
+
+    l2_cache->n_entries++;
+    QTAILQ_INSERT_TAIL(&l2_cache->entries, l2_table, node);
+}
diff --git a/block/qed-table.c b/block/qed-table.c
new file mode 100644
index 0000000..ba6faf0
--- /dev/null
+++ b/block/qed-table.c
@@ -0,0 +1,316 @@ 
+/*
+ * QEMU Enhanced Disk Format Table I/O
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@linux.vnet.ibm.com>
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "trace.h"
+#include "qemu_socket.h" /* for EINPROGRESS on Windows */
+#include "qed.h"
+
+typedef struct {
+    GenericCB gencb;
+    BDRVQEDState *s;
+    QEDTable *table;
+
+    struct iovec iov;
+    QEMUIOVector qiov;
+} QEDReadTableCB;
+
+static void qed_read_table_cb(void *opaque, int ret)
+{
+    QEDReadTableCB *read_table_cb = opaque;
+    QEDTable *table = read_table_cb->table;
+    int noffsets = read_table_cb->iov.iov_len / sizeof(uint64_t);
+    int i;
+
+    /* Handle I/O error */
+    if (ret) {
+        goto out;
+    }
+
+    /* Byteswap offsets */
+    for (i = 0; i < noffsets; i++) {
+        table->offsets[i] = le64_to_cpu(table->offsets[i]);
+    }
+
+out:
+    /* Completion */
+    trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
+    gencb_complete(&read_table_cb->gencb, ret);
+}
+
+static void qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
+                           BlockDriverCompletionFunc *cb, void *opaque)
+{
+    QEDReadTableCB *read_table_cb = gencb_alloc(sizeof(*read_table_cb),
+                                                cb, opaque);
+    QEMUIOVector *qiov = &read_table_cb->qiov;
+    BlockDriverAIOCB *aiocb;
+
+    trace_qed_read_table(s, offset, table);
+
+    read_table_cb->s = s;
+    read_table_cb->table = table;
+    read_table_cb->iov.iov_base = table->offsets,
+    read_table_cb->iov.iov_len = s->header.cluster_size * s->header.table_size,
+
+    qemu_iovec_init_external(qiov, &read_table_cb->iov, 1);
+    aiocb = bdrv_aio_readv(s->bs->file, offset / BDRV_SECTOR_SIZE, qiov,
+                           read_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
+                           qed_read_table_cb, read_table_cb);
+    if (!aiocb) {
+        qed_read_table_cb(read_table_cb, -EIO);
+    }
+}
+
+typedef struct {
+    GenericCB gencb;
+    BDRVQEDState *s;
+    QEDTable *orig_table;
+    QEDTable *table;
+    bool flush;             /* flush after write? */
+
+    struct iovec iov;
+    QEMUIOVector qiov;
+} QEDWriteTableCB;
+
+static void qed_write_table_cb(void *opaque, int ret)
+{
+    QEDWriteTableCB *write_table_cb = opaque;
+
+    trace_qed_write_table_cb(write_table_cb->s,
+                              write_table_cb->orig_table, ret);
+
+    if (ret) {
+        goto out;
+    }
+
+    if (write_table_cb->flush) {
+        /* We still need to flush first */
+        write_table_cb->flush = false;
+        bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
+                       write_table_cb);
+        return;
+    }
+
+out:
+    qemu_vfree(write_table_cb->table);
+    gencb_complete(&write_table_cb->gencb, ret);
+    return;
+}
+
+/**
+ * Write out an updated part or all of a table
+ *
+ * @s:          QED state
+ * @offset:     Offset of table in image file, in bytes
+ * @table:      Table
+ * @index:      Index of first element
+ * @n:          Number of elements
+ * @flush:      Whether or not to sync to disk
+ * @cb:         Completion function
+ * @opaque:     Argument for completion function
+ */
+static void qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table,
+                            unsigned int index, unsigned int n, bool flush,
+                            BlockDriverCompletionFunc *cb, void *opaque)
+{
+    QEDWriteTableCB *write_table_cb;
+    BlockDriverAIOCB *aiocb;
+    unsigned int sector_mask = BDRV_SECTOR_SIZE / sizeof(uint64_t) - 1;
+    unsigned int start, end, i;
+    size_t len_bytes;
+
+    trace_qed_write_table(s, offset, table, index, n);
+
+    /* Calculate indices of the first and one after last elements */
+    start = index & ~sector_mask;
+    end = (index + n + sector_mask) & ~sector_mask;
+
+    len_bytes = (end - start) * sizeof(uint64_t);
+
+    write_table_cb = gencb_alloc(sizeof(*write_table_cb), cb, opaque);
+    write_table_cb->s = s;
+    write_table_cb->orig_table = table;
+    write_table_cb->flush = flush;
+    write_table_cb->table = qemu_blockalign(s->bs, len_bytes);
+    write_table_cb->iov.iov_base = write_table_cb->table->offsets;
+    write_table_cb->iov.iov_len = len_bytes;
+    qemu_iovec_init_external(&write_table_cb->qiov, &write_table_cb->iov, 1);
+
+    /* Byteswap table */
+    for (i = start; i < end; i++) {
+        uint64_t le_offset = cpu_to_le64(table->offsets[i]);
+        write_table_cb->table->offsets[i - start] = le_offset;
+    }
+
+    /* Adjust for offset into table */
+    offset += start * sizeof(uint64_t);
+
+    aiocb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE,
+                            &write_table_cb->qiov,
+                            write_table_cb->iov.iov_len / BDRV_SECTOR_SIZE,
+                            qed_write_table_cb, write_table_cb);
+    if (!aiocb) {
+        qed_write_table_cb(write_table_cb, -EIO);
+    }
+}
+
+/**
+ * Propagate return value from async callback
+ */
+static void qed_sync_cb(void *opaque, int ret)
+{
+    *(int *)opaque = ret;
+}
+
+int qed_read_l1_table_sync(BDRVQEDState *s)
+{
+    int ret = -EINPROGRESS;
+
+    async_context_push();
+
+    qed_read_table(s, s->header.l1_table_offset,
+                   s->l1_table, qed_sync_cb, &ret);
+    while (ret == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    async_context_pop();
+
+    return ret;
+}
+
+void qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n,
+                        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    BLKDBG_EVENT(s->bs->file, BLKDBG_L1_UPDATE);
+    qed_write_table(s, s->header.l1_table_offset,
+                    s->l1_table, index, n, false, cb, opaque);
+}
+
+int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
+                            unsigned int n)
+{
+    int ret = -EINPROGRESS;
+
+    async_context_push();
+
+    qed_write_l1_table(s, index, n, qed_sync_cb, &ret);
+    while (ret == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    async_context_pop();
+
+    return ret;
+}
+
+typedef struct {
+    GenericCB gencb;
+    BDRVQEDState *s;
+    uint64_t l2_offset;
+    QEDRequest *request;
+} QEDReadL2TableCB;
+
+static void qed_read_l2_table_cb(void *opaque, int ret)
+{
+    QEDReadL2TableCB *read_l2_table_cb = opaque;
+    QEDRequest *request = read_l2_table_cb->request;
+    BDRVQEDState *s = read_l2_table_cb->s;
+    CachedL2Table *l2_table = request->l2_table;
+
+    if (ret) {
+        /* can't trust loaded L2 table anymore */
+        qed_unref_l2_cache_entry(&s->l2_cache, l2_table);
+        request->l2_table = NULL;
+    } else {
+        l2_table->offset = read_l2_table_cb->l2_offset;
+
+        qed_commit_l2_cache_entry(&s->l2_cache, l2_table);
+
+        /* This is guaranteed to succeed because we just committed the entry
+         * to the cache.
+         */
+        request->l2_table = qed_find_l2_cache_entry(&s->l2_cache,
+                                                    l2_table->offset);
+        assert(request->l2_table != NULL);
+    }
+
+    gencb_complete(&read_l2_table_cb->gencb, ret);
+}
+
+void qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset,
+                       BlockDriverCompletionFunc *cb, void *opaque)
+{
+    QEDReadL2TableCB *read_l2_table_cb;
+
+    qed_unref_l2_cache_entry(&s->l2_cache, request->l2_table);
+
+    /* Check for cached L2 entry */
+    request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, offset);
+    if (request->l2_table) {
+        cb(opaque, 0);
+        return;
+    }
+
+    request->l2_table = qed_alloc_l2_cache_entry(&s->l2_cache);
+
+    read_l2_table_cb = gencb_alloc(sizeof(*read_l2_table_cb), cb, opaque);
+    read_l2_table_cb->s = s;
+    read_l2_table_cb->l2_offset = offset;
+    read_l2_table_cb->request = request;
+
+    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_LOAD);
+    qed_read_table(s, offset, request->l2_table->table,
+                   qed_read_l2_table_cb, read_l2_table_cb);
+}
+
+int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset)
+{
+    int ret = -EINPROGRESS;
+
+    async_context_push();
+
+    qed_read_l2_table(s, request, offset, qed_sync_cb, &ret);
+    while (ret == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    async_context_pop();
+    return ret;
+}
+
+void qed_write_l2_table(BDRVQEDState *s, QEDRequest *request,
+                        unsigned int index, unsigned int n, bool flush,
+                        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    BLKDBG_EVENT(s->bs->file, BLKDBG_L2_UPDATE);
+    qed_write_table(s, request->l2_table->offset,
+                    request->l2_table->table, index, n, flush, cb, opaque);
+}
+
+int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
+                            unsigned int index, unsigned int n, bool flush)
+{
+    int ret = -EINPROGRESS;
+
+    async_context_push();
+
+    qed_write_l2_table(s, request, index, n, flush, qed_sync_cb, &ret);
+    while (ret == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+
+    async_context_pop();
+    return ret;
+}
diff --git a/block/qed.c b/block/qed.c
index ea03798..6d7f4d7 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -139,6 +139,15 @@  static int qed_read_string(BlockDriverState *file, uint64_t offset, size_t n,
     return 0;
 }
 
+static QEDTable *qed_alloc_table(void *opaque)
+{
+    BDRVQEDState *s = opaque;
+
+    /* Honor O_DIRECT memory alignment requirements */
+    return qemu_blockalign(s->bs,
+                           s->header.cluster_size * s->header.table_size);
+}
+
 static int bdrv_qed_open(BlockDriverState *bs, int flags)
 {
     BDRVQEDState *s = bs->opaque;
@@ -207,11 +216,24 @@  static int bdrv_qed_open(BlockDriverState *bs, int flags)
             }
         }
     }
+
+    s->l1_table = qed_alloc_table(s);
+    qed_init_l2_cache(&s->l2_cache, qed_alloc_table, s);
+
+    ret = qed_read_l1_table_sync(s);
+    if (ret) {
+        qed_free_l2_cache(&s->l2_cache);
+        qemu_vfree(s->l1_table);
+    }
     return ret;
 }
 
 static void bdrv_qed_close(BlockDriverState *bs)
 {
+    BDRVQEDState *s = bs->opaque;
+
+    qed_free_l2_cache(&s->l2_cache);
+    qemu_vfree(s->l1_table);
 }
 
 static void bdrv_qed_flush(BlockDriverState *bs)
@@ -336,10 +358,43 @@  static int bdrv_qed_create(const char *filename, QEMUOptionParameter *options)
                       backing_file, backing_fmt);
 }
 
+typedef struct {
+    int is_allocated;
+    int *pnum;
+} QEDIsAllocatedCB;
+
+static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t len)
+{
+    QEDIsAllocatedCB *cb = opaque;
+    *cb->pnum = len / BDRV_SECTOR_SIZE;
+    cb->is_allocated = ret == QED_CLUSTER_FOUND;
+}
+
 static int bdrv_qed_is_allocated(BlockDriverState *bs, int64_t sector_num,
                                   int nb_sectors, int *pnum)
 {
-    return -ENOTSUP;
+    BDRVQEDState *s = bs->opaque;
+    uint64_t pos = (uint64_t)sector_num * BDRV_SECTOR_SIZE;
+    size_t len = (size_t)nb_sectors * BDRV_SECTOR_SIZE;
+    QEDIsAllocatedCB cb = {
+        .is_allocated = -1,
+        .pnum = pnum,
+    };
+    QEDRequest request = { .l2_table = NULL };
+
+    async_context_push();
+
+    qed_find_cluster(s, &request, pos, len, qed_is_allocated_cb, &cb);
+
+    while (cb.is_allocated == -1) {
+        qemu_aio_wait();
+    }
+
+    async_context_pop();
+
+    qed_unref_l2_cache_entry(&s->l2_cache, request.l2_table);
+
+    return cb.is_allocated;
 }
 
 static int bdrv_qed_make_empty(BlockDriverState *bs)
diff --git a/block/qed.h b/block/qed.h
index 7ce95a7..9ea288f 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -96,16 +96,103 @@  typedef struct {
 } QEDHeader;
 
 typedef struct {
+    uint64_t offsets[0];            /* in bytes */
+} QEDTable;
+
+/* The L2 cache is a simple write-through cache for L2 structures */
+typedef struct CachedL2Table {
+    QEDTable *table;
+    uint64_t offset;    /* offset=0 indicates an invalidate entry */
+    QTAILQ_ENTRY(CachedL2Table) node;
+    int ref;
+} CachedL2Table;
+
+/**
+ * Allocate an L2 table
+ *
+ * This callback is used by the L2 cache to allocate tables without knowing
+ * their size or alignment requirements.
+ */
+typedef QEDTable *L2TableAllocFunc(void *opaque);
+
+typedef struct {
+    QTAILQ_HEAD(, CachedL2Table) entries;
+    unsigned int n_entries;
+    L2TableAllocFunc *alloc_l2_table;
+    void *alloc_l2_table_opaque;
+} L2TableCache;
+
+typedef struct QEDRequest {
+    CachedL2Table *l2_table;
+} QEDRequest;
+
+typedef struct {
     BlockDriverState *bs;           /* device */
     uint64_t file_size;             /* length of image file, in bytes */
 
     QEDHeader header;               /* always cpu-endian */
+    QEDTable *l1_table;
+    L2TableCache l2_cache;          /* l2 table cache */
     uint32_t table_nelems;
     uint32_t l1_shift;
     uint32_t l2_shift;
     uint32_t l2_mask;
 } BDRVQEDState;
 
+enum {
+    QED_CLUSTER_FOUND,         /* cluster found */
+    QED_CLUSTER_L2,            /* cluster missing in L2 */
+    QED_CLUSTER_L1,            /* cluster missing in L1 */
+    QED_CLUSTER_ERROR,         /* error looking up cluster */
+};
+
+typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
+
+/**
+ * Generic callback for chaining async callbacks
+ */
+typedef struct {
+    BlockDriverCompletionFunc *cb;
+    void *opaque;
+} GenericCB;
+
+void *gencb_alloc(size_t len, BlockDriverCompletionFunc *cb, void *opaque);
+void gencb_complete(void *opaque, int ret);
+
+/**
+ * L2 cache functions
+ */
+void qed_init_l2_cache(L2TableCache *l2_cache, L2TableAllocFunc *alloc_l2_table, void *alloc_l2_table_opaque);
+void qed_free_l2_cache(L2TableCache *l2_cache);
+CachedL2Table *qed_alloc_l2_cache_entry(L2TableCache *l2_cache);
+void qed_unref_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *entry);
+CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset);
+void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table);
+
+/**
+ * Table I/O functions
+ */
+int qed_read_l1_table_sync(BDRVQEDState *s);
+void qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n,
+                        BlockDriverCompletionFunc *cb, void *opaque);
+int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
+                            unsigned int n);
+int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
+                           uint64_t offset);
+void qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset,
+                       BlockDriverCompletionFunc *cb, void *opaque);
+void qed_write_l2_table(BDRVQEDState *s, QEDRequest *request,
+                        unsigned int index, unsigned int n, bool flush,
+                        BlockDriverCompletionFunc *cb, void *opaque);
+int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
+                            unsigned int index, unsigned int n, bool flush);
+
+/**
+ * Cluster functions
+ */
+void qed_find_cluster(BDRVQEDState *s, QEDRequest *request, uint64_t pos,
+                      size_t len, QEDFindClusterFunc *cb, void *opaque);
+
 /**
  * Utility functions
  */
@@ -114,6 +201,27 @@  static inline uint64_t qed_start_of_cluster(BDRVQEDState *s, uint64_t offset)
     return offset & ~(uint64_t)(s->header.cluster_size - 1);
 }
 
+static inline uint64_t qed_offset_into_cluster(BDRVQEDState *s, uint64_t offset)
+{
+    return offset & (s->header.cluster_size - 1);
+}
+
+static inline unsigned int qed_bytes_to_clusters(BDRVQEDState *s, size_t bytes)
+{
+    return qed_start_of_cluster(s, bytes + (s->header.cluster_size - 1)) /
+           (s->header.cluster_size - 1);
+}
+
+static inline unsigned int qed_l1_index(BDRVQEDState *s, uint64_t pos)
+{
+    return pos >> s->l1_shift;
+}
+
+static inline unsigned int qed_l2_index(BDRVQEDState *s, uint64_t pos)
+{
+    return (pos >> s->l2_shift) & s->l2_mask;
+}
+
 /**
  * Test if a cluster offset is valid
  */
diff --git a/trace-events b/trace-events
index f32c83f..a390196 100644
--- a/trace-events
+++ b/trace-events
@@ -67,3 +67,9 @@  disable cpu_out(unsigned int addr, unsigned int val) "addr %#x value %u"
 # balloon.c
 # Since requests are raised via monitor, not many tracepoints are needed.
 disable balloon_event(void *opaque, unsigned long addr) "opaque %p addr %lu"
+
+# block/qed-table.c
+disable qed_read_table(void *s, uint64_t offset, void *table) "s %p offset %"PRIu64" table %p"
+disable qed_read_table_cb(void *s, void *table, int ret) "s %p table %p ret %d"
+disable qed_write_table(void *s, uint64_t offset, void *table, unsigned int index, unsigned int n) "s %p offset %"PRIu64" table %p index %u n %u"
+disable qed_write_table_cb(void *s, void *table, int ret) "s %p table %p ret %d"