Patchwork [5/5] Fast Virtual Disk (FVD) Proposal Part 5

login
register
mail settings
Submitter Chunqiang Tang
Date Jan. 19, 2011, 10:04 p.m.
Message ID <1295474688-6219-5-git-send-email-ctang@us.ibm.com>
Download mbox | patch
Permalink /patch/79609/
State New
Headers show

Comments

Chunqiang Tang - Jan. 19, 2011, 10:04 p.m.
Part 5 of the block device driver for the proposed FVD image format.
Multiple patches are used in order to manage the size of each patch.
This patch includes some new files for FVD.

See the related discussions at
http://lists.gnu.org/archive/html/qemu-devel/2011-01/msg00426.html .

Signed-off-by: Chunqiang Tang <ctang@us.ibm.com>
---
 block/fvd-read.c  |  562 ++++++++++++++++++++++++++++++++++++++++++++++++
 block/fvd-store.c |  494 ++++++++++++++++++++++++++++++++++++++++++
 block/fvd-utils.c |  612 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 block/fvd-write.c |  449 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 2117 insertions(+), 0 deletions(-)
 create mode 100644 block/fvd-read.c
 create mode 100644 block/fvd-store.c
 create mode 100644 block/fvd-utils.c
 create mode 100644 block/fvd-write.c

Patch

