Patchwork [14/26] FVD: add impl of loading data from compact image

login
register
mail settings
Submitter Chunqiang Tang
Date Feb. 25, 2011, 10:37 p.m.
Message ID <1298673486-3573-14-git-send-email-ctang@us.ibm.com>
Download mbox | patch
Permalink /patch/84582/
State New
Headers show

Comments

Chunqiang Tang - Feb. 25, 2011, 10:37 p.m.
This patch is part of the Fast Virtual Disk (FVD) proposal.
See http://wiki.qemu.org/Features/FVD.

This patch adds the implementation of loading data from a compact image. This
capability is to support fvd_aio_readv() when FVD is configured to use its
one-level lookup table to do storage allocation.

Signed-off-by: Chunqiang Tang <ctang@us.ibm.com>
---
 block/fvd-load.c  |  448 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/fvd-utils.c |   40 +++++
 2 files changed, 488 insertions(+), 0 deletions(-)

Patch

diff --git a/block/fvd-load.c b/block/fvd-load.c
index 80ab32c..88e5fb4 100644
--- a/block/fvd-load.c
+++ b/block/fvd-load.c
@@ -11,10 +11,458 @@ 
  *
  */
 
+/* Forward declarations for the compact-image read path implemented below. */
+static void load_data_from_compact_image_cb(void *opaque, int ret);
+static BlockDriverAIOCB *load_data_from_compact_image(FvdAIOCB *parent_acb,
+                    BlockDriverState * bs, int64_t sector_num,
+                    QEMUIOVector * qiov, int nb_sectors,
+                    BlockDriverCompletionFunc * cb, void *opaque);
+static inline FvdAIOCB *init_load_acb(FvdAIOCB * parent_acb,
+                    BlockDriverState * bs, int64_t sector_num,
+                    QEMUIOVector * orig_qiov, int nb_sectors,
+                    BlockDriverCompletionFunc * cb, void *opaque);
+static int load_create_child_requests(bool count_only, BDRVFvdState *s,
+                    QEMUIOVector * orig_qiov, int64_t sector_num,
+                    int nb_sectors, int *p_nziov, int *p_niov, int *p_nqiov,
+                    FvdAIOCB *acb,  QEMUIOVector *q, struct iovec *v);
+
+/* Read nb_sectors of guest data starting at sector_num into orig_qiov.
+ * Dispatches to a direct read of the data file when no lookup table is in
+ * use, or to the compact-image path when FVD does its own storage
+ * allocation via the one-level lookup table. */
 static inline BlockDriverAIOCB *load_data(FvdAIOCB * parent_acb,
                     BlockDriverState * bs, int64_t sector_num,
                     QEMUIOVector * orig_qiov, int nb_sectors,
                     BlockDriverCompletionFunc * cb, void *opaque)
 {
+    BDRVFvdState *s = bs->opaque;
+
+    if (!s->table) {
+        /* Load directly since it is not a compact image. */
+        return bdrv_aio_readv(s->fvd_data, s->data_offset + sector_num,
+                              orig_qiov, nb_sectors, cb, opaque);
+    } else {
+        /* Compact image: sectors must be mapped through the lookup table. */
+        return load_data_from_compact_image(parent_acb, bs, sector_num,
+                                            orig_qiov, nb_sectors, cb, opaque);
+    }
+}
+
+/* Read path for compact images.  The request is first scanned (count_only
+ * pass) to find how many contiguous regions it covers in the compact image;
+ * zero-filled (unallocated) regions are satisfied with memset, and each
+ * contiguous allocated region becomes one child read request.  Returns the
+ * AIOCB for the overall operation, or NULL on failure. */
+static BlockDriverAIOCB *load_data_from_compact_image(FvdAIOCB * parent_acb,
+                    BlockDriverState * bs, int64_t sector_num,
+                    QEMUIOVector * orig_qiov, int nb_sectors,
+                    BlockDriverCompletionFunc * cb, void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+    FvdAIOCB * acb;
+    int64_t start_sec = -1;
+    int nziov = 0;
+    int nqiov = 0;
+    int niov = 0;
+    int i;
+
+    /* Count the number of qiov and iov needed to cover the continuous regions
+     * of the compact image. */
+    load_create_child_requests(true/*count_only*/, s, orig_qiov, sector_num,
+                          nb_sectors, &nziov, &niov, &nqiov, NULL, NULL, NULL);
+
+    if (nqiov + nziov == 1) {
+        /* All data can be read in one qiov. Reuse orig_qiov. */
+        if (nziov == 1) {
+            /* This is a zero-filled region: satisfy the read entirely from
+             * memory without touching storage. */
+            for (i = 0; i < orig_qiov->niov; i++) {
+                memset(orig_qiov->iov[i].iov_base,
+                       0, orig_qiov->iov[i].iov_len);
+            }
+
+            /* Use a bh to invoke the callback so completion is still
+             * asynchronous from the caller's point of view. */
+            if (!(acb = my_qemu_aio_get(&fvd_aio_pool, bs, cb, opaque))) {
+                return NULL;
+            }
+            COPY_UUID(acb, parent_acb);
+            QDEBUG("LOAD: acb%llu-%p  load_fill_all_with_zeros\n",
+                   acb->uuid, acb);
+            acb->type = OP_WRAPPER;
+            acb->cancel_in_progress = false;
+            acb->wrapper.bh = qemu_bh_new(aio_wrapper_bh, acb);
+            qemu_bh_schedule(acb->wrapper.bh);
+            return &acb->common;
+        } else {
+            /* A non-empty region: the whole request maps to one contiguous
+             * span of the data file, so issue a single read directly. */
+            const uint32_t first_chunk = sector_num / s->chunk_size;
+            start_sec = READ_TABLE(s->table[first_chunk]) * s->chunk_size +
+                        (sector_num % s->chunk_size);
+            if (parent_acb) {
+                QDEBUG("LOAD: acb%llu-%p  "
+                       "load_directly_as_one_continuous_region\n",
+                       parent_acb->uuid, parent_acb);
+            }
+            return bdrv_aio_readv(s->fvd_data, s->data_offset + start_sec,
+                                  orig_qiov, nb_sectors, cb, opaque);
+        }
+    }
+
+    /* Need to submit multiple requests to the lower layer. Initialize acb. */
+    if (!(acb = init_load_acb(parent_acb, bs, sector_num, orig_qiov,
+                              nb_sectors, cb, opaque))) {
+        return NULL;
+    }
+    acb->load.num_children = nqiov;
+
+    /* Allocate memory and create multiple requests.  One allocation holds
+     * the child CBs, the per-child QEMUIOVectors, and the iov array. */
+    acb->load.children = my_qemu_malloc((sizeof(CompactChildCB) +
+                                         sizeof(QEMUIOVector)) * nqiov +
+                                        sizeof(struct iovec) * niov);
+    QEMUIOVector *q = (QEMUIOVector *) (acb->load.children + nqiov);
+    struct iovec *v = (struct iovec *)(q + nqiov);
+
+    /* Second pass: actually build and submit the child requests.  Returns 0
+     * on success; nonzero means a child submission failed part-way. */
+    if (!load_create_child_requests(false/*count_only*/, s, orig_qiov,
+                                    sector_num, nb_sectors, NULL, NULL,
+                                    &nqiov, acb, q, v)) {
+        return &acb->common;
+    }
+
+    /* Clean up after failure. nqiov is the no. of submitted child requests. */
+    for (i = 0; i < nqiov; i++) {
+        bdrv_aio_cancel(acb->load.children[i].hd_acb);
+    }
+    my_qemu_free(acb->load.children);
+    my_qemu_aio_release(acb);
+    return NULL;
+}
+
+/* Completion callback for one child read of a compact-image load.  Records
+ * the first error seen, and invokes the parent's callback once every child
+ * has finished. */
+static void load_data_from_compact_image_cb(void *opaque, int ret)
+{
+    CompactChildCB *child = opaque;
+    FvdAIOCB *acb = child->acb;
+
+    if (acb->cancel_in_progress) {
+        /* The parent is being cancelled; it owns cleanup. */
+        return;
+    }
+
+    /* Now fvd_aio_cancel_store_compact() won't cancel this child request. */
+    child->hd_acb = NULL;
+
+    /* Keep only the first (possibly zero) result; later completions after an
+     * error are merely logged. */
+    if (acb->load.ret == 0) {
+        acb->load.ret = ret;
+    } else {
+        QDEBUG("LOAD: acb%llu-%p  load_child=%d total_children=%d "
+               "error ret=%d\n", acb->uuid, acb, acb->load.finished_children,
+               acb->load.num_children, ret);
+    }
+
+    acb->load.finished_children++;
+    if (acb->load.finished_children < acb->load.num_children) {
+        QDEBUG("LOAD: acb%llu-%p  load_finished_children=%d "
+               "total_children=%d\n", acb->uuid, acb,
+               acb->load.finished_children, acb->load.num_children);
+        return;
+    }
+
+    /* All children done: report the aggregated result and free resources. */
+    QDEBUG("LOAD: acb%llu-%p  load_last_child_finished ret=%d\n", acb->uuid,
+           acb, acb->load.ret);
+    acb->common.cb(acb->common.opaque, acb->load.ret);
+    if (acb->load.children) {
+        my_qemu_free(acb->load.children);
+    }
+    my_qemu_aio_release(acb);
+}
+
+/* Allocate and initialize an FvdAIOCB for a multi-child compact-image load.
+ * Returns NULL if allocation from the AIO pool fails. */
+static inline FvdAIOCB *init_load_acb(FvdAIOCB * parent_acb,
+                                      BlockDriverState * bs,
+                                      int64_t sector_num,
+                                      QEMUIOVector * orig_qiov,
+                                      int nb_sectors,
+                                      BlockDriverCompletionFunc * cb,
+                                      void *opaque)
+{
+    FvdAIOCB *const acb = my_qemu_aio_get(&fvd_aio_pool, bs, cb, opaque);
+    if (!acb) {
+        return NULL;
+    }
+    acb->type = OP_LOAD_COMPACT;
+    acb->cancel_in_progress = false;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->load.parent_acb = parent_acb;
+    acb->load.finished_children = 0;
+    acb->load.children = NULL;
+    acb->load.one_child.hd_acb = NULL;
+    acb->load.orig_qiov = orig_qiov;
+    acb->load.ret = 0;
+    COPY_UUID(acb, parent_acb);
+    return acb;
+}
+
+/* Handle one contiguous region of a compact-image read.  In count_only mode
+ * it only advances the iov cursor and tallies the needed iov/qiov counts.
+ * Otherwise, an empty region is zero-filled in place, and a non-empty region
+ * becomes a child read request submitted via bdrv_aio_readv.  Returns 0 on
+ * success, -1 if submitting the child request failed. */
+static inline int load_create_one_child(bool count_only, bool empty,
+                    QEMUIOVector * orig_qiov, int *iov_index, size_t *iov_left,
+                    uint8_t **iov_buf, int64_t start_sec, int sectors_in_region,
+                    int *p_niov, int *p_nziov, int *p_nqiov, BDRVFvdState *s,
+                    FvdAIOCB *acb, QEMUIOVector **q, struct iovec **v)
+{
+    int niov;
+
+    if (count_only) {
+        if (empty) {
+            /* Return value discarded: count_iov is called here only to
+             * advance the cursor past this zero-filled region. */
+            count_iov(orig_qiov->iov, iov_index, iov_buf,
+                      iov_left, sectors_in_region * 512);
+            (*p_nziov)++;
+        } else {
+            niov = count_iov(orig_qiov->iov, iov_index, iov_buf,
+                              iov_left, sectors_in_region * 512);
+            *p_niov += niov;
+            (*p_nqiov)++;
+        }
+        return 0;
+    }
+
+    /* Not count_only, need to take real actions. */
+    if (empty) {
+        /* Fill iov data with zeros. */
+        zero_iov(orig_qiov->iov, iov_index, iov_buf, iov_left,
+                 sectors_in_region * 512);
+        return 0;
+    }
+
+    /* Create a child request to read data. */
+    niov = setup_iov(orig_qiov->iov, *v, iov_index, iov_buf,
+                     iov_left, sectors_in_region * 512);
+    qemu_iovec_init_external(*q, *v, niov);
+    QDEBUG("LOAD: acb%llu-%p  create_child %d sector_num=%" PRId64
+           " nb_sectors=%d niov=%d\n", acb->uuid, acb, *p_nqiov,
+           start_sec, sectors_in_region, niov);
+    acb->load.children[*p_nqiov].hd_acb =
+        bdrv_aio_readv(s->fvd_data, s->data_offset + start_sec, *q,
+                       sectors_in_region, load_data_from_compact_image_cb,
+                       &acb->load.children[*p_nqiov]);
+    if (!acb->load.children[*p_nqiov].hd_acb) {
+        return -1;
+    }
+    acb->load.children[*p_nqiov].acb = acb;
+    /* Advance the shared cursors for the next child. */
+    *v = *v + niov;
+    (*q)++;
+    (*p_nqiov)++;
+
+    return 0;
+}
+
+/* Walk the chunks covered by [sector_num, sector_num + nb_sectors), merging
+ * physically adjacent allocated chunks (and consecutive empty chunks) into
+ * contiguous regions.  With count_only, reports the number of empty regions
+ * (*p_nziov), child requests (*p_nqiov), and iov entries (*p_niov) needed.
+ * Otherwise, zero-fills empty regions and submits one child read per
+ * non-empty region using the caller-provided q/v arrays.  Returns 0 on
+ * success; on failure *p_nqiov is set to the number of children already
+ * submitted so the caller can cancel them. */
+static int load_create_child_requests(bool count_only, BDRVFvdState *s,
+    QEMUIOVector * orig_qiov, int64_t sector_num, int nb_sectors, int *p_nziov,
+    int *p_niov, int *p_nqiov, FvdAIOCB *acb,  QEMUIOVector *q, struct iovec *v)
+{
+    const uint32_t first_chunk = sector_num / s->chunk_size;
+    const uint32_t last_chunk = (sector_num + nb_sectors - 1) / s->chunk_size;
+    int iov_index = 0;
+    size_t iov_left = orig_qiov->iov[0].iov_len;
+    uint8_t *iov_buf = orig_qiov->iov[0].iov_base;
+    int nziov = 0;      /* Number of empty regions. */
+    int nqiov = 0;
+    int niov = 0;
+    int64_t prev = READ_TABLE2(s->table[first_chunk]);
+    int64_t start_sec = -1;
+    int sectors_in_region;
+    int32_t chunk;
+    int64_t chunk_end;
+    int64_t last_chunk_data;
+
+    /* Calculate data in the last chunk. */
+    last_chunk_data = (sector_num + nb_sectors) % s->chunk_size;
+    if (last_chunk_data == 0) {
+        last_chunk_data = s->chunk_size;
+    }
+
+    /* Calculate data in the first chunk. */
+    if (first_chunk < last_chunk) {
+        sectors_in_region = s->chunk_size - (sector_num % s->chunk_size);
+    } else {
+        sectors_in_region = nb_sectors;
+    }
+
+    /* Check if the first chunk spans over s->avail_storage. If so, the part
+     * beyond avail_storage must be filled with zeros rather than reading from
+     * the underlying storage as it may not be written yet, which is possible.
+     * This is explained using the following example. Suppose a chunk consists
+     * of 4 sectors (i.e., chunk_size=4) and the last allocated chunk,
+     * c=[s0 s1 s2 s3], was allocated when the VM wrote to sector s1.
+     * Although the table indicates the full chunk is allocated, the
+     * underlying host file system only grows the image file to the size just
+     * enough to accommodating sector s1, as s1 is the frontier of the sectors
+     * written. This frontier (s1 in this example) is recorded in
+     * s->avail_storage. If the VM reads sector s2, which is beyond the
+     * frontier, the driver should return an array of zeros rather than trying
+     * to read from the underlying host file system. Otherwise, it will cause
+     * a read error as sector s2 is beyond the current size of the image file.
+     */
+    if (!IS_EMPTY(prev)) {
+        start_sec = prev * s->chunk_size + (sector_num % s->chunk_size);
+
+        if (start_sec >= s->avail_storage) {
+            prev = EMPTY_TABLE; /* Pretend the first chunk is empty. */
+        } else {
+            if (first_chunk < last_chunk) {
+                chunk_end = (prev + 1) * s->chunk_size;
+            } else {
+                chunk_end = prev * s->chunk_size + last_chunk_data;
+            }
+
+            if (s->avail_storage < chunk_end) {
+                /* First chunk spans over s->avail_storage. Split it into
+                 * two regions. The first region is read from disk while the
+                 * second region is filled with zeros. */
+
+                /* Handle the first region. */
+                sectors_in_region = (s->avail_storage % s->chunk_size) -
+                    (sector_num % s->chunk_size);
+
+                if (load_create_one_child(count_only, false/*!empty*/,
+                                    orig_qiov, &iov_index, &iov_left,
+                                    &iov_buf, start_sec, sectors_in_region,
+                                    &niov, &nziov, &nqiov, s,
+                                    acb, &q, &v)) {
+                    goto fail;
+                }
+
+                /* Start the second, empty region. */
+                prev = EMPTY_TABLE;
+                if (first_chunk < last_chunk) {
+                    sectors_in_region = s->chunk_size -
+                            (s->avail_storage % s->chunk_size);
+                } else {
+                    sectors_in_region = nb_sectors - sectors_in_region;
+                }
+            }
+        }
+    }
+
+    /* Main loop: extend or terminate the current region chunk by chunk. */
+    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
+        uint32_t current = READ_TABLE2(s->table[chunk]);
+        int64_t data_size;
+
+        /* Check if the chunk spans over s->avail_storage. */
+        if (!IS_EMPTY(current)) {
+            if (current * s->chunk_size >= s->avail_storage) {
+                current = EMPTY_TABLE; /* Pretend this chunk is empty. */
+            } else {
+                if (chunk < last_chunk) {
+                    chunk_end = (current + 1) * s->chunk_size;
+                } else {
+                    chunk_end = current * s->chunk_size + last_chunk_data;
+                }
+
+                if (s->avail_storage < chunk_end) {
+                    /* This chunk spans over s->avail_storage. Split
+                     * it into two regions. The first region is read from disk
+                     * while the second region is filled with zeros. */
+                    if (IS_EMPTY(prev)) {
+                        /* Terminate the previous empty region.
+                         * NOTE(review): this call's return value is not
+                         * checked; for an empty region in either mode the
+                         * helper cannot fail, so this appears intentional —
+                         * confirm. */
+                        load_create_one_child(count_only, true/*empty*/,
+                                            orig_qiov, &iov_index, &iov_left,
+                                            &iov_buf, start_sec,
+                                            sectors_in_region, &niov, &nziov,
+                                            &nqiov, s, acb, &q, &v);
+
+                        /* Start the first region of this split chunk. */
+                        start_sec = current * s->chunk_size;
+                        sectors_in_region = s->avail_storage % s->chunk_size;
+
+                    } else {
+                        if (current == prev + 1) {
+                            /* Append the first region to the previous one. */
+                            sectors_in_region +=
+                                s->avail_storage % s->chunk_size;
+                        } else {
+                            /* Terminate the previous region. */
+                            if (load_create_one_child(count_only,
+                                    false/*!empty*/, orig_qiov, &iov_index,
+                                    &iov_left, &iov_buf, start_sec,
+                                    sectors_in_region, &niov, &nziov, &nqiov,
+                                    s, acb, &q, &v)) {
+                                goto fail;
+                            }
+
+                            /* Start the first region of this split chunk. */
+                            start_sec = current * s->chunk_size;
+                            sectors_in_region =
+                                s->avail_storage % s->chunk_size;
+                        }
+                    }
+
+                    /* Terminate the first region of this split chunk. */
+                    if (load_create_one_child(count_only, false/*!empty*/,
+                            orig_qiov, &iov_index, &iov_left, &iov_buf,
+                            start_sec, sectors_in_region, &niov, &nziov,
+                            &nqiov, s, acb, &q, &v)) {
+                        goto fail;
+                    }
+
+                    /* Start the second, empty region of this split chunk. */
+                    prev = EMPTY_TABLE;
+                    sectors_in_region = chunk_end - s->avail_storage;
+                    continue; /* This chunk is done. Go to handle next chunk. */
+                }
+            }
+        }
+
+        /* Simple case: not spanning over s->avail_storage. */
+        if (chunk < last_chunk) {
+            data_size = s->chunk_size;
+        } else {
+            data_size = last_chunk_data;
+        }
+
+        /* Merge with the previous region if both are empty, or both are
+         * allocated and physically adjacent in the data file. */
+        if ((IS_EMPTY(prev) && IS_EMPTY(current)) ||
+            (!IS_EMPTY(prev) && !IS_EMPTY(current) && current == prev + 1)) {
+            /* Continue the previous region. */
+            sectors_in_region += data_size;
+        } else {
+            /* Terminate the previous region. */
+            if (load_create_one_child(count_only, IS_EMPTY(prev), orig_qiov,
+                    &iov_index, &iov_left, &iov_buf, start_sec,
+                    sectors_in_region, &niov, &nziov, &nqiov, s, acb, &q, &v)) {
+                goto fail;
+            }
+
+            /* Start the next region. */
+            start_sec = current * s->chunk_size;
+            sectors_in_region = data_size;
+        }
+        prev = current;
+    }
+
+    /* Handle the last continuous region. */
+    if (count_only) {
+        if (IS_EMPTY(prev)) {
+            nziov++;
+        } else {
+            niov += count_iov(orig_qiov->iov, &iov_index, &iov_buf,
+                              &iov_left, sectors_in_region * 512);
+            nqiov++;
+        }
+
+        *p_nqiov = nqiov;
+        *p_nziov = nziov;
+        *p_niov = niov;
+        return 0;
+    }
+
+    /* Handle the last continuous region. */
+    if (IS_EMPTY(prev)) {
+        zero_iov(orig_qiov->iov, &iov_index, &iov_buf, &iov_left,
+                 sectors_in_region * 512);
+    } else {
+        niov = setup_iov(orig_qiov->iov, v, &iov_index, &iov_buf,
+                         &iov_left, sectors_in_region * 512);
+        qemu_iovec_init_external(q, v, niov);
+        QDEBUG("LOAD: acb%llu-%p  create_child %d sector_num=%" PRId64
+               " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov, start_sec,
+               sectors_in_region, niov);
+        acb->load.children[nqiov].hd_acb =
+            bdrv_aio_readv(s->fvd_data, s->data_offset + start_sec, q,
+                           sectors_in_region, load_data_from_compact_image_cb,
+                           &acb->load.children[nqiov]);
+        if (!acb->load.children[nqiov].hd_acb) {
+            goto fail;
+        }
+        acb->load.children[nqiov].acb = acb;
+    }
+    /* Sanity check: the whole orig_qiov must have been consumed exactly. */
+    ASSERT(iov_index == orig_qiov->niov - 1 && iov_left == 0);
+    return 0;
+
+fail:
+    *p_nqiov = nqiov; /* The number of children already created. */
+    return -1;
+}
diff --git a/block/fvd-utils.c b/block/fvd-utils.c
index 9feaa35..578eed4 100644
--- a/block/fvd-utils.c
+++ b/block/fvd-utils.c
@@ -107,3 +107,43 @@  static int setup_iov(struct iovec *orig_iov, struct iovec *new_iov,
         count++;
     }
 }
+
+/* Zero out 'total' bytes of the scatter/gather list starting at the cursor
+ * (*p_index, *p_buf, *p_left), advancing the cursor across iov entries as
+ * needed.  Returns the number of iov entries touched.
+ * NOTE(review): 'left' is declared int but *p_left is size_t — values above
+ * INT_MAX would be truncated, and the 'left <= 0' test relies on the signed
+ * copy.  Harmless for sector-sized buffers, but worth confirming. */
+static int zero_iov(struct iovec *orig_iov, int *p_index, uint8_t ** p_buf,
+                    size_t * p_left, size_t total)
+{
+    int index = *p_index;
+    uint8_t *buf = *p_buf;
+    int left = *p_left;
+    int count = 0;
+
+    /* Current entry exhausted: move to the next one before starting. */
+    if (left <= 0) {
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+    }
+
+    while (1) {
+        if (left >= total) {
+            /* Remainder fits in the current entry; write back the cursor. */
+            memset(buf, 0, total);
+            *p_buf = buf + total;
+            *p_left = left - total;
+            *p_index = index;
+            return count + 1;
+        }
+
+        /* Consume the rest of this entry and advance to the next. */
+        memset(buf, 0, left);
+        total -= left;
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+        count++;
+    }
+}
+
+/* Bottom-half handler for OP_WRAPPER acbs: invokes the user's callback with
+ * success, then tears down the bh and releases the acb. */
+static void aio_wrapper_bh(void *opaque)
+{
+    FvdAIOCB *acb = opaque;
+    acb->common.cb(acb->common.opaque, 0);
+    qemu_bh_delete(acb->wrapper.bh);
+    my_qemu_aio_release(acb);
+}