diff --git a/block/fvd-read.c b/block/fvd-read.c
new file mode 100644
index 0000000..b0cfb91
--- /dev/null
+++ b/block/fvd-read.c
@@ -0,0 +1,562 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this module implements bdrv_aio_readv() for FVD.
+ *============================================================================*/
+
+static void finish_read_backing_for_copy_on_read (void *opaque, int ret);
+static void finish_read_fvd (void *opaque, int ret);
+static inline void calc_read_region (BDRVFvdState * s, int64_t sector_num,
+                                     int nb_sectors,
+                                     int64_t * p_first_sec_in_fvd,
+                                     int64_t * p_last_sec_in_fvd,
+                                     int64_t * p_first_sec_in_backing,
+                                     int64_t * p_last_sec_in_backing);
+
+static BlockDriverAIOCB *fvd_aio_readv (BlockDriverState * bs,
+                                        int64_t sector_num, QEMUIOVector * qiov,
+                                        int nb_sectors,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    TRACE_REQUEST (FALSE, sector_num, nb_sectors);
+
+    if (!s->data_region_prepared) {
+        init_data_region (s);
+    }
+
+    if (s->prefetch_state == PREFETCH_STATE_FINISHED
+        || sector_num >= s->nb_sectors_in_base_img) {
+        /* This is an  efficient case. See Section 3.3.5 of the FVD-cow paper.
+         * This also covers the case of no base image. */
+        return load_data (NULL, bs, sector_num, qiov, nb_sectors, cb, opaque);
+    }
+
+    /* Figure out data regions in the base image and in the FVD data file. */
+    int64_t last_sec_in_backing, first_sec_in_backing;
+    int64_t last_sec_in_fvd, first_sec_in_fvd;
+    calc_read_region (s, sector_num, nb_sectors, &first_sec_in_fvd,
+                      &last_sec_in_fvd, &first_sec_in_backing,
+                      &last_sec_in_backing);
+
+    if (first_sec_in_backing < 0) {
+        /* A simple case: all requested data are in the FVD data file. */
+        return load_data (NULL, bs, sector_num, qiov, nb_sectors, cb, opaque);
+    }
+
+    /* Do copy-on-read only if the context id is 0, i.e., it is not emulating
+     * synchronous I/O.  Doing copy-on-read in emulated synchronous I/O may
+     * leave the copy-on-read callbacks never being processed due to
+     * mismatching contextid. */
+    const int copy_on_read = s->copy_on_read && (get_async_context_id () == 0);
+
+    if (first_sec_in_fvd < 0 && !copy_on_read) {
+        /* A simple case: all requested data are in the base image and no need
+         * to do copy_on_read. */
+        return bdrv_aio_readv (bs->backing_hd, sector_num, qiov, nb_sectors, cb,
+                               opaque);
+    }
+
+    /* The remaining cases are more complicated, which can be: 1. Data are
+     * only in the base image and copy-on-read is needed.  2. Data are in both
+     * the base image and the FVD data file. Copy-on-read may be either TRUE
+     * or FALSE. */
+    FvdAIOCB *acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
+    if (!acb) {
+        return NULL;
+    }
+
+    QDEBUG ("READ: acb%llu-%p  start  sector_num=%" PRId64 " nb_sectors=%d\n",
+            acb->uuid, acb, sector_num, nb_sectors);
+
+    acb->type = OP_READ;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->read.qiov = qiov;
+    acb->read.ret = 0;
+    acb->read.read_backing.hd_acb = NULL;
+    acb->read.read_backing.done = FALSE;
+    acb->read.read_backing.iov.iov_base = NULL;
+    acb->read.read_fvd.hd_acb = NULL;
+    acb->read.read_fvd.iov.iov_base = NULL;
+    acb->read.read_fvd.done = (first_sec_in_fvd < 0);
+
+    /* Read from the base image. */
+    if (copy_on_read) {
+        /* Round the request to the block boundary. */
+        acb->read.read_backing.sector_num =
+            ROUND_DOWN (first_sec_in_backing, s->block_size);
+        int64_t end = ROUND_UP (last_sec_in_backing + 1, s->block_size);
+        if (end > s->nb_sectors_in_base_img) {
+            end = s->nb_sectors_in_base_img;
+        }
+        acb->read.read_backing.nb_sectors =
+            end - acb->read.read_backing.sector_num;
+    } else {
+        acb->read.read_backing.sector_num = first_sec_in_backing;
+        acb->read.read_backing.nb_sectors =
+            last_sec_in_backing - first_sec_in_backing + 1;
+    }
+
+    acb->read.read_backing.iov.iov_len =
+        acb->read.read_backing.nb_sectors * 512;
+    acb->read.read_backing.iov.iov_base =
+        my_qemu_blockalign (bs->backing_hd, acb->read.read_backing.iov.iov_len);
+    qemu_iovec_init_external (&acb->read.read_backing.qiov,
+                              &acb->read.read_backing.iov, 1);
+    acb->read.read_backing.hd_acb =
+        bdrv_aio_readv (bs->backing_hd, acb->read.read_backing.sector_num,
+                        &acb->read.read_backing.qiov,
+                        acb->read.read_backing.nb_sectors,
+                        finish_read_backing_for_copy_on_read, acb);
+    QDEBUG ("READ: acb%llu-%p  read_backing  backing_sector_num=%" PRId64
+            " backing_nb_sectors=%d\n", acb->uuid, acb,
+            acb->read.read_backing.sector_num,
+            acb->read.read_backing.nb_sectors);
+
+    if (!acb->read.read_backing.hd_acb) {
+        my_qemu_vfree (acb->read.read_backing.iov.iov_base);
+        my_qemu_aio_release (acb);
+        return NULL;
+    }
+
+    if (first_sec_in_fvd >= 0) {
+        /* Read the FVD data file. */
+        acb->read.read_fvd.sector_num = first_sec_in_fvd;
+        acb->read.read_fvd.nb_sectors = last_sec_in_fvd - first_sec_in_fvd + 1;
+        acb->read.read_fvd.iov.iov_len = acb->read.read_fvd.nb_sectors * 512;
+
+        /* Make a copy of the current bitmap because it may change when the
+         * read requests finish. */
+        int64_t b = MIN (acb->read.read_backing.sector_num,
+                         acb->read.read_fvd.sector_num);
+        b = b / s->block_size / 8;        /* First byte of the bitmap we need. */
+        int64_t e1 = acb->read.read_backing.sector_num +
+                            acb->read.read_backing.nb_sectors;
+        int64_t e2 = acb->read.read_fvd.sector_num +
+                            acb->read.read_fvd.nb_sectors;
+        int64_t e = MAX (e1, e2);
+        if (e > s->nb_sectors_in_base_img) {
+            e = s->nb_sectors_in_base_img;
+        }
+        e = (e - 1) / s->block_size / 8;/* Last byte of the bitmap we need. */
+        int bitmap_bytes = e - b + 1;
+        int buf_size = acb->read.read_fvd.iov.iov_len +
+                                    ROUND_UP (bitmap_bytes, 512);
+        acb->read.read_fvd.iov.iov_base =
+            my_qemu_blockalign (s->fvd_data, buf_size);
+        uint8_t *saved_bitmap = ((uint8_t *) acb->read.read_fvd.iov.iov_base) +
+                                    acb->read.read_fvd.iov.iov_len;
+        memcpy (saved_bitmap, s->fresh_bitmap + b, bitmap_bytes);
+
+        qemu_iovec_init_external (&acb->read.read_fvd.qiov,
+                                  &acb->read.read_fvd.iov, 1);
+        QDEBUG ("READ: acb%llu-%p  read_fvd  fvd_sector_num=%" PRId64
+                " fvd_nb_sectors=%d\n", acb->uuid, acb,
+                acb->read.read_fvd.sector_num, acb->read.read_fvd.nb_sectors);
+        acb->read.read_fvd.hd_acb = load_data (acb, bs, first_sec_in_fvd,
+                                               &acb->read.read_fvd.qiov,
+                                               acb->read.read_fvd.nb_sectors,
+                                               finish_read_fvd, acb);
+        if (!acb->read.read_fvd.hd_acb) {
+            if (acb->read.read_backing.hd_acb) {
+                bdrv_aio_cancel (acb->read.read_backing.hd_acb);
+                my_qemu_vfree (acb->read.read_backing.iov.iov_base);
+            }
+            my_qemu_vfree (acb->read.read_fvd.iov.iov_base);
+            my_qemu_aio_release (acb);
+            return NULL;
+        }
+    }
+
+    return &acb->common;
+}
+
+static void finish_copy_on_read (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    if (ret == 0) {
+        /* Update fresh_bitmap but do not update stale_bitmap or the on-disk
+         * bitmap. See Section 3.3.4 of the FVD-cow paper. */
+        update_fresh_bitmap (acb->sector_num, acb->nb_sectors, s);
+    }
+
+    s->outstanding_copy_on_read_data -= acb->nb_sectors * 512;
+
+#ifdef FVD_DEBUG
+    s->total_copy_on_read_data += acb->nb_sectors * 512;
+#endif
+    QDEBUG ("READ: acb%llu-%p  finish_copy_on_read  buffer_sector_num=%" PRId64
+            " buffer_nb_sectors=%d write_sector_num=%" PRId64
+            " write_nb_sectors=%d outstanding_copy_on_read=%" PRId64 "\n",
+            acb->uuid, acb, acb->copy.buffered_sector_begin,
+            (int) (acb->copy.buffered_sector_end -
+                   acb->copy.buffered_sector_begin), acb->sector_num,
+            acb->nb_sectors, s->outstanding_copy_on_read_data);
+
+    QLIST_REMOVE (acb, copy_lock.next);
+    restart_dependent_writes (acb);
+
+    int64_t begin = acb->sector_num + acb->nb_sectors;
+    int64_t end = acb->copy.buffered_sector_end;
+
+    if (find_region_in_base_img (s, &begin, &end)) {
+        acb->sector_num = begin;
+        acb->nb_sectors = end - begin;
+        acb->copy.iov.iov_base = acb->copy.buf +
+                                (begin - acb->copy.buffered_sector_begin) * 512;
+        acb->copy.iov.iov_len = acb->nb_sectors * 512;
+        qemu_iovec_init_external (&acb->copy.qiov, &acb->copy.iov, 1);
+        QDEBUG ("READ: acb%llu-%p  copy_on_read  buffer_sector_num=%" PRId64
+                " buffer_nb_sectors=%d write_sector_num=%" PRId64
+                " write_nb_sectors=%d outstanding_copy_on_read=%" PRId64 "\n",
+                acb->uuid, acb, acb->copy.buffered_sector_begin,
+                (int) (acb->copy.buffered_sector_end -
+                       acb->copy.buffered_sector_begin), acb->sector_num,
+                acb->nb_sectors, s->outstanding_copy_on_read_data);
+        acb->copy.hd_acb = store_data (TRUE, acb, bs, acb->sector_num,
+                                       &acb->copy.qiov, acb->nb_sectors,
+                                       finish_copy_on_read, acb);
+        if (acb->copy.hd_acb) {
+            QLIST_INIT (&acb->copy_lock.dependent_writes);
+            acb->copy_lock.begin = begin;
+            acb->copy_lock.end = end;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            s->outstanding_copy_on_read_data += acb->copy.iov.iov_len;
+            return;
+        }
+    }
+
+    QDEBUG ("READ: acb%llu-%p  no_more_copy_on_read\n", acb->uuid, acb);
+    my_qemu_vfree (acb->copy.buf);
+    my_qemu_aio_release (acb);
+}
+
+static void finish_read (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    if (acb->read.ret != 0) {
+        QDEBUG ("READ: acb%llu-%p  finish_read error ret=%d sector_num=%" PRId64
+                " nb_sectors=%d\n", acb->uuid, acb, acb->read.ret,
+                acb->sector_num, acb->nb_sectors);
+        acb->common.cb (acb->common.opaque, acb->read.ret);
+        if (acb->read.read_backing.iov.iov_base) {
+            my_qemu_vfree (acb->read.read_backing.iov.iov_base);
+        }
+        if (acb->read.read_fvd.iov.iov_base) {
+            my_qemu_vfree (acb->read.read_fvd.iov.iov_base);
+        }
+        my_qemu_aio_release (acb);
+
+        return;
+    }
+
+    if (!acb->read.read_fvd.iov.iov_base) {
+        /* Only read data from the base image. */
+        uint8_t *data = ((uint8_t *) acb->read.read_backing.iov.iov_base) +
+                    (acb->sector_num - acb->read.read_backing.sector_num) * 512;
+        qemu_iovec_from_buffer (acb->read.qiov, data, acb->nb_sectors * 512);
+    } else {
+        /* Under the guidance of the saved bitmap, merge data from the FVD
+         * data file and the base image. */
+        uint8_t *saved_bitmap = ((uint8_t *) acb->read.read_fvd.iov.iov_base) +
+                                            acb->read.read_fvd.iov.iov_len;
+        int64_t bitmap_offset = MIN (acb->read.read_backing.sector_num,
+                                     acb->read.read_fvd.sector_num);
+        bitmap_offset = bitmap_offset / s->block_size / 8;
+        int iov_index = 0;
+        uint8_t *iov_buf = acb->read.qiov->iov[0].iov_base;
+        int iov_left = acb->read.qiov->iov[0].iov_len;
+        int64_t sec = acb->sector_num;
+        const int64_t end = acb->sector_num + acb->nb_sectors;
+        int64_t first_sec;
+        uint8_t *source;
+
+        if (bitmap_show_sector_in_base_img
+            (sec, s, bitmap_offset, saved_bitmap)) {
+            goto in_backing;
+        }
+
+        while (1) {
+            /* For a section of data in the FVD data file. */
+            if (sec >= end) {
+                break;
+            }
+
+            first_sec = sec;
+            do {
+                sec++;
+            } while (sec < end && !bitmap_show_sector_in_base_img (sec, s,
+                                        bitmap_offset, saved_bitmap));
+
+            source = ((uint8_t *) acb->read.read_fvd.iov.iov_base) +
+                            (first_sec - acb->read.read_fvd.sector_num) * 512;
+            copy_to_iov (acb->read.qiov->iov, &iov_index, &iov_buf, &iov_left,
+                         source, (sec - first_sec) * 512);
+
+          in_backing:
+            /* For a section of data in the base image. */
+            if (sec >= end) {
+                break;
+            }
+
+            first_sec = sec;
+            do {
+                sec++;
+            } while (sec < end && bitmap_show_sector_in_base_img (sec, s,
+                                                bitmap_offset, saved_bitmap));
+
+            source = ((uint8_t *) acb->read.read_backing.iov.iov_base) +
+                        (first_sec - acb->read.read_backing.sector_num) * 512;
+            copy_to_iov (acb->read.qiov->iov, &iov_index, &iov_buf, &iov_left,
+                         source, (sec - first_sec) * 512);
+        }
+
+        ASSERT (iov_index == acb->read.qiov->niov - 1 && iov_left == 0);
+        my_qemu_vfree (acb->read.read_fvd.iov.iov_base);
+    }
+
+    QDEBUG ("READ: acb%llu-%p  finish_read  ret=%d\n", acb->uuid, acb,
+            acb->read.ret);
+    acb->common.cb (acb->common.opaque, acb->read.ret);
+
+    if (!s->copy_on_read || get_async_context_id () != 0) {
+        /* Do copy-on-read only if the context id is 0, i.e., it is not
+         * emulating synchronous I/O.  Doing copy-on-read in emulated
+         * synchronous I/O may leave the copy-on-read callbacks never being
+         * processed due to mismatching context id. */
+        my_qemu_vfree (acb->read.read_backing.iov.iov_base);
+        my_qemu_aio_release (acb);
+        return;
+    }
+
+    /* Convert AIOReadCB into a AIOCopyCB for copy-on-read. */
+    uint8_t *buf = acb->read.read_backing.iov.iov_base;
+    int64_t begin = acb->read.read_backing.sector_num;
+    int64_t end = begin + acb->read.read_backing.nb_sectors;
+
+    acb->type = OP_COPY;
+    acb->copy.buf = buf;
+    acb->copy.buffered_sector_begin = begin;
+    acb->copy.buffered_sector_end = end;
+
+    if (s->outstanding_copy_on_read_data < s->max_outstanding_copy_on_read_data
+        && find_region_in_base_img (s, &begin, &end)) {
+        /* Write to the FVD data file. */
+        acb->sector_num = begin;
+        acb->nb_sectors = end - begin;
+        acb->copy.iov.iov_base =
+            buf + (begin - acb->copy.buffered_sector_begin) * 512;
+        acb->copy.iov.iov_len = acb->nb_sectors * 512;
+        qemu_iovec_init_external (&acb->copy.qiov, &acb->copy.iov, 1);
+        QDEBUG ("READ: acb%llu-%p  copy_on_read  buffer_sector_num=%" PRId64
+                " buffer_nb_sectors=%d write_sector_num=%" PRId64
+                " write_nb_sectors=%d outstanding_copy_on_read=%" PRId64 "\n",
+                acb->uuid, acb, acb->copy.buffered_sector_begin,
+                (int) (acb->copy.buffered_sector_end -
+                       acb->copy.buffered_sector_begin), acb->sector_num,
+                acb->nb_sectors, s->outstanding_copy_on_read_data);
+        acb->copy.hd_acb = store_data (TRUE, acb, bs, acb->sector_num,
+                                       &acb->copy.qiov, acb->nb_sectors,
+                                       finish_copy_on_read, acb);
+        if (acb->copy.hd_acb) {
+            QLIST_INIT (&acb->copy_lock.dependent_writes);
+            acb->copy_lock.begin = begin;
+            acb->copy_lock.end = end;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            s->outstanding_copy_on_read_data += acb->copy.iov.iov_len;
+            return;
+        }
+    }
+
+    /* No more copy-on-read to do. */
+    my_qemu_vfree (acb->copy.buf);
+    my_qemu_aio_release (acb);
+}
+
+static void finish_read_fvd (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+
+    QDEBUG ("READ: acb%llu-%p  finish_read_fvd ret=%d\n", acb->uuid, acb, ret);
+    acb->read.read_fvd.hd_acb = NULL;
+    acb->read.read_fvd.done = TRUE;
+    if (acb->read.ret == 0) {
+        acb->read.ret = ret;
+    }
+
+    if (acb->read.read_backing.done) {
+        finish_read (acb);        /* The other request also finished. */
+    }
+}
+
+static void finish_read_backing_for_copy_on_read (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+
+    QDEBUG ("READ: acb%llu-%p  finish_read_backing ret=%d\n", acb->uuid, acb,
+            ret);
+    acb->read.read_backing.hd_acb = NULL;
+    acb->read.read_backing.done = TRUE;
+    if (acb->read.ret == 0) {
+        acb->read.ret = ret;
+    }
+
+    if (acb->read.read_fvd.done) {
+        finish_read (acb);        /* The other request also finished. */
+    }
+}
+
+static inline void calc_read_region (BDRVFvdState * s, int64_t sector_num,
+                                     int nb_sectors,
+                                     int64_t * p_first_sec_in_fvd,
+                                     int64_t * p_last_sec_in_fvd,
+                                     int64_t * p_first_sec_in_backing,
+                                     int64_t * p_last_sec_in_backing)
+{
+    int64_t last_sec_in_backing = -1, first_sec_in_backing = -1;
+    int64_t last_sec_in_fvd = -1, first_sec_in_fvd = -1;
+    int prev_block_in_backing;
+
+    if (fresh_bitmap_show_sector_in_base_img (sector_num, s)) {
+        first_sec_in_backing = last_sec_in_backing = sector_num;
+        prev_block_in_backing = TRUE;
+    } else {
+        first_sec_in_fvd = last_sec_in_fvd = sector_num;
+        prev_block_in_backing = FALSE;
+    }
+
+    /* Begin of next block. */
+    int64_t sec = ROUND_UP (sector_num + 1, s->block_size);
+
+    const int64_t sec_end = sector_num + nb_sectors;
+    int64_t last_sec = MIN (sec_end, s->nb_sectors_in_base_img) - 1;
+
+    while (1) {
+        if (sec > last_sec) {
+            sec = last_sec;
+        }
+
+        if (fresh_bitmap_show_sector_in_base_img (sec, s)) {
+            if (first_sec_in_backing < 0) {
+                first_sec_in_backing = sec;
+            }
+            if (!prev_block_in_backing) {
+                last_sec_in_fvd = sec - 1;
+                prev_block_in_backing = TRUE;
+            }
+            last_sec_in_backing = sec;
+        } else {
+            if (first_sec_in_fvd < 0) {
+                first_sec_in_fvd = sec;
+            }
+            if (prev_block_in_backing) {
+                last_sec_in_backing = sec - 1;
+                prev_block_in_backing = FALSE;
+            }
+            last_sec_in_fvd = sec;
+        }
+
+        if (sec == last_sec) {
+            break;
+        }
+        sec += s->block_size;
+    }
+
+    if (sec_end > s->nb_sectors_in_base_img) {
+        if (first_sec_in_fvd < 0) {
+            first_sec_in_fvd = s->nb_sectors_in_base_img;
+        }
+        last_sec_in_fvd = sec_end - 1;
+    }
+
+    *p_first_sec_in_fvd = first_sec_in_fvd;
+    *p_last_sec_in_fvd = last_sec_in_fvd;
+    *p_first_sec_in_backing = first_sec_in_backing;
+    *p_last_sec_in_backing = last_sec_in_backing;
+}
+
+static void fvd_read_cancel (FvdAIOCB * acb)
+{
+    if (acb->read.read_backing.hd_acb) {
+        bdrv_aio_cancel (acb->read.read_backing.hd_acb);
+    }
+    if (acb->read.read_fvd.hd_acb) {
+        bdrv_aio_cancel (acb->read.read_fvd.hd_acb);
+    }
+    if (acb->read.read_backing.iov.iov_base) {
+        my_qemu_vfree (acb->read.read_backing.iov.iov_base);
+    }
+    if (acb->read.read_fvd.iov.iov_base) {
+        my_qemu_vfree (acb->read.read_fvd.iov.iov_base);
+    }
+    my_qemu_aio_release (acb);
+}
+
+static void fvd_copy_cancel (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    if (acb->copy.hd_acb) {
+        bdrv_aio_cancel (acb->copy.hd_acb);
+    }
+    if (acb->copy_lock.next.le_prev != NULL) {
+        QLIST_REMOVE (acb, copy_lock.next);
+        restart_dependent_writes (acb);
+    }
+    my_qemu_vfree (acb->copy.buf);
+    if (acb->common.cb != null_prefetch_cb) {
+        /* This is a copy-on-read operation. */
+        s->outstanding_copy_on_read_data -= acb->nb_sectors * 512;
+    }
+    my_qemu_aio_release (acb);
+}
+
+static void restart_dependent_writes (FvdAIOCB * acb)
+{
+    acb->copy_lock.next.le_prev = NULL;
+    FvdAIOCB *req = acb->copy_lock.dependent_writes.lh_first;
+
+    while (req) {
+        /* Keep a copy of 'next' as it may be changed in do_aiO_write(). */
+        FvdAIOCB *next = req->write.next_dependent_write.le_next;
+
+        /* Indicate that this write is no longer on any depedent list. This
+         * helps fvd_read_cancel() work properly. */
+        req->write.next_dependent_write.le_prev = NULL;
+
+        if (acb->type == OP_WRITE) {
+            QDEBUG ("WRITE: acb%llu-%p  finished_and_restart_conflict_write "
+                    "acb%llu-%p\n", acb->uuid, acb, req->uuid, req);
+        } else {
+            QDEBUG ("READ: copy_on_read acb%llu-%p  "
+                    "finished_and_restart_conflict_write acb%llu-%p\n",
+                    acb->uuid, acb, req->uuid, req);
+        }
+
+        if (do_aio_write (req) < 0) {
+            QDEBUG ("WRITE: acb%llu-%p  finished with error ret=%d\n",
+                    req->uuid, req, -1);
+            req->common.cb (req->common.opaque, -1);
+            my_qemu_aio_release (req);
+        }
+
+        req = next;
+    }
+}
diff --git a/block/fvd-store.c b/block/fvd-store.c
new file mode 100644
index 0000000..ae7f045
--- /dev/null
+++ b/block/fvd-store.c
@@ -0,0 +1,494 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this FVD module implements storing data to a
+ *  compact image.
+ *===========================================================================*/
+
+static uint32_t allocate_chunk (BlockDriverState * bs);
+static inline FvdAIOCB *init_store_acb (int soft_write,
+                                        QEMUIOVector * orig_qiov,
+                                        BlockDriverState * bs,
+                                        int64_t sector_num, int nb_sectors,
+                                        FvdAIOCB * parent_acb,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque);
+static void finish_store_data_in_compact_image (void *opaque, int ret);
+
+static inline BlockDriverAIOCB *store_data (int soft_write,
+                                            FvdAIOCB * parent_acb,
+                                            BlockDriverState * bs,
+                                            int64_t sector_num,
+                                            QEMUIOVector * orig_qiov,
+                                            int nb_sectors,
+                                            BlockDriverCompletionFunc * cb,
+                                            void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    TRACE_STORE_IN_FVD ("store_data", sector_num, nb_sectors);
+
+    if (!s->table) {
+        /* Write directly since it is not a compact image. */
+        return bdrv_aio_writev (s->fvd_data, s->data_offset + sector_num,
+                                orig_qiov, nb_sectors, cb, opaque);
+    } else {
+        return store_data_in_compact_image (NULL, soft_write, parent_acb, bs,
+                                            sector_num, orig_qiov, nb_sectors,
+                                            cb, opaque);
+    }
+}
+
+/* Store data in the compact image. The argument 'soft_write' means
+ * the store was caused by copy-on-read or prefetching, which need not
+ * update metadata immediately. */
+static BlockDriverAIOCB *store_data_in_compact_image (FvdAIOCB * acb,
+                                                      int soft_write,
+                                                      FvdAIOCB * parent_acb,
+                                                      BlockDriverState * bs,
+                                                      int64_t sector_num,
+                                                      QEMUIOVector * orig_qiov,
+                                                      const int nb_sectors,
+                                                      BlockDriverCompletionFunc
+                                                      * cb, void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    const uint32_t first_chunk = sector_num / s->chunk_size;
+    const uint32_t last_chunk = (sector_num + nb_sectors - 1) / s->chunk_size;
+    int table_dirty = FALSE;
+    uint32_t chunk;
+    int64_t start_sec;
+
+    /* Check if storag space is allocated. */
+    for (chunk = first_chunk; chunk <= last_chunk; chunk++) {
+        if (IS_EMPTY (s->table[chunk])) {
+            uint32_t id = allocate_chunk (bs);
+            if (IS_EMPTY (id)) {
+                return NULL;
+            }
+            id |= DIRTY_TABLE;
+            WRITE_TABLE (s->table[chunk], id);
+
+            table_dirty = TRUE;
+        } else if (IS_DIRTY (s->table[chunk])) {
+            /* This is possible if a previous soft-write allocated the storage
+             * space but did not flush the table entry change to the journal
+             * and hence did not clean the dirty bit. This is also possible
+             * with two concurrent hard-writes. The first hard-write allocated
+             * the storage space but has not flushed the table entry change to
+             * the journal yet and hence the table entry remains dirty. In
+             * this case, the second hard-write will also try to flush this
+             * dirty table entry to the journal. The outcome is correct since
+             * they store the same metadata change in the journal (although
+             * twice). For this race condition, we prefer to have two writes
+             * to the journal rather than introducing a locking mechanism,
+             * because this happens rarely and those two writes to the journal
+             * are likely to be merged by the kernel into a single write since
+             * they are likely to update back-to-back sectors in the journal.
+             * A locking mechanism would be less efficient, because the large
+             * size of chunks would cause unnecessary locking due to ``false
+             * sharing'' of a chunk by two writes. */
+            table_dirty = TRUE;
+        }
+    }
+
+    const int update_table = (!soft_write && table_dirty);
+    size_t iov_left;
+    uint8_t *iov_buf;
+    int nb, iov_index, nqiov, niov;
+    uint32_t prev;
+
+    if (first_chunk == last_chunk) {
+        goto handle_one_continuous_region;
+    }
+
+    /* Count the number of qiov and iov needed to cover the continuous regions
+     * of the compact image. */
+    iov_left = orig_qiov->iov[0].iov_len;
+    iov_buf = orig_qiov->iov[0].iov_base;
+    iov_index = 0;
+    nqiov = 0;
+    niov = 0;
+    prev = READ_TABLE (s->table[first_chunk]);
+
+    /* Data in the first chunk. */
+    nb = s->chunk_size - (sector_num % s->chunk_size);
+
+    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
+        uint32_t current = READ_TABLE (s->table[chunk]);
+        int64_t data_size;
+        if (chunk < last_chunk) {
+            data_size = s->chunk_size;
+        } else {
+            data_size = (sector_num + nb_sectors) % s->chunk_size;
+            if (data_size == 0) {
+                data_size = s->chunk_size;
+            }
+        }
+
+        if (current == prev + 1) {
+            nb += data_size;        /* Continue the previous region. */
+        } else {
+            /* Terminate the previous region. */
+            niov +=
+                count_iov (orig_qiov->iov, &iov_index, &iov_buf, &iov_left,
+                           nb * 512);
+            nqiov++;
+            nb = data_size;        /* Data in the new region. */
+        }
+        prev = current;
+    }
+
+    if (nqiov == 0) {
+      handle_one_continuous_region:
+        /* A simple case. All data can be written out in one qiov and no new
+         * chunks are allocated. */
+        start_sec = READ_TABLE (s->table[first_chunk]) * s->chunk_size +
+                                        (sector_num % s->chunk_size);
+
+        if (!update_table && !acb) {
+            if (parent_acb) {
+                QDEBUG ("STORE: acb%llu-%p  "
+                        "store_directly_without_table_update\n",
+                        parent_acb->uuid, parent_acb);
+            }
+            return bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec,
+                                    orig_qiov, nb_sectors, cb, opaque);
+        }
+
+        if (!acb && !(acb = init_store_acb (soft_write, orig_qiov, bs,
+                            sector_num, nb_sectors, parent_acb, cb, opaque))) {
+            return NULL;
+        }
+
+        QDEBUG ("STORE: acb%llu-%p  store_directly  sector_num=%" PRId64
+                " nb_sectors=%d\n", acb->uuid, acb, acb->sector_num,
+                acb->nb_sectors);
+
+        acb->store.update_table = update_table;
+        acb->store.num_children = 1;
+        acb->store.one_child.hd_acb =
+            bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, orig_qiov,
+                             nb_sectors, finish_store_data_in_compact_image,
+                             &acb->store.one_child);
+        if (acb->store.one_child.hd_acb) {
+            acb->store.one_child.acb = acb;
+            return &acb->common;
+        } else {
+            my_qemu_aio_release (acb);
+            return NULL;
+        }
+    }
+
+    /* qiov for the last continuous region. */
+    niov += count_iov (orig_qiov->iov, &iov_index, &iov_buf,
+                       &iov_left, nb * 512);
+    nqiov++;
+    ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);
+
+    /* Need to submit multiple requests to the lower layer. */
+    if (!acb && !(acb = init_store_acb (soft_write, orig_qiov, bs, sector_num,
+                                        nb_sectors, parent_acb, cb, opaque))) {
+        return NULL;
+    }
+    acb->store.update_table = update_table;
+    acb->store.num_children = nqiov;
+
+    if (!parent_acb) {
+        QDEBUG ("STORE: acb%llu-%p  start  sector_num=%" PRId64
+                " nb_sectors=%d\n", acb->uuid, acb, acb->sector_num,
+                acb->nb_sectors);
+    }
+
+    /* Allocate memory and create multiple requests. */
+    const size_t metadata_size = nqiov * (sizeof (CompactChildCB) +
+                                          sizeof (QEMUIOVector))
+                                    + niov * sizeof (struct iovec);
+    acb->store.children = (CompactChildCB *) my_qemu_malloc (metadata_size);
+    QEMUIOVector *q = (QEMUIOVector *) (acb->store.children + nqiov);
+    struct iovec *v = (struct iovec *) (q + nqiov);
+
+    start_sec = READ_TABLE (s->table[first_chunk]) * s->chunk_size +
+                                        (sector_num % s->chunk_size);
+    nqiov = 0;
+    iov_index = 0;
+    iov_left = orig_qiov->iov[0].iov_len;
+    iov_buf = orig_qiov->iov[0].iov_base;
+    prev = READ_TABLE (s->table[first_chunk]);
+
+    /* Data in the first chunk. */
+    if (first_chunk == last_chunk) {
+        nb = nb_sectors;
+    }
+    else {
+        nb = s->chunk_size - (sector_num % s->chunk_size);
+    }
+
+    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
+        uint32_t current = READ_TABLE (s->table[chunk]);
+        int64_t data_size;
+        if (chunk < last_chunk) {
+            data_size = s->chunk_size;
+        } else {
+            data_size = (sector_num + nb_sectors) % s->chunk_size;
+            if (data_size == 0) {
+                data_size = s->chunk_size;
+            }
+        }
+
+        if (current == prev + 1) {
+            nb += data_size;        /* Continue the previous region. */
+        } else {
+            /* Terminate the previous continuous region. */
+            niov = setup_iov (orig_qiov->iov, v, &iov_index,
+                              &iov_buf, &iov_left, nb * 512);
+            qemu_iovec_init_external (q, v, niov);
+            QDEBUG ("STORE: acb%llu-%p  create_child %d sector_num=%" PRId64
+                    " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov,
+                    start_sec, q->size / 512, q->niov);
+            acb->store.children[nqiov].hd_acb =
+                bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, q,
+                                 q->size / 512,
+                                 finish_store_data_in_compact_image,
+                                 &acb->store.children[nqiov]);
+            if (!acb->store.children[nqiov].hd_acb) {
+                goto fail;
+            }
+            acb->store.children[nqiov].acb = acb;
+            v += niov;
+            q++;
+            nqiov++;
+            start_sec = current * s->chunk_size; /* Begin of the new region. */
+            nb = data_size;        /* Data in the new region. */
+        }
+        prev = current;
+    }
+
+    /* Requst for the last chunk. */
+    niov = setup_iov (orig_qiov->iov, v, &iov_index, &iov_buf,
+                      &iov_left, nb * 512);
+    ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);
+    qemu_iovec_init_external (q, v, niov);
+
+    QDEBUG ("STORE: acb%llu-%p  create_child_last %d sector_num=%" PRId64
+            " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov, start_sec,
+            q->size / 512, q->niov);
+    acb->store.children[nqiov].hd_acb =
+        bdrv_aio_writev (s->fvd_data, s->data_offset + start_sec, q,
+                         q->size / 512, finish_store_data_in_compact_image,
+                         &acb->store.children[nqiov]);
+    if (acb->store.children[nqiov].hd_acb) {
+        acb->store.children[nqiov].acb = acb;
+        return &acb->common;
+    }
+
+    int i;
+  fail:
+    QDEBUG ("STORE: acb%llu-%p  failed\n", acb->uuid, acb);
+    for (i = 0; i < nqiov; i++) {
+        bdrv_aio_cancel (acb->store.children[i].hd_acb);
+    }
+    my_qemu_free (acb->store.children);
+    my_qemu_aio_release (acb);
+    return NULL;
+}
+
+static uint32_t allocate_chunk (BlockDriverState * bs)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    /* Check if there is sufficient storage space. */
+    if (s->used_storage + s->chunk_size > s->data_storage) {
+        if (s->add_storage_cmd) {
+            if (system (s->add_storage_cmd)) {
+                fprintf (stderr, "Error in executing %s\n", s->add_storage_cmd);
+            }
+        } else {
+            /* If the image is stored on a file system, the image file size
+             * can be increased by bdrv_truncate. */
+            int64_t new_size = (s->data_offset + s->used_storage +
+                                s->storage_grow_unit) * 512;
+            bdrv_truncate (s->fvd_data, new_size);
+        }
+
+        /* Check how much storage is available now. */
+        int64_t size = bdrv_getlength (s->fvd_data);
+        if (size < 0) {
+            fprintf (stderr, "Error in bdrv_getlength(%s)\n", bs->filename);
+            return EMPTY_TABLE;
+        }
+        s->data_storage = size / 512 - s->data_offset;
+        if (s->used_storage + s->chunk_size > s->data_storage) {
+            fprintf (stderr, "Could not allocate more storage space.\n");
+            return EMPTY_TABLE;
+        }
+
+        QDEBUG ("Increased storage to %" PRId64 " bytes.\n", size);
+    }
+
+    uint32_t allocated_chunk_id = s->used_storage / s->chunk_size;
+    s->used_storage += s->chunk_size;
+    return allocated_chunk_id;
+}
+
+static void finish_store_data_in_compact_image (void *opaque, int ret)
+{
+    CompactChildCB *child = opaque;
+    FvdAIOCB *acb = child->acb;
+
+    /* Now fvd_store_compact_cancel(), if invoked, won't cancel this child
+     * request. */
+    child->hd_acb = NULL;
+
+    if (acb->store.ret == 0) {
+        acb->store.ret = ret;
+    } else {
+        QDEBUG ("STORE: acb%llu-%p  store_child=%d total_children=%d error "
+                "ret=%d\n", acb->uuid, acb, acb->store.finished_children,
+             acb->store.num_children, ret);
+    }
+
+    acb->store.finished_children++;
+    if (acb->store.finished_children < acb->store.num_children) {
+        QDEBUG ("STORE: acb%llu-%p  store_finished_children=%d "
+                "total_children=%d\n", acb->uuid, acb,
+                acb->store.finished_children, acb->store.num_children);
+        return;
+    }
+
+    /* All child requests finished. Free buffers. */
+    if (acb->store.children) {
+        my_qemu_free (acb->store.children);
+        acb->store.children = NULL;
+    }
+
+    if (acb->store.ret) {        /* error */
+        QDEBUG ("STORE: acb%llu-%p  "
+                "store_last_child_finished_with_error ret=%d\n",
+                acb->uuid, acb, acb->store.ret);
+        acb->common.cb (acb->common.opaque, acb->store.ret);
+        my_qemu_aio_release (acb);
+        return;
+    }
+
+    if (!acb->store.update_table) {
+        QDEBUG ("STORE: acb%llu-%p  "
+                "store_last_child_finished_without_table_update\n",
+                acb->uuid, acb);
+        acb->common.cb (acb->common.opaque, acb->store.ret);
+        my_qemu_aio_release (acb);
+        return;
+    }
+
+    /* Check whether the table entries are still dirty. Note that while saving
+     * this write to disk, other writes might have already flushed the dirty
+     * table entries to the journal. If those table entries are no longer
+     * dirty, depending on the behavior of parent_acb, it might be able to
+     * skip a journal update. */
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+    uint32_t first_chunk = acb->sector_num / s->chunk_size;
+    const uint32_t last_chunk =
+        (acb->sector_num + acb->nb_sectors - 1) / s->chunk_size;
+    int update_table = FALSE;
+    uint32_t chunk;
+    for (chunk = first_chunk; chunk <= last_chunk; chunk++) {
+        if (IS_DIRTY (s->table[chunk])) {
+            update_table = TRUE;
+            break;
+        }
+    }
+
+    if (acb->store.parent_acb) {
+        /* Metadata update will be handled by the parent write. */
+        ASSERT (acb->store.parent_acb->type == OP_WRITE);
+        QDEBUG ("STORE: acb%llu-%p  "
+                "store_last_child_finished_with_parent_do_table_update\n",
+                acb->uuid, acb);
+        acb->store.parent_acb->write.update_table = update_table;
+        acb->common.cb (acb->common.opaque, acb->store.ret);
+        my_qemu_aio_release (acb);
+        return;
+    }
+
+    if (update_table) {
+        QDEBUG ("STORE: acb%llu-%p  "
+                "store_last_child_finished_and_start_table_update\n",
+                acb->uuid, acb);
+        write_metadata_to_journal (acb);
+    } else {
+        QDEBUG ("STORE: acb%llu-%p  "
+                "store_last_child_finished_without_table_update\n",
+                acb->uuid, acb);
+        acb->common.cb (acb->common.opaque, acb->store.ret);
+        my_qemu_aio_release (acb);
+    }
+}
+
+static inline FvdAIOCB *init_store_acb (int soft_write,
+                                        QEMUIOVector * orig_qiov,
+                                        BlockDriverState * bs,
+                                        int64_t sector_num, int nb_sectors,
+                                        FvdAIOCB * parent_acb,
+                                        BlockDriverCompletionFunc * cb,
+                                        void *opaque)
+{
+    FvdAIOCB *acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
+    if (!acb) {
+        return NULL;
+    }
+    acb->type = OP_STORE_COMPACT;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->store.soft_write = soft_write;
+    acb->store.orig_qiov = orig_qiov;
+    acb->store.parent_acb = parent_acb;
+    acb->store.finished_children = 0;
+    acb->store.num_children = 0;
+    acb->store.one_child.hd_acb = NULL;
+    acb->store.children = NULL;
+    acb->store.ret = 0;
+    acb->jcb.iov.iov_base = NULL;
+    acb->jcb.hd_acb = NULL;
+    acb->jcb.next_wait_for_journal.le_prev = NULL;
+    COPY_UUID (acb, parent_acb);
+
+    return acb;
+}
+
+static void fvd_store_compact_cancel (FvdAIOCB * acb)
+{
+    if (acb->store.children) {
+        int i;
+        for (i = 0; i < acb->store.num_children; i++) {
+            if (acb->store.children[i].hd_acb) {
+                bdrv_aio_cancel (acb->store.children[i].hd_acb);
+            }
+        }
+        my_qemu_free (acb->store.children);
+    }
+    if (acb->store.one_child.hd_acb) {
+        bdrv_aio_cancel (acb->store.one_child.hd_acb);
+    }
+    if (acb->jcb.hd_acb) {
+        bdrv_aio_cancel (acb->jcb.hd_acb);
+        free_journal_sectors (acb->common.bs->opaque);
+    }
+    if (acb->jcb.iov.iov_base != NULL) {
+        my_qemu_vfree (acb->jcb.iov.iov_base);
+    }
+    if (acb->jcb.next_wait_for_journal.le_prev) {
+        QLIST_REMOVE (acb, jcb.next_wait_for_journal);
+    }
+
+    my_qemu_aio_release (acb);
+}
diff --git a/block/fvd-utils.c b/block/fvd-utils.c
new file mode 100644
index 0000000..3f7d4ec
--- /dev/null
+++ b/block/fvd-utils.c
@@ -0,0 +1,612 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*==============================================================================
+ *  A short description: this module implements basic utility functions for
+ *  the Fast Virtual Disk (FVD) format.
+ *============================================================================*/
+
+static inline int stale_bitmap_show_sector_in_base_img (int64_t sector_num,
+                                                const BDRVFvdState * s)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return FALSE;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    int64_t bitmap_byte_offset = block_num / 8;
+    uint8_t bitmap_bit_offset = block_num % 8;
+    uint8_t b = s->stale_bitmap[bitmap_byte_offset];
+    return 0 == (int) ((b >> bitmap_bit_offset) & 0x01);
+}
+
+static inline int
+fresh_bitmap_show_sector_in_base_img (int64_t sector_num,
+                                              const BDRVFvdState * s)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return FALSE;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    int64_t bitmap_byte_offset = block_num / 8;
+    uint8_t bitmap_bit_offset = block_num % 8;
+    uint8_t b = s->fresh_bitmap[bitmap_byte_offset];
+    return 0 == (int) ((b >> bitmap_bit_offset) & 0x01);
+}
+
+static inline void update_fresh_bitmap (int64_t sector_num, int nb_sectors,
+                                           const BDRVFvdState * s)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return;
+    }
+
+    int64_t end = sector_num + nb_sectors;
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    int64_t block_end = (end - 1) / s->block_size;
+
+    for (; block_num <= block_end; block_num++) {
+        int64_t bitmap_byte_offset = block_num / 8;
+        uint8_t bitmap_bit_offset = block_num % 8;
+        uint8_t mask = (uint8_t) (0x01 << bitmap_bit_offset);
+        uint8_t b = s->fresh_bitmap[bitmap_byte_offset];
+        if (!(b & mask)) {
+            b |= mask;
+            s->fresh_bitmap[bitmap_byte_offset] = b;
+        }
+    }
+}
+
+static void update_stale_bitmap (BDRVFvdState * s, int64_t sector_num,
+                                 int nb_sectors)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return;
+    }
+
+    int64_t end = sector_num + nb_sectors;
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    const int64_t block_end = (end - 1) / s->block_size;
+
+    for (; block_num <= block_end; block_num++) {
+        int64_t bitmap_byte_offset = block_num / 8;
+        uint8_t bitmap_bit_offset = block_num % 8;
+        uint8_t mask = (uint8_t) (0x01 << bitmap_bit_offset);
+        uint8_t b = s->stale_bitmap[bitmap_byte_offset];
+        if (!(b & mask)) {
+            ASSERT (s->stale_bitmap == s->fresh_bitmap ||
+                    (s->fresh_bitmap[bitmap_byte_offset] & mask));
+            b |= mask;
+            s->stale_bitmap[bitmap_byte_offset] = b;
+        }
+    }
+}
+
+static void update_both_bitmaps (BDRVFvdState * s, int64_t sector_num,
+                                 int nb_sectors)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return;
+    }
+
+    int64_t end = sector_num + nb_sectors;
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    const int64_t block_end = (end - 1) / s->block_size;
+
+    for (; block_num <= block_end; block_num++) {
+        int64_t bitmap_byte_offset = block_num / 8;
+        uint8_t bitmap_bit_offset = block_num % 8;
+        uint8_t mask = (uint8_t) (0x01 << bitmap_bit_offset);
+        uint8_t b = s->fresh_bitmap[bitmap_byte_offset];
+        if (!(b & mask)) {
+            b |= mask;
+            s->fresh_bitmap[bitmap_byte_offset] =
+                s->stale_bitmap[bitmap_byte_offset] = b;
+        }
+    }
+}
+
+/* Return TRUE if a valid region is found. */
+static int find_region_in_base_img (BDRVFvdState * s, int64_t * from,
+                                    int64_t * to)
+{
+    int64_t sec = *from;
+    int64_t last_sec = *to;
+
+    if (last_sec > s->nb_sectors_in_base_img) {
+        last_sec = s->nb_sectors_in_base_img;
+    }
+
+    if (sec >= last_sec) {
+        return FALSE;
+    }
+
+    if (!fresh_bitmap_show_sector_in_base_img (sec, s)) {
+        /* Find the first sector in the base image. */
+
+        sec = ROUND_UP (sec + 1, s->block_size); /* Begin of next block. */
+        while (1) {
+            if (sec >= last_sec) {
+                return FALSE;
+            }
+            if (fresh_bitmap_show_sector_in_base_img (sec, s)) {
+                break;
+            }
+            sec += s->block_size;        /* Begin of the next block. */
+        }
+    }
+
+    /* Find the end of the region in the base image. */
+    int64_t first_sec = sec;
+    sec = ROUND_UP (sec + 1, s->block_size);        /* Begin of next block. */
+    while (1) {
+        if (sec >= last_sec) {
+            sec = last_sec;
+            break;
+        }
+        if (!fresh_bitmap_show_sector_in_base_img (sec, s)) {
+            break;
+        }
+        sec += s->block_size;        /* Begin of the next block. */
+    }
+    last_sec = sec;
+
+    /* Check conflicting copy-on-reads. */
+    FvdAIOCB *old;
+    QLIST_FOREACH (old, &s->copy_locks, copy_lock.next) {
+        if (old->copy_lock.begin <= first_sec
+                && first_sec < old->copy_lock.end) {
+            first_sec = old->copy_lock.end;
+        }
+        if (old->copy_lock.begin < last_sec && last_sec <= old->copy_lock.end) {
+            last_sec = old->copy_lock.begin;
+        }
+    }
+
+    if (first_sec >= last_sec) {
+        return FALSE;        /* The entire region is already covered. */
+    }
+
+     /* This loop cannot be merged with the loop above. Otherwise, the logic
+      * would be incorrect.  This loop covers the case that an old request
+      * spans over a subset of the region being checked. */
+    QLIST_FOREACH (old, &s->copy_locks, copy_lock.next) {
+        if (first_sec <= old->copy_lock.begin
+            && old->copy_lock.begin < last_sec) {
+            last_sec = old->copy_lock.begin;
+        }
+    }
+
+    /* Check conflicting writes. */
+    QLIST_FOREACH (old, &s->write_locks, write.next_write_lock) {
+        int64_t old_end = old->sector_num + old->nb_sectors;
+        if (old->sector_num <= first_sec && first_sec < old_end) {
+            first_sec = old_end;
+        }
+        if (old->sector_num < last_sec && last_sec <= old_end) {
+            last_sec = old->sector_num;
+        }
+    }
+
+    if (first_sec >= last_sec) {
+        return FALSE;        /* The entire region is already covered. */
+    }
+
+     /* This loop cannot be merged with the loop above. Otherwise, the logic
+      * would be incorrect.  This loop covers the case that an old request
+      * spans over a subset of the region being checked. */
+    QLIST_FOREACH (old, &s->write_locks, write.next_write_lock) {
+        if (first_sec <= old->sector_num && old->sector_num < last_sec) {
+            last_sec = old->sector_num;
+        }
+    }
+
+    ASSERT (first_sec % s->block_size == 0 && (last_sec % s->block_size == 0
+                || last_sec == s->nb_sectors_in_base_img));
+
+    *from = first_sec;
+    *to = last_sec;
+    return TRUE;
+}
+
+static inline int bitmap_show_sector_in_base_img (int64_t sector_num,
+                                                       const BDRVFvdState * s,
+                                                       int bitmap_offset,
+                                                       uint8_t * bitmap)
+{
+    if (sector_num >= s->nb_sectors_in_base_img) {
+        return FALSE;
+    }
+
+    int64_t block_num = sector_num / s->block_size;
+    int64_t bitmap_byte_offset = block_num / 8 - bitmap_offset;
+    uint8_t bitmap_bit_offset = block_num % 8;
+    uint8_t b = bitmap[bitmap_byte_offset];
+    return 0 == (int) ((b >> bitmap_bit_offset) & 0x01);
+}
+
+static inline void copy_to_iov (struct iovec *iov, int *p_index,
+                                uint8_t ** p_buf, int *p_left,
+                                uint8_t * source, int total)
+{
+    int index = *p_index;
+    uint8_t *buf = *p_buf;
+    int left = *p_left;
+
+    if (left <= 0) {
+        index++;
+        buf = iov[index].iov_base;
+        left = iov[index].iov_len;
+    }
+
+    while (1) {
+        if (left >= total) {
+            memcpy (buf, source, total);
+            *p_buf = buf + total;
+            *p_left = left - total;
+            *p_index = index;
+            return;
+        }
+
+        memcpy (buf, source, left);
+        total -= left;
+        source += left;
+        index++;
+        buf = iov[index].iov_base;
+        left = iov[index].iov_len;
+    }
+}
+
+static inline void init_data_region (BDRVFvdState * s)
+{
+    bdrv_truncate (s->fvd_data, s->data_offset * 512 + s->virtual_disk_size);
+    s->data_region_prepared = TRUE;
+}
+
+static inline void update_clean_shutdown_flag (BDRVFvdState * s, int clean)
+{
+    FvdHeader header;
+    if (!read_fvd_header (s, &header)) {
+        header.clean_shutdown = clean;
+
+        if (!update_fvd_header (s, &header)) {
+            QDEBUG ("Set clean_shutdown to %s\n", BOOL (clean));
+        }
+    }
+}
+
+static inline int stale_bitmap_need_update (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+    int64_t end = acb->sector_num + acb->nb_sectors;
+
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+    int64_t block_end = (end - 1) / s->block_size;
+    int64_t block_num = acb->sector_num / s->block_size;
+
+    for (; block_num <= block_end; block_num++) {
+        int64_t bitmap_byte_offset = block_num / 8;
+        uint8_t bitmap_bit_offset = block_num % 8;
+        uint8_t mask = (uint8_t) (0x01 << bitmap_bit_offset);
+        uint8_t b = s->stale_bitmap[bitmap_byte_offset];
+        if (!(b & mask)) {
+            return TRUE;
+        }
+    }
+
+    return FALSE;
+}
+
+static int update_fresh_bitmap_and_check_stale_bitmap (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    if (acb->sector_num >= s->nb_sectors_in_base_img) {
+        return FALSE;
+    }
+
+    int need_update = FALSE;
+    int64_t end = acb->sector_num + acb->nb_sectors;
+
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+
+    int64_t block_end = (end - 1) / s->block_size;
+    int64_t block_num = acb->sector_num / s->block_size;
+
+    for (; block_num <= block_end; block_num++) {
+        int64_t bitmap_byte_offset = block_num / 8;
+        uint8_t bitmap_bit_offset = block_num % 8;
+        uint8_t mask = (uint8_t) (0x01 << bitmap_bit_offset);
+        uint8_t b = s->stale_bitmap[bitmap_byte_offset];
+        if (b & mask) {
+            /* If the bit in stale_bitmap is set, the corresponding bit in
+             * fresh_bitmap must be set already. */
+            continue;
+        }
+
+        need_update = TRUE;
+        b = s->fresh_bitmap[bitmap_byte_offset];
+        if (!(b & mask)) {
+            b |= mask;
+            s->fresh_bitmap[bitmap_byte_offset] = b;
+        }
+    }
+
+    return need_update;
+}
+
+static void fvd_header_cpu_to_le (FvdHeader * header)
+{
+    cpu_to_le32s (&header->magic);
+    cpu_to_le32s (&header->version);
+    cpu_to_le32s ((uint32_t *) & header->all_data_in_fvd_img);
+    cpu_to_le32s ((uint32_t *) & header->generate_prefetch_profile);
+    cpu_to_le64s ((uint64_t *) & header->metadata_size);
+    cpu_to_le64s ((uint64_t *) & header->virtual_disk_size);
+    cpu_to_le64s ((uint64_t *) & header->base_img_size);
+    cpu_to_le64s ((uint64_t *) & header->max_outstanding_copy_on_read_data);
+    cpu_to_le64s ((uint64_t *) & header->bitmap_offset);
+    cpu_to_le64s ((uint64_t *) & header->prefetch_profile_offset);
+    cpu_to_le64s ((uint64_t *) & header->prefetch_profile_entries);
+    cpu_to_le64s ((uint64_t *) & header->bitmap_size);
+    cpu_to_le32s ((uint32_t *) & header->copy_on_read);
+    cpu_to_le32s ((uint32_t *) & header->need_zero_init);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_start_delay);
+    cpu_to_le32s ((uint32_t *) & header->profile_directed_prefetch_start_delay);
+    cpu_to_le32s ((uint32_t *) & header->num_prefetch_slots);
+    cpu_to_le32s ((uint32_t *) & header->bytes_per_prefetch);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_throttle_time);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_read_throughput_measure_time);
+    cpu_to_le32s ((uint32_t *) &header->prefetch_write_throughput_measure_time);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_perf_calc_alpha);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_min_read_throughput);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_min_write_throughput);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_max_read_throughput);
+    cpu_to_le32s ((uint32_t *) & header->prefetch_max_write_throughput);
+    cpu_to_le32s ((uint32_t *) & header->block_size);
+    cpu_to_le32s ((uint32_t *) & header->unit_of_PrefetchProfileEntry_len);
+    cpu_to_le32s ((uint32_t *) & header->compact_image);
+    cpu_to_le64s ((uint64_t *) & header->chunk_size);
+    cpu_to_le64s ((uint64_t *) & header->storage_grow_unit);
+    cpu_to_le64s ((uint64_t *) & header->table_offset);
+    cpu_to_le32s ((uint32_t *) & header->clean_shutdown);
+    cpu_to_le64s ((uint64_t *) & header->journal_offset);
+    cpu_to_le64s ((uint64_t *) & header->journal_size);
+}
+
+static void fvd_header_le_to_cpu (FvdHeader * header)
+{
+    le32_to_cpus (&header->magic);
+    le32_to_cpus (&header->version);
+    le32_to_cpus ((uint32_t *) & header->all_data_in_fvd_img);
+    le32_to_cpus ((uint32_t *) & header->generate_prefetch_profile);
+    le64_to_cpus ((uint64_t *) & header->metadata_size);
+    le64_to_cpus ((uint64_t *) & header->virtual_disk_size);
+    le64_to_cpus ((uint64_t *) & header->base_img_size);
+    le64_to_cpus ((uint64_t *) & header->max_outstanding_copy_on_read_data);
+    le64_to_cpus ((uint64_t *) & header->bitmap_offset);
+    le64_to_cpus ((uint64_t *) & header->prefetch_profile_offset);
+    le64_to_cpus ((uint64_t *) & header->prefetch_profile_entries);
+    le64_to_cpus ((uint64_t *) & header->bitmap_size);
+    le32_to_cpus ((uint32_t *) & header->copy_on_read);
+    le32_to_cpus ((uint32_t *) & header->need_zero_init);
+    le32_to_cpus ((uint32_t *) & header->prefetch_start_delay);
+    le32_to_cpus ((uint32_t *) & header->profile_directed_prefetch_start_delay);
+    le32_to_cpus ((uint32_t *) & header->num_prefetch_slots);
+    le32_to_cpus ((uint32_t *) & header->bytes_per_prefetch);
+    le32_to_cpus ((uint32_t *) & header->prefetch_throttle_time);
+    le32_to_cpus ((uint32_t *) & header->prefetch_read_throughput_measure_time);
+    le32_to_cpus ((uint32_t *) &header->prefetch_write_throughput_measure_time);
+    le32_to_cpus ((uint32_t *) & header->prefetch_perf_calc_alpha);
+    le32_to_cpus ((uint32_t *) & header->prefetch_min_read_throughput);
+    le32_to_cpus ((uint32_t *) & header->prefetch_min_write_throughput);
+    le32_to_cpus ((uint32_t *) & header->prefetch_max_read_throughput);
+    le32_to_cpus ((uint32_t *) & header->prefetch_max_write_throughput);
+    le32_to_cpus ((uint32_t *) & header->block_size);
+    le32_to_cpus ((uint32_t *) & header->unit_of_PrefetchProfileEntry_len);
+    le32_to_cpus ((uint32_t *) & header->compact_image);
+    le64_to_cpus ((uint64_t *) & header->chunk_size);
+    le64_to_cpus ((uint64_t *) & header->storage_grow_unit);
+    le64_to_cpus ((uint64_t *) & header->table_offset);
+    le32_to_cpus ((uint32_t *) & header->clean_shutdown);
+    le64_to_cpus ((uint64_t *) & header->journal_offset);
+    le64_to_cpus ((uint64_t *) & header->journal_size);
+}
+
+static void flush_metadata_to_disk (BlockDriverState * bs)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (bs->read_only || !s->fvd_metadata) {
+        return;
+    }
+
+    if (s->stale_bitmap) {
+        /* Flush fresh_bitmap to disk. */
+        int nb = (int) (s->bitmap_size / 512);
+        QDEBUG ("Flush FVD bitmap (%d sectors) to disk\n", nb);
+        bdrv_write (s->fvd_metadata, s->bitmap_offset, s->fresh_bitmap, nb);
+    }
+
+    if (s->table) {
+        /* Flush table to disk. */
+        int table_entries =
+            (int) (ROUND_UP (s->virtual_disk_size, s->chunk_size * 512) /
+                   (s->chunk_size * 512));
+
+        /* Clean the DIRTY_TABLE bit. */
+        int i;
+        for (i = 0; i < table_entries; i++) {
+            CLEAN_DIRTY (s->table[i]);
+        }
+
+        int64_t table_size = sizeof (uint32_t) * table_entries;
+        table_size = ROUND_UP (table_size, DEF_PAGE_SIZE);
+        int nb = (int) (table_size / 512);
+        QDEBUG ("Flush FVD table (%d sectors) to disk\n", nb);
+        bdrv_write (s->fvd_metadata, s->table_offset, (uint8_t *) s->table, nb);
+    }
+}
+
+static int read_fvd_header (BDRVFvdState * s, FvdHeader * header)
+{
+    if (bdrv_pread (s->fvd_metadata, 0, header, sizeof (FvdHeader)) !=
+        sizeof (FvdHeader)) {
+        fprintf (stderr, "Failed to read the FVD header.\n");
+        return -1;
+    }
+
+    fvd_header_le_to_cpu (header);
+
+    if (header->magic != FVD_MAGIC || header->version != FVD_VERSION) {
+        fprintf (stderr, "Error: image does not have the correct FVD format "
+                 "magic number in header\n");
+        return -1;
+    }
+
+    return 0;
+}
+
+static int update_fvd_header (BDRVFvdState * s, FvdHeader * header)
+{
+    fvd_header_cpu_to_le (header);
+    int ret = bdrv_pwrite (s->fvd_metadata, 0, header, sizeof (FvdHeader));
+
+    if (ret != sizeof (FvdHeader)) {
+        fprintf (stderr, "Failed to update the FVD header.\n");
+        ASSERT (FALSE);
+        return -EIO;
+    }
+
+    return 0;
+}
+
+static void null_prefetch_cb (void *opaque, int ret)
+{
+    /* Nothing to do and will never be invoked. Only need it to distinguish
+     * copy-on-read from prefetch. */
+    ASSERT (FALSE);
+}
+
+static int count_iov (struct iovec *orig_iov, int *p_index, uint8_t ** p_buf,
+                      size_t * p_left, size_t total)
+{
+    int index = *p_index;
+    uint8_t *buf = *p_buf;
+    int left = *p_left;
+    int count = 0;
+
+    if (left <= 0) {
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+    }
+
+    while (1) {
+        if (left >= total) {
+            *p_buf = buf + total;
+            *p_left = left - total;
+            *p_index = index;
+            return count + 1;
+        }
+
+        total -= left;
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+        count++;
+    }
+}
+
+static int setup_iov (struct iovec *orig_iov, struct iovec *new_iov,
+                      int *p_index, uint8_t ** p_buf, size_t * p_left,
+                      size_t total)
+{
+    int index = *p_index;
+    uint8_t *buf = *p_buf;
+    int left = *p_left;
+    int count = 0;
+
+    if (left <= 0) {
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+    }
+
+    while (1) {
+        if (left >= total) {
+            new_iov[count].iov_base = buf;
+            new_iov[count].iov_len = total;
+            *p_buf = buf + total;
+            *p_left = left - total;
+            *p_index = index;
+            return count + 1;
+        }
+
+        new_iov[count].iov_base = buf;
+        new_iov[count].iov_len = left;
+        total -= left;
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+        count++;
+    }
+}
+
+static int zero_iov (struct iovec *orig_iov, int *p_index, uint8_t ** p_buf,
+                     size_t * p_left, size_t total)
+{
+    int index = *p_index;
+    uint8_t *buf = *p_buf;
+    int left = *p_left;
+    int count = 0;
+
+    if (left <= 0) {
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+    }
+
+    while (1) {
+        if (left >= total) {
+            memset (buf, 0, total);
+            *p_buf = buf + total;
+            *p_left = left - total;
+            *p_index = index;
+            return count + 1;
+        }
+
+        memset (buf, 0, left);
+        total -= left;
+        index++;
+        buf = orig_iov[index].iov_base;
+        left = orig_iov[index].iov_len;
+        count++;
+    }
+}
diff --git a/block/fvd-write.c b/block/fvd-write.c
new file mode 100644
index 0000000..90350ce
--- /dev/null
+++ b/block/fvd-write.c
@@ -0,0 +1,449 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this module implements bdrv_aio_writev() for FVD.
+ *===========================================================================*/
+
+static BlockDriverAIOCB *fvd_aio_writev (BlockDriverState * bs,
+                                         int64_t sector_num,
+                                         QEMUIOVector * qiov, int nb_sectors,
+                                         BlockDriverCompletionFunc * cb,
+                                         void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+    FvdAIOCB *acb;
+
+    TRACE_REQUEST (TRUE, sector_num, nb_sectors);
+
+    if (!s->data_region_prepared) {
+        init_data_region (s);
+    }
+
+    if (s->prefetch_state == PREFETCH_STATE_FINISHED
+        || sector_num >= s->nb_sectors_in_base_img) {
+        /* This is an  efficient case. See Section 3.3.5 of the FVD-cow paper.
+         * This also covers the case of no base image. */
+        return store_data (FALSE, NULL, bs, sector_num, qiov,
+                           nb_sectors, cb, opaque);
+    }
+
+    /* Check if all requested sectors are in the FVD data file. */
+    int64_t sec = ROUND_DOWN (sector_num, s->block_size);
+    int64_t sec_in_last_block = ROUND_DOWN (sector_num + nb_sectors - 1,
+                                            s->block_size);
+    do {
+        if (stale_bitmap_show_sector_in_base_img (sec, s)) {
+            goto slow_path;
+        }
+        sec += s->block_size;
+    } while (sec <= sec_in_last_block);
+
+    /* This is the fast path, as all requested data are in the FVD data file
+     * and no need to update the bitmap. */
+    return store_data (FALSE, NULL, bs, sector_num, qiov,
+                       nb_sectors, cb, opaque);
+
+  slow_path:
+    acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
+    if (!acb) {
+        return NULL;
+    }
+
+    acb->type = OP_WRITE;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->write.ret = 0;
+    acb->write.update_table = FALSE;
+    acb->write.qiov = qiov;
+    acb->write.hd_acb = NULL;
+    acb->write.cow_buf = NULL;
+    acb->copy_lock.next.le_prev = NULL;
+    acb->write.next_write_lock.le_prev = NULL;
+    acb->write.next_dependent_write.le_prev = NULL;
+    acb->jcb.iov.iov_base = NULL;
+    acb->jcb.hd_acb = NULL;
+    acb->jcb.next_wait_for_journal.le_prev = NULL;
+    QLIST_INIT (&acb->copy_lock.dependent_writes);
+
+    QDEBUG ("WRITE: acb%llu-%p  start  sector_num=%" PRId64 " nb_sectors=%d\n",
+            acb->uuid, acb, acb->sector_num, acb->nb_sectors);
+
+    if (do_aio_write (acb) < 0) {
+        my_qemu_aio_release (acb);
+        return NULL;
+    }
+#ifdef FVD_DEBUG
+    pending_local_writes++;
+#endif
+    return &acb->common;
+}
+
+static void fvd_write_cancel (FvdAIOCB * acb)
+{
+    if (acb->write.hd_acb) {
+        bdrv_aio_cancel (acb->write.hd_acb);
+    }
+    if (acb->jcb.hd_acb) {
+        bdrv_aio_cancel (acb->jcb.hd_acb);
+        free_journal_sectors (acb->common.bs->opaque);
+    }
+    if (acb->jcb.next_wait_for_journal.le_prev) {
+        QLIST_REMOVE (acb, jcb.next_wait_for_journal);
+    }
+    if (acb->write.next_dependent_write.le_prev) {
+        QLIST_REMOVE (acb, write.next_dependent_write);
+    }
+    free_write_resource (acb);
+}
+
+static void free_write_resource (FvdAIOCB * acb)
+{
+    if (acb->write.next_write_lock.le_prev) {
+        QLIST_REMOVE (acb, write.next_write_lock);
+    }
+    if (acb->copy_lock.next.le_prev) {
+        QLIST_REMOVE (acb, copy_lock.next);
+        restart_dependent_writes (acb);
+    }
+    if (acb->write.cow_buf) {
+        my_qemu_vfree (acb->write.cow_buf);
+    }
+    if (acb->jcb.iov.iov_base != NULL) {
+        my_qemu_vfree (acb->jcb.iov.iov_base);
+    }
+
+    my_qemu_aio_release (acb);
+
+#ifdef FVD_DEBUG
+    pending_local_writes--;
+#endif
+}
+
+static inline void finish_write (FvdAIOCB * acb, int ret)
+{
+    QDEBUG ("WRITE: acb%llu-%p  completely_finished ret=%d\n", acb->uuid, acb,
+            ret);
+    acb->common.cb (acb->common.opaque, ret);
+    free_write_resource (acb);
+}
+
+static void finish_write_data (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    acb->write.ret = ret;
+    acb->write.hd_acb = NULL;
+
+    if (ret != 0) {
+        QDEBUG ("WRITE: acb%llu-%p  finish_write_data error ret=%d\n",
+                acb->uuid, acb, ret);
+        finish_write (acb, ret);
+        return;
+    }
+
+    QDEBUG ("WRITE: acb%llu-%p  finish_write_data\n", acb->uuid, acb);
+
+    /* Figure out whether to update metadata or not. */
+    if (s->fresh_bitmap == s->stale_bitmap) {
+        /* This is the case if neither copy_on_read nor prefetching is
+         * enabled. Cannot update fresh_bitmap until the on-disk metadata is
+         * updated. */
+        if (acb->write.update_table || stale_bitmap_need_update (acb)) {
+            /* Cannot release lock on data now since fresh_bitmap has not been
+             * updated. Otherwise, a copy-on-write or copy-on-read operation
+             * may use data from the backing image to overwrite the data just
+             * been written. */
+            write_metadata_to_journal (acb);
+        } else {
+            finish_write (acb, ret);        /* No need to update metadata. */
+        }
+        return;
+    }
+
+    /* stale_bitmap and fresh_bitmap are different. Now we can update
+     * fresh_bitmap. stale_bitmap will be updated after the on-disk metadata
+     * are updated. */
+    int update_stale_bitmap = update_fresh_bitmap_and_check_stale_bitmap (acb);
+
+    if (acb->write.update_table || update_stale_bitmap) {
+        /* Release lock on data now since fresh_bitmap has been updated. */
+        QLIST_REMOVE (acb, write.next_write_lock);
+        acb->write.next_write_lock.le_prev = NULL;
+        if (acb->copy_lock.next.le_prev) {
+            QLIST_REMOVE (acb, copy_lock.next);
+            restart_dependent_writes (acb);
+        }
+
+        write_metadata_to_journal (acb);
+    } else {
+        finish_write (acb, ret);
+    }
+}
+
+static void finish_read_backing_for_copy_on_write (void *opaque, int ret)
+{
+    FvdAIOCB *acb = (FvdAIOCB *) opaque;
+    BlockDriverState *bs = acb->common.bs;
+
+    if (ret != 0) {
+        QDEBUG ("WRITE: acb%llu-%p  finish_read_from_backing with error "
+                "ret=%d\n", acb->uuid, acb, ret);
+        finish_write (acb, ret);
+    } else {
+        QDEBUG ("WRITE: acb%llu-%p  "
+                "finish_read_from_backing_and_start_write_data\n",
+                acb->uuid, acb);
+        acb->write.hd_acb = store_data (FALSE, acb, bs,
+                                        acb->write.cow_start_sector,
+                                        acb->write.cow_qiov,
+                                        acb->write.cow_qiov->size / 512,
+                                        finish_write_data, acb);
+        if (!acb->write.hd_acb) {
+            finish_write (acb, -1);
+        }
+    }
+}
+
+static int do_aio_write (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    /* Calculate the data region need be locked. */
+    const int64_t sector_end = acb->sector_num + acb->nb_sectors;
+    const int64_t block_begin = ROUND_DOWN (acb->sector_num, s->block_size);
+    int64_t block_end = ROUND_UP (sector_end, s->block_size);
+
+    /* Check for conflicting copy-on-reads. */
+    FvdAIOCB *old;
+    QLIST_FOREACH (old, &s->copy_locks, copy_lock.next) {
+        if (old->copy_lock.end > acb->sector_num &&
+            sector_end > old->copy_lock.begin) {
+            QLIST_INSERT_HEAD (&old->copy_lock.dependent_writes, acb,
+                               write.next_dependent_write);
+            QDEBUG ("WRITE: acb%llu-%p  put_on_hold_due_to_data_conflict "
+                    "with %s acb%llu-%p\n", acb->uuid, acb,
+                    old->type == OP_WRITE ? "write" : "copy_on_read",
+                    old->uuid, old);
+            return 0;
+        }
+    }
+
+    /* No conflict. Now check if this write updates partial blocks and hence
+     * need to read those blocks from the base image and merge with this
+     * write. */
+    int read_first_block, read_last_block;
+    if (acb->sector_num % s->block_size == 0) {
+        read_first_block = FALSE;
+    } else
+        if (fresh_bitmap_show_sector_in_base_img (acb->sector_num, s)) {
+        read_first_block = TRUE;
+    } else {
+        read_first_block = FALSE;
+    }
+
+    if (sector_end % s->block_size == 0) {
+        read_last_block = FALSE;
+    } else if (fresh_bitmap_show_sector_in_base_img (sector_end - 1, s)) {
+        read_last_block = TRUE;
+    } else {
+        read_last_block = FALSE;
+    }
+
+    if (read_first_block) {
+        if (read_last_block) {
+            /* Case 1: Read all the blocks involved from the base image. */
+            const QEMUIOVector *old_qiov = acb->write.qiov;
+            if (block_end > s->nb_sectors_in_base_img) {
+                block_end = s->nb_sectors_in_base_img;
+            }
+
+            int buf_size = (block_end - block_begin) * 512
+                    + 2 * sizeof (QEMUIOVector)
+                    + sizeof (struct iovec) * (old_qiov->niov + 3);
+            buf_size = ROUND_UP (buf_size, 512);
+            acb->write.cow_buf = my_qemu_blockalign (bs->backing_hd, buf_size);
+
+            /* For reading from the base image. */
+            QEMUIOVector *read_qiov = (QEMUIOVector *) (acb->write.cow_buf +
+                                  (block_end - block_begin) * 512);
+            read_qiov->iov = (struct iovec *) (read_qiov + 1);
+            read_qiov->nalloc = -1;
+            read_qiov->niov = 1;
+            read_qiov->iov[0].iov_base = acb->write.cow_buf;
+            read_qiov->iov[0].iov_len = read_qiov->size =
+                (block_end - block_begin) * 512;
+
+            /* For writing to the FVD data file. */
+            QEMUIOVector *write_qiov = (QEMUIOVector *) (read_qiov->iov + 1);
+            write_qiov->iov = (struct iovec *) (write_qiov + 1);
+            write_qiov->nalloc = -1;
+            write_qiov->niov = old_qiov->niov + 2;
+            write_qiov->size = read_qiov->size;
+
+            /* The first entry is for data read from the base image. */
+            write_qiov->iov[0].iov_base = acb->write.cow_buf;
+            write_qiov->iov[0].iov_len = (acb->sector_num - block_begin) * 512;
+            memcpy (&write_qiov->iov[1], old_qiov->iov,
+                    sizeof (struct iovec) * old_qiov->niov);
+
+            /* The last entry is for data read from the base image. */
+            write_qiov->iov[old_qiov->niov + 1].iov_base = acb->write.cow_buf
+                                            + (sector_end - block_begin) * 512;
+            write_qiov->iov[old_qiov->niov + 1].iov_len =
+                                                (block_end - sector_end) * 512;
+            acb->write.cow_qiov = write_qiov;
+            acb->write.cow_start_sector = block_begin;
+
+            acb->write.hd_acb = bdrv_aio_readv (bs->backing_hd, block_begin,
+                                    read_qiov, block_end - block_begin,
+                                    finish_read_backing_for_copy_on_write, acb);
+            if (!acb->write.hd_acb) {
+                goto fail;
+            }
+
+            acb->copy_lock.begin = block_begin;
+            acb->copy_lock.end = block_end;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            QDEBUG ("WRITE: acb%llu-%p  "
+                    "read_first_last_partial_blocks_from_backing  sector_num=%"
+                    PRId64 " nb_sectors=%d\n", acb->uuid, acb, block_begin,
+                    (int) (block_end - block_begin));
+        } else {
+            /* Case 2: Read the first block from the base image. */
+            int nb = acb->sector_num - block_begin;
+            const QEMUIOVector *old_qiov = acb->write.qiov;
+
+            /* Space for data and metadata. */
+            int buf_size = nb * 512 + 2 * sizeof (QEMUIOVector)
+                                + sizeof (struct iovec) * (old_qiov->niov + 2);
+            buf_size = ROUND_UP (buf_size, 512);
+            acb->write.cow_buf = my_qemu_blockalign (bs->backing_hd, buf_size);
+
+            /* For reading from the base image. */
+            QEMUIOVector *read_qiov =
+                (QEMUIOVector *) (acb->write.cow_buf + nb * 512);
+            read_qiov->iov = (struct iovec *) (read_qiov + 1);
+            read_qiov->nalloc = -1;
+            read_qiov->niov = 1;
+            read_qiov->iov[0].iov_base = acb->write.cow_buf;
+            read_qiov->iov[0].iov_len = read_qiov->size = nb * 512;
+
+            /* For writing to the FVD data file. */
+            QEMUIOVector *write_qiov = (QEMUIOVector *) (read_qiov->iov + 1);
+            write_qiov->iov = (struct iovec *) (write_qiov + 1);
+            write_qiov->nalloc = -1;
+            write_qiov->niov = old_qiov->niov + 1;
+            write_qiov->size = old_qiov->size + read_qiov->size;
+
+            /* The first entry is added for data read from the base image. */
+            write_qiov->iov[0].iov_base = acb->write.cow_buf;
+            write_qiov->iov[0].iov_len = read_qiov->size;
+            memcpy (&write_qiov->iov[1], old_qiov->iov,
+                    sizeof (struct iovec) * old_qiov->niov);
+            acb->write.cow_qiov = write_qiov;
+            acb->write.cow_start_sector = block_begin;
+
+            acb->write.hd_acb = bdrv_aio_readv (bs->backing_hd,
+                                    block_begin, read_qiov, nb,
+                                    finish_read_backing_for_copy_on_write, acb);
+            if (!acb->write.hd_acb) {
+                goto fail;
+            }
+
+            acb->copy_lock.begin = block_begin;
+            acb->copy_lock.end = block_begin + s->block_size;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            QDEBUG ("WRITE: acb%llu-%p  read_first_partial_block_from_backing  "
+                    "sector_num=%" PRId64 " nb_sectors=%d\n",
+                    acb->uuid, acb, block_begin, nb);
+        }
+    } else {
+        if (read_last_block) {
+            /* Case 3: Read the last block from the base image. */
+            int nb;
+            if (block_end < s->nb_sectors_in_base_img) {
+                nb = block_end - sector_end;
+            }
+            else {
+                nb = s->nb_sectors_in_base_img - sector_end;
+            }
+            const QEMUIOVector *old_qiov = acb->write.qiov;
+
+            /* Space for data and metadata. */
+            int buf_size = nb * 512 + 2 * sizeof (QEMUIOVector)
+                                + sizeof (struct iovec) * (old_qiov->niov + 2);
+            buf_size = ROUND_UP (buf_size, 512);
+            acb->write.cow_buf = my_qemu_blockalign (bs->backing_hd, buf_size);
+
+            /* For reading from the base image. */
+            QEMUIOVector *read_qiov = (QEMUIOVector *) (acb->write.cow_buf
+                                                        + nb * 512);
+            read_qiov->iov = (struct iovec *) (read_qiov + 1);
+            read_qiov->nalloc = -1;
+            read_qiov->niov = 1;
+            read_qiov->iov[0].iov_base = acb->write.cow_buf;
+            read_qiov->iov[0].iov_len = read_qiov->size = nb * 512;
+
+            /* For writing to the FVD data file. */
+            QEMUIOVector *write_qiov = (QEMUIOVector *) (read_qiov->iov + 1);
+            write_qiov->iov = (struct iovec *) (write_qiov + 1);
+            write_qiov->nalloc = -1;
+            write_qiov->niov = old_qiov->niov + 1;
+            write_qiov->size = old_qiov->size + read_qiov->size;
+            memcpy (write_qiov->iov, old_qiov->iov,
+                    sizeof (struct iovec) * old_qiov->niov);
+
+            /* The last appended entry is for data read from the base image. */
+            write_qiov->iov[old_qiov->niov].iov_base = acb->write.cow_buf;
+            write_qiov->iov[old_qiov->niov].iov_len = read_qiov->size;
+            acb->write.cow_qiov = write_qiov;
+            acb->write.cow_start_sector = acb->sector_num;
+
+            acb->write.hd_acb = bdrv_aio_readv (bs->backing_hd,
+                                    sector_end, read_qiov, nb,
+                                    finish_read_backing_for_copy_on_write, acb);
+            if (!acb->write.hd_acb) {
+                goto fail;
+            }
+
+            acb->copy_lock.end = block_end;
+            acb->copy_lock.begin = block_end - s->block_size;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            QDEBUG ("WRITE: acb%llu-%p  read_last_partial_block_from_backing  "
+                    "sector_num=%" PRId64 " nb_sectors=%d\n",
+                    acb->uuid, acb, sector_end, nb);
+        } else {
+            /* Case 4: Can write directly and no need to merge with data from
+             * the base image. */
+            QDEBUG ("WRITE: acb%llu-%p  "
+                    "write_fvd_without_read_partial_block_from_backing\n",
+                    acb->uuid, acb);
+            acb->write.hd_acb = store_data (FALSE, acb, bs, acb->sector_num,
+                                            acb->write.qiov, acb->nb_sectors,
+                                            finish_write_data, acb);
+            if (!acb->write.hd_acb) {
+                goto fail;
+            }
+        }
+    }
+
+    QLIST_INSERT_HEAD (&s->write_locks, acb, write.next_write_lock);
+    return 0;
+
+  fail:
+    if (acb->write.cow_buf) {
+        my_qemu_vfree (acb->write.cow_buf);
+    }
+    return -1;
+}