Patchwork [4/5] Fast Virtual Disk (FVD) Proposal Part 4

login
register
mail settings
Submitter Chunqiang Tang
Date Jan. 19, 2011, 10:04 p.m.
Message ID <1295474688-6219-4-git-send-email-ctang@us.ibm.com>
Download mbox | patch
Permalink /patch/79611/
State New
Headers show

Comments

Chunqiang Tang - Jan. 19, 2011, 10:04 p.m.
Part 4 of the block device driver for the proposed FVD image format.
Multiple patches are used in order to manage the size of each patch.
This patch includes some new files for FVD.

See the related discussions at
http://lists.gnu.org/archive/html/qemu-devel/2011-01/msg00426.html .

Signed-off-by: Chunqiang Tang <ctang@us.ibm.com>
---
 block/fvd-journal.c  |  558 +++++++++++++++++++++++++++++++++++++++++++++
 block/fvd-load.c     |  364 +++++++++++++++++++++++++++++
 block/fvd-misc.c     |  616 ++++++++++++++++++++++++++++++++++++++++++++++++++
 block/fvd-open.c     |  446 ++++++++++++++++++++++++++++++++++++
 block/fvd-prefetch.c |  598 ++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 2582 insertions(+), 0 deletions(-)
 create mode 100644 block/fvd-journal.c
 create mode 100644 block/fvd-load.c
 create mode 100644 block/fvd-misc.c
 create mode 100644 block/fvd-open.c
 create mode 100644 block/fvd-prefetch.c

Patch

diff --git a/block/fvd-journal.c b/block/fvd-journal.c
new file mode 100644
index 0000000..7bd316a
--- /dev/null
+++ b/block/fvd-journal.c
@@ -0,0 +1,558 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this FVD module implements a journal for committing
+ *  metadata changes. Each sector in the journal is self-contained so that
+ *  updates are atomic. A sector may contain one or multiple journal records.
+ *  There are two types of journal records:
+ * bitmap_update and table_update.
+ *   Format of a bitmap_update record:
+ *         + BITMAP_JRECORD (uint32_t)
+ *         + num_dirty_sectors (uint32_t)
+ *         + dirty_sector_begin (int64_t)
+ *   Format of a table_update record:
+ *         + TABLE_JRECORD (uint32_t)
+ *         + dirty_table_offset (uint32_t)
+ *         + num_dirty_table_entries (uint32_t)
+ *         +   table_entry_1 (uint32_t)
+ *         +   table_entry_2 (uint32_t)
+ *         +   ...
+ * If both the bitmap and the table need update, one sector contains a
+ * TABLE_JRECORD and a BITMAP_JRECORD, and these two records cover
+ * the same range of virtual disk data so that the corresponding parts of the
+ * bitmap and the table are always updated in one atomic operation.
+ *============================================================================*/
+
+#define BITMAP_JRECORD                 ((uint32_t)0x3F2AB8ED)
+#define TABLE_JRECORD                ((uint32_t)0xB4E6F7AC)
+#define EMPTY_JRECORD                ((uint32_t)0)
+#define BITMAP_JRECORD_SIZE         (2*sizeof(uint32_t) + sizeof(int64_t))
+#define TABLE_JRECORD_HDR_SIZE         (3*sizeof(uint32_t))
+#define TABLE_JRECORDS_PER_SECTOR \
+                ((512 - TABLE_JRECORD_HDR_SIZE)/sizeof(uint32_t))
+
+/* One BITMAP_JRECORD plus this number of TABLE_JRECORD table entries can fit
+ * in one journal sector. */
+#define MIXED_JRECORDS_PER_SECTOR ((512 - TABLE_JRECORD_HDR_SIZE - \
+                                BITMAP_JRECORD_SIZE) / sizeof(uint32_t))
+
+/* Returns the minimum journal size in bytes needed to cover 'table_entries'
+ * table entries, assuming each 512-byte journal sector carries
+ * MIXED_JRECORDS_PER_SECTOR table entries (plus one BITMAP_JRECORD).
+ * Rounds up to a whole number of sectors. */
+static inline int64_t calc_min_journal_size (int64_t table_entries)
+{
+    return (table_entries + MIXED_JRECORDS_PER_SECTOR - 1)
+                            / MIXED_JRECORDS_PER_SECTOR * 512;
+}
+
+/*
+ * Initializes the journal state from the FVD header. If the image was not
+ * shut down cleanly and a journal exists, replays every journal sector to
+ * rebuild the in-memory bitmap and table, then flushes the recovered
+ * metadata to disk. Returns 0 on success, -1 when the image may not be used
+ * (unclean shutdown without a journal, or journal read failure).
+ */
+static int init_journal (int read_only, BlockDriverState * bs,
+                         FvdHeader * header)
+{
+    /* A trick to figure out whether it is running in a qemu tool. */
+    const int in_qemu_tool = (rt_clock == NULL);
+
+    BDRVFvdState *s = bs->opaque;
+    s->journal_size = header->journal_size / 512;
+    s->journal_offset = header->journal_offset / 512;
+    s->next_journal_sector = 0;
+
+    if (read_only) {
+        return 0;
+    }
+
+    if (s->journal_size <= 0) {
+        if (!s->table && !s->fresh_bitmap) {
+            return 0;        /* No need to use the journal. */
+        }
+
+        if (!header->clean_shutdown) {
+            fprintf (stderr, "ERROR: the image may be corrupted because it was "
+                     "not shut down gracefully last\ntime and it does not use "
+                     "a journal. You may continue to use the image at your\n"
+                     "own risk by manually resetting the clean_shutdown flag "
+                     "in the image.\n\n");
+            s->dirty_image = TRUE;
+            if (in_qemu_tool) {
+                return 0;        /* Allow qemu tools to use the image. */
+            } else {
+                /* Do not allow boot the VM until the clean_shutdown flag is
+                 * manually cleaned. */
+                return -1;
+            }
+        }
+
+        QDEBUG ("Journal is disabled\n");
+        return 0;
+    }
+
+    if (header->clean_shutdown) {
+        QDEBUG ("Journal is skipped as the VM was shut down gracefully "
+                "last time.\n");
+        return 0;
+    }
+
+    QDEBUG ("Recover from the journal as the VM was not shut down gracefully "
+            "last time.\n");
+
+    /* Read the entire journal in one shot. */
+    uint8_t *journal = my_qemu_blockalign (s->fvd_metadata,
+                                           s->journal_size * 512);
+    int ret = bdrv_read (s->fvd_metadata, s->journal_offset,
+                         journal, s->journal_size);
+    if (ret < 0) {
+        my_qemu_vfree (journal);
+        fprintf (stderr, "Failed to read the journal (%" PRId64 ") bytes\n",
+                 s->journal_size * 512);
+        return -1;
+    }
+
+    /* Go through every journal sector. Each sector is self-contained and may
+     * hold several records; an EMPTY_JRECORD type marks the end of the valid
+     * records within a sector. */
+    uint8_t *sector = journal;
+    uint8_t *journal_end = journal + s->journal_size * 512;
+    while (sector < journal_end) {
+        uint32_t *type = (uint32_t *) sector;        /* Journal record type. */
+        while ((uint8_t *) type < (sector + 512)) {
+            if (le32_to_cpu (*type) == BITMAP_JRECORD) {
+                uint32_t *nb_sectors = type + 1; /* BITMAP_JRECORD field 2. */
+                int64_t *sector_num = (int64_t *) (type + 2);        /* field 3. */
+                if (s->stale_bitmap) {
+                    update_both_bitmaps (s, le64_to_cpu (*sector_num),
+                                     le32_to_cpu (*nb_sectors));
+                    QDEBUG ("JOURNAL: recover BITMAP_JRECORD sector_num=%"
+                            PRId64 " nb_sectors=%u\n",
+                            le64_to_cpu (*sector_num),
+                            le32_to_cpu (*nb_sectors));
+                }
+
+                /* First field of the next journal record. */
+                type = (uint32_t *) sector_num + 1;
+            } else if (le32_to_cpu (*type) == TABLE_JRECORD) {
+                uint32_t *offset = type + 1;        /* TABLE_JRECORD field 2. */
+                uint32_t *count = type + 2;        /* TABLE_JRECORD field 3. */
+                uint32_t *content = type + 3;        /* fields 4 and beyond. */
+                const uint32_t chunk = le32_to_cpu (*offset);
+                const uint32_t n = le32_to_cpu (*count);
+                uint32_t i;
+                for (i = 0; i < n; i++) {
+                    /* NOTE(review): entries are copied verbatim, without
+                     * le32_to_cpu; presumably the in-memory table keeps the
+                     * on-disk (little-endian) representation — confirm. */
+                    s->table[chunk + i] = content[i];
+
+                    /* The dirty bit was not cleaned when the table entry was
+                     * saved in the journal. */
+                    CLEAN_DIRTY2 (s->table[chunk + i]);
+                }
+                type = content + n; /* First field of the next record. */
+                QDEBUG ("JOURNAL: recover TABLE_JRECORD chunk_start=%u "
+                        "nb_chunks=%u\n", chunk, n);
+            } else {
+                /* End of valid records in this journal sector. */
+                ASSERT (le32_to_cpu (*type) == EMPTY_JRECORD);
+                break;
+            }
+        }
+
+        sector += 512;
+    }
+    my_qemu_vfree (journal);
+    flush_metadata_to_disk (bs);        /* Write the recovered metadata. */
+
+    return 0;
+}
+
+/*
+ * This function first flushes in-memory metadata to disk and then recycles
+ * the used journal sectors. It is possible to make this operation
+ * asynchronous so that the performance is better.  However, the overall
+ * performance improvement may be limited since recycling the journal happens
+ * very infrequently and updating on-disk metadata finishes quickly because of
+ * the small size of the metadata.
+ */
+static void recycle_journal (BDRVFvdState * s)
+{
+#ifdef FVD_DEBUG
+    static int64_t recycle_count = 0;
+    QDEBUG ("JOURNAL: start journal recycle %" PRId64 ".\n", recycle_count);
+    recycle_count++;
+    int64_t begin_time = qemu_get_clock (rt_clock);
+#endif
+
+    /* Write fresh_bitmap to disk. */
+    if (s->fresh_bitmap) {
+        int nb = (int) (s->bitmap_size / 512);
+        QDEBUG ("JOURNAL: flush bitmap (%d sectors) to disk\n", nb);
+
+        /* How to recover if this write fails? */
+        bdrv_write (s->fvd_metadata, s->bitmap_offset, s->fresh_bitmap, nb);
+
+        /* Bring stale_bitmap in sync with the just-flushed fresh_bitmap. */
+        if (s->fresh_bitmap != s->stale_bitmap) {
+            memcpy (s->stale_bitmap, s->fresh_bitmap, s->bitmap_size);
+        }
+    }
+
+    /* Clean DIRTY_TABLE bit and write the table to disk. */
+    if (s->table) {
+        int table_entries =
+            (int) (ROUND_UP (s->virtual_disk_size, s->chunk_size * 512) /
+                   (s->chunk_size * 512));
+        int i;
+        for (i = 0; i < table_entries; i++) {
+            CLEAN_DIRTY (s->table[i]);
+        }
+
+        int64_t table_size = sizeof (uint32_t) * table_entries;
+        table_size = ROUND_UP (table_size, DEF_PAGE_SIZE);
+        int nb = (int) (table_size / 512);
+        QDEBUG ("JOURNAL: flush table (%d sectors) to disk\n", nb);
+
+        /* How to recover if this write fails? */
+        bdrv_write (s->fvd_metadata, s->table_offset, (uint8_t *) s->table, nb);
+    }
+    /* All journal sectors are free again. */
+    s->next_journal_sector = 0;
+
+#ifdef FVD_DEBUG
+    int64_t end_time = qemu_get_clock (rt_clock);
+    QDEBUG ("JOURNAL: journal recycle took %" PRId64 " ms.\n",
+            (end_time - begin_time));
+#endif
+}
+
+/*
+ * Releases this request's claim on the journal. When the last in-flight
+ * journal update finishes and other requests are queued waiting for free
+ * journal sectors, recycles the journal and restarts those requests.
+ */
+static void free_journal_sectors (BDRVFvdState * s)
+{
+    if (s->journal_size <= 0) {
+        return;
+    }
+
+    s->ongoing_journal_updates--;
+    ASSERT (s->ongoing_journal_updates >= 0);
+    if (s->ongoing_journal_updates > 0 || QLIST_EMPTY (&s->wait_for_journal)) {
+        return;
+    }
+
+    /* Some requests are waiting for the journal to be recycled in order to
+     * get free journal sectors. */
+    recycle_journal (s);
+
+    /* Restart requests in the wait_for_journal list.  First make a copy of
+     * the head and then empty the head. */
+    FvdAIOCB *acb = QLIST_FIRST (&s->wait_for_journal);
+    QLIST_INIT (&s->wait_for_journal);
+    FvdAIOCB *next;
+
+    /* Restart all dependent requests. Cannot use QLIST_FOREACH here, because
+     * the next link might not be the same any more after the callback. */
+    while (acb) {
+        next = acb->jcb.next_wait_for_journal.le_next;
+        acb->jcb.next_wait_for_journal.le_prev = NULL;
+        QDEBUG ("WRITE: acb%llu-%p  restart_write_metadata_to_journal "
+                "after recycle_journal\n", acb->uuid, acb);
+        write_metadata_to_journal (acb);
+        acb = next;
+    }
+}
+
+/*
+ * Allocates 'num_sectors' contiguous journal sectors for 'acb'. Returns the
+ * starting journal sector on success. Returns -1 when no sectors are free
+ * yet; in that case the request has been queued on wait_for_journal and will
+ * be restarted by free_journal_sectors() after a journal recycle.
+ */
+static int64_t allocate_journal_sectors (BDRVFvdState * s, FvdAIOCB * acb,
+                                         int num_sectors)
+{
+    ASSERT (num_sectors <= s->journal_size);
+
+    if (!QLIST_EMPTY (&s->wait_for_journal)) {
+        /* Waiting for journal recycle to finish. */
+        ASSERT (s->ongoing_journal_updates > 0);
+        QDEBUG ("WRITE: acb%llu-%p  wait_for_journal_recycle\n",
+                acb->uuid, acb);
+        QLIST_INSERT_HEAD (&s->wait_for_journal, acb,
+                           jcb.next_wait_for_journal);
+        return -1;
+    }
+
+    int64_t journal_sec;
+    if (s->next_journal_sector + num_sectors <= s->journal_size) {
+      alloc_sector:
+        journal_sec = s->next_journal_sector;
+        s->next_journal_sector += num_sectors;
+        s->ongoing_journal_updates++;
+        return journal_sec;
+    }
+
+    /* No free journal sector is available. Check if the journal can be
+     * recycled now. */
+    if (s->ongoing_journal_updates == 0) {
+        recycle_journal (s);
+        goto alloc_sector;
+    }
+
+    /* Waiting for journal recycle to finish. It will be woken up later in
+     * free_journal_sectors(). */
+    QLIST_INSERT_HEAD (&s->wait_for_journal, acb, jcb.next_wait_for_journal);
+    QDEBUG ("WRITE: acb%llu-%p  wait_for_journal_recycle\n", acb->uuid, acb);
+    return -1;
+}
+
+/*
+ * Completion callback for the journal write started in
+ * write_metadata_to_journal(). On success, marks the covered table entries
+ * clean and updates the stale bitmap; then finishes the original request and
+ * releases its journal sectors.
+ */
+static void finish_write_journal (void *opaque, int ret)
+{
+    FvdAIOCB *acb = (FvdAIOCB *) opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    if (ret == 0) {
+        QDEBUG ("JOURNAL: acb%llu-%p  finish_write_journal\n", acb->uuid, acb);
+
+        if (s->table) {
+            /* Update the table. */
+            int i;
+            const uint32_t first_chunk = acb->sector_num / s->chunk_size;
+            const uint32_t last_chunk = (acb->sector_num + acb->nb_sectors - 1)
+                                                            / s->chunk_size;
+            for (i = first_chunk; i <= last_chunk; i++) {
+                CLEAN_DIRTY2 (s->table[i]);
+            }
+        }
+
+        if (s->stale_bitmap) {
+            /* If fresh_bitmap differs from stale_bitmap, fresh_bitmap has
+             * already been updated in finish_write_data() when invoking
+             * update_fresh_bitmap_and_check_stale_bitmap(). */
+            update_stale_bitmap (s, acb->sector_num, acb->nb_sectors);
+        }
+    } else {
+        QDEBUG ("JOURNAL: acb%llu-%p  finish_write_journal error ret=%d\n",
+                acb->uuid, acb, ret);
+    }
+
+    /* Clean up. */
+    if (acb->type == OP_STORE_COMPACT) {
+        acb->common.cb (acb->common.opaque, ret);
+        if (acb->jcb.iov.iov_base != NULL) {
+            my_qemu_vfree (acb->jcb.iov.iov_base);
+        }
+        my_qemu_aio_release (acb);
+    } else {
+        ASSERT (acb->type == OP_WRITE);
+        finish_write (acb, ret);
+    }
+
+    free_journal_sectors (s);
+}
+
+/*
+ * Builds journal records for a finished write and submits them with a single
+ * asynchronous write. Three cases: bitmap only (one BITMAP_JRECORD sector),
+ * table only (one or more sectors of TABLE_JRECORDs), or both (each sector
+ * carries a TABLE_JRECORD plus a BITMAP_JRECORD covering the same range, so
+ * the pair is committed atomically). If no journal sector is free, the
+ * request is queued and restarted later by free_journal_sectors().
+ */
+static void write_metadata_to_journal (FvdAIOCB * acb)
+{
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+    int64_t journal_sec;
+    int num_journal_sectors;
+
+    ASSERT ((s->table || s->fresh_bitmap)
+            && (acb->type == OP_WRITE || acb->type == OP_STORE_COMPACT));
+
+    /* Is the journal disabled? */
+    if (s->journal_size <= 0) {
+        finish_write_journal (acb, 0);
+        return;
+    }
+
+    if (!s->table) {
+        /* Only update the bitmap. */
+        num_journal_sectors = 1;
+        journal_sec = allocate_journal_sectors (s, acb, num_journal_sectors);
+        if (journal_sec < 0) {
+            /* No journal sector is available now. It will be woken up later
+             * in free_journal_sectors(). */
+            return;
+        }
+        acb->jcb.iov.iov_len = 512;
+        acb->jcb.iov.iov_base = my_qemu_blockalign (s->fvd_metadata, 512);
+
+        uint32_t *type = (uint32_t *) acb->jcb.iov.iov_base; /* Field 1. */
+        uint32_t *nb_sectors = type + 1;        /* BITMAP_JRECORD field 2. */
+        int64_t *sector_num = (int64_t *) (type + 2);        /* field 3. */
+        *type = cpu_to_le32 (BITMAP_JRECORD);
+        *nb_sectors = cpu_to_le32 ((uint32_t) acb->nb_sectors);
+        *sector_num = cpu_to_le64 (acb->sector_num);
+        *((uint32_t *) (sector_num + 1)) = EMPTY_JRECORD;/* Mark record end. */
+
+    } else if (!s->fresh_bitmap) {
+        /* Only update the table. */
+        const int64_t first_chunk = acb->sector_num / s->chunk_size;
+        const int64_t last_chunk = (acb->sector_num + acb->nb_sectors - 1)
+                                                            / s->chunk_size;
+        int num_chunks = last_chunk - first_chunk + 1;
+        num_journal_sectors = (num_chunks + TABLE_JRECORDS_PER_SECTOR - 1)
+                                                / TABLE_JRECORDS_PER_SECTOR;
+        journal_sec = allocate_journal_sectors (s, acb, num_journal_sectors);
+        if (journal_sec < 0) {
+            /* No journal sector is available now. It will be woken up later
+             * in free_journal_sectors(). */
+            return;
+        }
+
+        acb->jcb.iov.iov_len = num_journal_sectors * 512;
+        acb->jcb.iov.iov_base = my_qemu_blockalign (s->fvd_metadata,
+                                                    acb->jcb.iov.iov_len);
+
+        uint32_t *type = (uint32_t *) acb->jcb.iov.iov_base; /* Field 1. */
+        int64_t chunk = first_chunk;
+
+        while (1) {
+            /* Start a new journal sector. */
+            uint32_t *offset = type + 1;        /* TABLE_JRECORD field 2. */
+            uint32_t *count = type + 2;        /* TABLE_JRECORD field 3. */
+            uint32_t *content = type + 3;        /* Fields 4 and beyond. */
+            *type = cpu_to_le32 (TABLE_JRECORD);
+            *offset = cpu_to_le32 (chunk);
+
+            if (num_chunks <= TABLE_JRECORDS_PER_SECTOR) {
+                /* This is the last journal sector. */
+                *count = cpu_to_le32 (num_chunks);
+                memcpy (content, &s->table[chunk],
+                        sizeof (uint32_t) * num_chunks);
+                if (num_chunks < TABLE_JRECORDS_PER_SECTOR) {
+                    *(content + num_chunks) = EMPTY_JRECORD; /* Mark end. */
+                }
+                break;
+            }
+
+            /* A full sector of table entries; continue with the next one. */
+            *count = cpu_to_le32 (TABLE_JRECORDS_PER_SECTOR);
+            memcpy (content, &s->table[chunk],
+                    sizeof (uint32_t) * TABLE_JRECORDS_PER_SECTOR);
+            chunk += TABLE_JRECORDS_PER_SECTOR;
+            num_chunks -= TABLE_JRECORDS_PER_SECTOR;
+
+            /* Next TABLE_JRECORD field 1. */
+            type = content + TABLE_JRECORDS_PER_SECTOR;
+        }
+    } else {
+        /* Update both the table and the bitmap. It may use multiple journal
+         * sectors. Each sector is self-contained, including a TABLE_JRECORD
+         * and a BITMAP_JRECORD. The two records on the same sector cover the
+         * same range of virtual disk data.  The purpose is to update the
+         * corresponding parts of the bitmap and the table in one atomic
+         * operation. */
+        const int64_t first_chunk = acb->sector_num / s->chunk_size;
+        const int64_t last_chunk = (acb->sector_num + acb->nb_sectors - 1)
+                                                / s->chunk_size;
+        int num_chunks = last_chunk - first_chunk + 1;
+        num_journal_sectors = (num_chunks + MIXED_JRECORDS_PER_SECTOR - 1)
+                                                / MIXED_JRECORDS_PER_SECTOR;
+        journal_sec = allocate_journal_sectors (s, acb, num_journal_sectors);
+        if (journal_sec < 0) {
+            /* No journal sector is available now. It will be woken up later
+             * in free_journal_sectors(). */
+            return;
+        }
+        acb->jcb.iov.iov_len = num_journal_sectors * 512;
+        acb->jcb.iov.iov_base = my_qemu_blockalign (s->fvd_metadata,
+                                                    acb->jcb.iov.iov_len);
+
+        uint32_t *type = (uint32_t *) acb->jcb.iov.iov_base; /* Field 1. */
+        int64_t chunk = first_chunk;
+        int64_t sector_num = acb->sector_num;
+        uint32_t nb_sectors;
+        if (num_journal_sectors == 1) {
+            nb_sectors = acb->nb_sectors;
+        } else {
+            /* Number of sectors that fall into the first chunk. */
+            nb_sectors = (first_chunk + MIXED_JRECORDS_PER_SECTOR)
+                                    * s->chunk_size - acb->sector_num;
+        }
+
+        while (1) {
+            /* Start a new journal sector. */
+            uint32_t *offset = type + 1;        /* TABLE_JRECORD field 2. */
+            uint32_t *count = type + 2;                /* TABLE_JRECORD field 3. */
+            uint32_t *content = type + 3;         /* Fields 4 and beyond. */
+            *type = cpu_to_le32 (TABLE_JRECORD);
+            *offset = cpu_to_le32 (chunk);
+
+            if (num_chunks <= MIXED_JRECORDS_PER_SECTOR) {
+                /* This is the last journal sector. */
+                *count = cpu_to_le32 (num_chunks);
+                memcpy (content, &s->table[chunk],
+                        sizeof (uint32_t) * num_chunks);
+
+                /* A BITMAP_JRECORD follows a TABLE_JRECORD so that they are
+                 * updated in one atomic operation. */
+                type = content + num_chunks;        /* BITMAP_JRECORD field 1. */
+                uint32_t *p_nb_sectors = type + 1; /* BITMAP_JRECORD field 2. */
+                int64_t *p_sector_num = (int64_t *) (type + 2);        /* Field 3. */
+                *type = cpu_to_le32 (BITMAP_JRECORD);
+                *p_nb_sectors = cpu_to_le32 (nb_sectors);
+                *p_sector_num = cpu_to_le64 (sector_num);
+
+                if (num_chunks < MIXED_JRECORDS_PER_SECTOR) {
+                    *((uint32_t *) (p_sector_num + 1)) = EMPTY_JRECORD;        /*End*/
+                }
+                break;
+            }
+
+            /* A full sector: MIXED_JRECORDS_PER_SECTOR table entries. */
+            *count = cpu_to_le32 (MIXED_JRECORDS_PER_SECTOR);
+            memcpy (content, &s->table[chunk],
+                    sizeof (uint32_t) * MIXED_JRECORDS_PER_SECTOR);
+
+            /* A BITMAP_JRECORD follows a TABLE_JRECORD so that they are
+             * updated in one atomic operation. */
+            type = content + MIXED_JRECORDS_PER_SECTOR;                /* Field 1. */
+            uint32_t *p_nb_sectors = type + 1;        /* BITMAP_JRECORD field 2. */
+            int64_t *p_sector_num = (int64_t *) (type + 2);        /* Field 3. */
+            *type = cpu_to_le32 (BITMAP_JRECORD);
+            *p_nb_sectors = cpu_to_le32 (nb_sectors);
+            *p_sector_num = cpu_to_le64 (sector_num);
+
+            /* Prepare for the next journal sector. */
+            type = (uint32_t *) (p_sector_num + 1);
+            chunk += MIXED_JRECORDS_PER_SECTOR;
+            sector_num = chunk * s->chunk_size;
+            num_chunks -= MIXED_JRECORDS_PER_SECTOR;
+            if (num_chunks <= MIXED_JRECORDS_PER_SECTOR) {
+                /* Data sectors covered by the last journal sector. */
+                nb_sectors = (acb->sector_num + acb->nb_sectors)
+                                            - chunk * s->chunk_size;
+            } else {
+                nb_sectors = s->chunk_size * MIXED_JRECORDS_PER_SECTOR;
+            }
+        }
+    }
+
+    /* Submit the journal sectors as one asynchronous write. */
+    QDEBUG ("JOURNAL: acb%llu-%p  write_metadata_to_journal journal_sec=%"
+            PRId64 " nb_journal_sectors=%d\n", acb->uuid, acb, journal_sec,
+            num_journal_sectors);
+    qemu_iovec_init_external (&acb->jcb.qiov, &acb->jcb.iov, 1);
+    acb->jcb.hd_acb = bdrv_aio_writev (s->fvd_metadata,
+                                       s->journal_offset + journal_sec,
+                                       &acb->jcb.qiov, num_journal_sectors,
+                                       finish_write_journal, acb);
+    if (!acb->jcb.hd_acb) {
+        finish_write_journal (acb, -1);
+    }
+}
+
+/* In debug builds, pretend the host crashed on exit so that recovery via the
+ * journal gets exercised. */
+#ifdef FVD_DEBUG
+static int emulate_host_crash = TRUE;
+#else
+static int emulate_host_crash = FALSE;
+#endif
+
+/*
+ * Flushes in-memory metadata and sets the clean_shutdown flag when the image
+ * is closed, unless crash emulation is active (then recovery must come from
+ * the journal). No-op for read-only images or images without metadata.
+ */
+static void flush_metadata_to_disk_on_exit (BlockDriverState *bs)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (bs->read_only || !s->fvd_metadata) {
+        return;
+    }
+
+    /* If (emulate_host_crash==TRUE), do not flush metadata to disk
+     * so that it has to rely on journal for recovery. */
+    if (s->journal_size <= 0 || !emulate_host_crash) {
+        flush_metadata_to_disk (bs);
+        if (!s->dirty_image) {
+            update_clean_shutdown_flag (s, TRUE);
+        }
+    }
+}
+
+/* Public hook: force crash emulation on (metadata is never flushed on exit,
+ * so the next open must recover from the journal). */
+void fvd_enable_host_crash_test (void)
+{
+    emulate_host_crash = TRUE;
+}
diff --git a/block/fvd-load.c b/block/fvd-load.c
new file mode 100644
index 0000000..fd72e31
--- /dev/null
+++ b/block/fvd-load.c
@@ -0,0 +1,364 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this FVD module implements loading data from a
+ *  compact image.
+ *============================================================================*/
+
+static void aio_wrapper_bh (void *opaque);
+static void finish_load_data_from_compact_image (void *opaque, int ret);
+static inline FvdAIOCB *init_load_acb (FvdAIOCB * parent_acb,
+                                       BlockDriverState * bs,
+                                       int64_t sector_num,
+                                       QEMUIOVector * orig_qiov, int nb_sectors,
+                                       BlockDriverCompletionFunc * cb,
+                                       void *opaque);
+
+/*
+ * Reads nb_sectors starting at virtual sector sector_num into orig_qiov.
+ * For a non-compact image (no table) the virtual offset maps directly into
+ * the data file; for a compact image the chunk table must be consulted to
+ * locate each piece of data.
+ */
+static inline BlockDriverAIOCB *load_data (FvdAIOCB * parent_acb,
+                                           BlockDriverState * bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector * orig_qiov,
+                                           int nb_sectors,
+                                           BlockDriverCompletionFunc * cb,
+                                           void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (!s->table) {
+        /* Load directly since it is not a compact image. */
+        return bdrv_aio_readv (s->fvd_data, s->data_offset + sector_num,
+                               orig_qiov, nb_sectors, cb, opaque);
+    } else {
+        return load_data_from_compact_image (NULL, parent_acb, bs, sector_num,
+                                             orig_qiov, nb_sectors, cb, opaque);
+    }
+}
+
+static BlockDriverAIOCB *
+load_data_from_compact_image (FvdAIOCB * acb, FvdAIOCB * parent_acb,
+                              BlockDriverState * bs, int64_t sector_num,
+                              QEMUIOVector * orig_qiov, int nb_sectors,
+                              BlockDriverCompletionFunc * cb, void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+    const uint32_t first_chunk = sector_num / s->chunk_size;
+    const uint32_t last_chunk = (sector_num + nb_sectors - 1) / s->chunk_size;
+    uint32_t chunk;
+    int64_t start_sec;
+    int i;
+
+    if (first_chunk == last_chunk) {
+        goto handle_one_continuous_region;
+    }
+
+    /* Count the number of qiov and iov needed to cover the continuous regions
+     * of the compact image. */
+    int iov_index = 0;
+    size_t iov_left = orig_qiov->iov[0].iov_len;
+    uint8_t *iov_buf = orig_qiov->iov[0].iov_base;
+    int nqiov = 0;
+    int nziov = 0;        /* Number of empty regions. */
+    int niov = 0;
+    uint32_t prev = READ_TABLE2 (s->table[first_chunk]);
+
+    /* Amount of data in the first chunk. */
+    int nb = s->chunk_size - (sector_num % s->chunk_size);
+
+    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
+        uint32_t current = READ_TABLE2 (s->table[chunk]);
+        int64_t data_size;
+        if (chunk < last_chunk) {
+            data_size = s->chunk_size;
+        } else {
+            data_size = (sector_num + nb_sectors) % s->chunk_size;
+            if (data_size == 0) {
+                data_size = s->chunk_size;
+            }
+        }
+
+        if ((IS_EMPTY (current) && IS_EMPTY (prev)) ||
+            (!IS_EMPTY (prev) && !IS_EMPTY (current) && current == prev + 1)) {
+            nb += data_size;        /* Belong to the previous continuous region. */
+        } else {
+            /* Terminate the previous continuous region. */
+            if (IS_EMPTY (prev)) {
+                /* Skip this empty region. */
+                count_iov (orig_qiov->iov, &iov_index, &iov_buf,
+                           &iov_left, nb * 512);
+                nziov++;
+            } else {
+                niov += count_iov (orig_qiov->iov, &iov_index, &iov_buf,
+                                   &iov_left, nb * 512);
+                nqiov++;
+            }
+            nb = data_size;        /* Data in the new region. */
+        }
+        prev = current;
+    }
+
+    if (nqiov == 0 && nziov == 0) {
+        /* All data can be read in one qiov. Reuse orig_qiov. */
+      handle_one_continuous_region:
+        if (IS_EMPTY (s->table[first_chunk])) {
+            /* Fill qiov with zeros. */
+            for (i = 0; i < orig_qiov->niov; i++) {
+                memset (orig_qiov->iov[i].iov_base,
+                        0, orig_qiov->iov[i].iov_len);
+            }
+
+            /* Use a bh to invoke the callback. */
+            if (!acb) {
+                if (!(acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque))) {
+                    return NULL;
+                }
+                COPY_UUID (acb, parent_acb);
+            }
+
+            QDEBUG ("LOAD: acb%llu-%p  load_fill_all_with_zeros\n",
+                    acb->uuid, acb);
+            acb->type = OP_WRAPPER;
+            acb->wrapper.bh = qemu_bh_new (aio_wrapper_bh, acb);
+            qemu_bh_schedule (acb->wrapper.bh);
+            return &acb->common;
+        }
+
+        /* A non-empty region. */
+        start_sec = READ_TABLE (s->table[first_chunk]) * s->chunk_size +
+                                    (sector_num % s->chunk_size);
+        if (!acb) {
+            if (parent_acb) {
+                QDEBUG ("LOAD: acb%llu-%p  "
+                        "load_directly_as_one_continuous_region\n",
+                        parent_acb->uuid, acb);
+            }
+            return bdrv_aio_readv (s->fvd_data, s->data_offset + start_sec,
+                                   orig_qiov, nb_sectors, cb, opaque);
+        }
+
+        QDEBUG ("LOAD: acb%llu-%p  load_directly_as_one_continuous_region\n",
+                acb->uuid, acb);
+        acb->load.num_children = 1;
+        acb->load.one_child.hd_acb =
+            bdrv_aio_readv (s->fvd_data, s->data_offset + start_sec, orig_qiov,
+                            nb_sectors, finish_load_data_from_compact_image,
+                            &acb->load.one_child);
+        if (acb->load.one_child.hd_acb) {
+            acb->load.one_child.acb = acb;
+            return &acb->common;
+        } else {
+            my_qemu_aio_release (acb);
+            return NULL;
+        }
+    }
+
+    /* qiov for the last continuous region. */
+    if (!IS_EMPTY (prev)) {
+        niov += count_iov (orig_qiov->iov, &iov_index, &iov_buf,
+                           &iov_left, nb * 512);
+        nqiov++;
+        ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);
+    }
+
+    /* Need to submit multiple requests to the lower layer. Initialize acb. */
+    if (!acb && !(acb = init_load_acb (parent_acb, bs, sector_num,
+                                       orig_qiov, nb_sectors, cb, opaque))) {
+        return NULL;
+    }
+    acb->load.num_children = nqiov;
+
+    /* Allocate memory and create multiple requests. */
+    acb->load.children = my_qemu_malloc ((sizeof (CompactChildCB) +
+                                          sizeof (QEMUIOVector)) * nqiov +
+                                         sizeof (struct iovec) * niov);
+    QEMUIOVector *q = (QEMUIOVector *) (acb->load.children + nqiov);
+    struct iovec *v = (struct iovec *) (q + nqiov);
+
+    /* Set up iov and qiov. */
+    nqiov = 0;
+    iov_index = 0;
+    iov_left = orig_qiov->iov[0].iov_len;
+    iov_buf = orig_qiov->iov[0].iov_base;
+    nb = s->chunk_size - (sector_num % s->chunk_size); /* Data in first chunk.*/
+    prev = READ_TABLE2 (s->table[first_chunk]);
+
+    /* if (IS_EMPTY(prev)), start_sec will not be used later, and hence safe. */
+    start_sec = prev * s->chunk_size + (sector_num % s->chunk_size);
+
+    for (chunk = first_chunk + 1; chunk <= last_chunk; chunk++) {
+        uint32_t current = READ_TABLE2 (s->table[chunk]);
+        int64_t data_size;
+        if (chunk < last_chunk) {
+            data_size = s->chunk_size;
+        } else {
+            data_size = (sector_num + nb_sectors) % s->chunk_size;
+            if (data_size == 0) {
+                data_size = s->chunk_size;
+            }
+        }
+
+        if ((IS_EMPTY (prev) && IS_EMPTY (current)) ||
+            (!IS_EMPTY (prev) && !IS_EMPTY (current) && current == prev + 1)) {
+            nb += data_size;        /* Continue the previous region. */
+        } else {
+            /* Terminate the previous continuous region. */
+            if (IS_EMPTY (prev)) {
+                zero_iov (orig_qiov->iov, &iov_index, &iov_buf, &iov_left,
+                          nb * 512);        /* Fill iov data with zeros. */
+            } else {
+                niov = setup_iov (orig_qiov->iov, v, &iov_index, &iov_buf,
+                                  &iov_left, nb * 512);
+                qemu_iovec_init_external (q, v, niov);
+                QDEBUG ("LOAD: acb%llu-%p  create_child %d sector_num=%" PRId64
+                        " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov,
+                        start_sec, nb, niov);
+                acb->load.children[nqiov].hd_acb =
+                    bdrv_aio_readv (s->fvd_data, s->data_offset + start_sec, q,
+                                    nb, finish_load_data_from_compact_image,
+                                    &acb->load.children[nqiov]);
+                if (!acb->load.children[nqiov].hd_acb) {
+                    goto fail;
+                }
+                acb->load.children[nqiov].acb = acb;
+                v += niov;
+                q++;
+                nqiov++;
+            }
+
+            nb = data_size;
+
+            /* if (IS_EMPTY(current)), start_sec will not be used later. */
+            start_sec = current * s->chunk_size;
+        }
+        prev = current;
+    }
+
+    /* The last continuous region. */
+    if (IS_EMPTY (prev)) {
+        zero_iov (orig_qiov->iov, &iov_index, &iov_buf, &iov_left, nb * 512);
+    } else {
+        niov = setup_iov (orig_qiov->iov, v, &iov_index, &iov_buf,
+                          &iov_left, nb * 512);
+        qemu_iovec_init_external (q, v, niov);
+        QDEBUG ("LOAD: acb%llu-%p  create_child %d sector_num=%" PRId64
+                " nb_sectors=%d niov=%d\n", acb->uuid, acb, nqiov, start_sec,
+                nb, niov);
+        acb->load.children[nqiov].hd_acb =
+            bdrv_aio_readv (s->fvd_data, s->data_offset + start_sec, q, nb,
+                            finish_load_data_from_compact_image,
+                            &acb->load.children[nqiov]);
+        if (!acb->load.children[nqiov].hd_acb) {
+            goto fail;
+        }
+        acb->load.children[nqiov].acb = acb;
+    }
+    ASSERT (iov_index == orig_qiov->niov - 1 && iov_left == 0);
+
+    return &acb->common;
+
+  fail:
+    for (i = 0; i < nqiov; i++) {
+        bdrv_aio_cancel (acb->load.children[i].hd_acb);
+    }
+    my_qemu_free (acb->load.children);
+    my_qemu_aio_release (acb);
+    return NULL;
+}
+
+/* Bottom-half handler for OP_WRAPPER requests: deliver a success
+ * completion (ret=0) to the caller's callback, then tear down the
+ * bottom half and release the ACB.  The callback runs before the ACB
+ * is released because it may still dereference acb->common.opaque. */
+static void aio_wrapper_bh (void *opaque)
+{
+    FvdAIOCB *acb = opaque;
+    acb->common.cb (acb->common.opaque, 0);
+    qemu_bh_delete (acb->wrapper.bh);
+    my_qemu_aio_release (acb);
+}
+
+/* Completion callback for one child read of an OP_LOAD_COMPACT request.
+ * Records the first nonzero error in acb->load.ret and, once the last
+ * child has finished, invokes the parent's completion callback with the
+ * accumulated result and frees the ACB resources. */
+static void finish_load_data_from_compact_image (void *opaque, int ret)
+{
+    CompactChildCB *child = opaque;
+    FvdAIOCB *acb = child->acb;
+
+    /* Now fvd_store_compact_cancel(), if invoked, won't cancel this child
+     * request. */
+    child->hd_acb = NULL;
+
+    if (acb->load.ret == 0) {
+        /* No error recorded yet; keep this child's result (may be 0). */
+        acb->load.ret = ret;
+    } else {
+        /* NOTE(review): this branch logs only when a PREVIOUS child has
+         * already failed; the first failing child is recorded without a
+         * log message — confirm this is the intended debug behavior. */
+        QDEBUG ("LOAD: acb%llu-%p  load_child=%d total_children=%d "
+                "error ret=%d\n", acb->uuid, acb, acb->load.finished_children,
+                acb->load.num_children, ret);
+    }
+
+    acb->load.finished_children++;
+    if (acb->load.finished_children < acb->load.num_children) {
+        /* More children outstanding; wait for them. */
+        QDEBUG ("LOAD: acb%llu-%p  load_finished_children=%d "
+                "total_children=%d\n", acb->uuid, acb,
+                acb->load.finished_children, acb->load.num_children);
+        return;
+    }
+
+    /* Last child done: complete the parent request. */
+    QDEBUG ("LOAD: acb%llu-%p  load_last_child_finished ret=%d\n", acb->uuid,
+            acb, acb->load.ret);
+    acb->common.cb (acb->common.opaque, acb->load.ret);
+    if (acb->load.children) {
+        my_qemu_free (acb->load.children);
+    }
+    my_qemu_aio_release (acb);
+}
+
+/* Allocate an FvdAIOCB from the AIO pool and initialize it for a
+ * multi-part OP_LOAD_COMPACT request.  Returns NULL when the pool
+ * cannot supply an ACB. */
+static inline FvdAIOCB *init_load_acb (FvdAIOCB * parent_acb,
+                                       BlockDriverState * bs,
+                                       int64_t sector_num,
+                                       QEMUIOVector * orig_qiov,
+                                       int nb_sectors,
+                                       BlockDriverCompletionFunc * cb,
+                                       void *opaque)
+{
+    FvdAIOCB *acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
+
+    if (acb == NULL) {
+        return NULL;
+    }
+
+    acb->type = OP_LOAD_COMPACT;
+    acb->sector_num = sector_num;
+    acb->nb_sectors = nb_sectors;
+    acb->load.parent_acb = parent_acb;
+    acb->load.orig_qiov = orig_qiov;
+    acb->load.children = NULL;
+    acb->load.one_child.hd_acb = NULL;
+    acb->load.finished_children = 0;
+    acb->load.ret = 0;
+    COPY_UUID (acb, parent_acb);
+    return acb;
+}
+
+/* Cancel an OP_WRAPPER request: stop and delete its bottom half so
+ * aio_wrapper_bh() can no longer fire, then release the ACB without
+ * invoking the user's completion callback. */
+static void fvd_wrapper_cancel (FvdAIOCB * acb)
+{
+    qemu_bh_cancel (acb->wrapper.bh);
+    qemu_bh_delete (acb->wrapper.bh);
+    my_qemu_aio_release (acb);
+}
+
+/* Cancel an in-flight OP_LOAD_COMPACT request.  Cancels every child
+ * read that is still outstanding — the multi-child case tracked in
+ * acb->load.children and the single-child fast path tracked in
+ * acb->load.one_child — then releases the ACB. */
+static void fvd_load_compact_cancel (FvdAIOCB * acb)
+{
+    if (acb->load.children) {
+        int i;
+        for (i = 0; i < acb->load.num_children; i++) {
+            /* Finished children have hd_acb reset to NULL by
+             * finish_load_data_from_compact_image(). */
+            if (acb->load.children[i].hd_acb) {
+                bdrv_aio_cancel (acb->load.children[i].hd_acb);
+            }
+        }
+        my_qemu_free (acb->load.children);
+    }
+    if (acb->load.one_child.hd_acb) {
+        bdrv_aio_cancel (acb->load.one_child.hd_acb);
+    }
+    my_qemu_aio_release (acb);
+}
diff --git a/block/fvd-misc.c b/block/fvd-misc.c
new file mode 100644
index 0000000..da184c8
--- /dev/null
+++ b/block/fvd-misc.c
@@ -0,0 +1,616 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this module implements misc functions of the
+ *  BlockDriver interface for the Fast Virtual Disk (FVD) format.
+ *===========================================================================*/
+
+/* Cancel an OP_FLUSH request: cancel whichever of the data/metadata
+ * sub-flushes are still outstanding (finished ones are NULLed by their
+ * completion callbacks), then release the ACB. */
+static void fvd_flush_cancel (FvdAIOCB * acb)
+{
+    if (acb->flush.data_acb) {
+        bdrv_aio_cancel (acb->flush.data_acb);
+    }
+    if (acb->flush.metadata_acb) {
+        bdrv_aio_cancel (acb->flush.metadata_acb);
+    }
+    my_qemu_aio_release (acb);
+}
+
+/* BlockDriver.bdrv_aio_cancel handler: dispatch to the per-operation
+ * cancel routine based on the ACB's type tag.  Each routine is
+ * responsible for cancelling outstanding child requests and releasing
+ * the ACB.  No default case: acb->type is always one of the OP_*
+ * values set at ACB initialization. */
+static void fvd_aio_cancel (BlockDriverAIOCB * blockacb)
+{
+    FvdAIOCB *acb = container_of (blockacb, FvdAIOCB, common);
+
+    QDEBUG ("CANCEL: acb%llu-%p\n", acb->uuid, acb);
+
+    switch (acb->type) {
+    case OP_READ:
+        fvd_read_cancel (acb);
+        break;
+
+    case OP_WRITE:
+        fvd_write_cancel (acb);
+        break;
+
+    case OP_COPY:
+        fvd_copy_cancel (acb);
+        break;
+
+    case OP_LOAD_COMPACT:
+        fvd_load_compact_cancel (acb);
+        break;
+
+    case OP_STORE_COMPACT:
+        fvd_store_compact_cancel (acb);
+        break;
+
+    case OP_WRAPPER:
+        fvd_wrapper_cancel (acb);
+        break;
+
+    case OP_FLUSH:
+        fvd_flush_cancel (acb);
+        break;
+    }
+}
+
+/* Complete an OP_FLUSH request once both sub-flushes have finished:
+ * report the accumulated result (first recorded error, else 0) to the
+ * caller and release the ACB. */
+static inline void finish_flush (FvdAIOCB * acb)
+{
+    QDEBUG ("FLUSH: acb%llu-%p  finish_flush ret=%d\n",
+            acb->uuid, acb, acb->flush.ret);
+    acb->common.cb (acb->common.opaque, acb->flush.ret);
+    my_qemu_aio_release (acb);
+}
+
+/* Completion callback for the data-file half of an OP_FLUSH request.
+ * Records the first error and completes the request when both halves
+ * (data and metadata) have finished. */
+static void finish_flush_data (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+
+    QDEBUG ("FLUSH: acb%llu-%p  finish_flush_data ret=%d\n",
+            acb->uuid, acb, ret);
+
+    /* This half is done; fvd_flush_cancel() must not cancel it again. */
+    acb->flush.data_acb = NULL;
+
+    if (acb->flush.ret == 0) {
+        acb->flush.ret = ret;
+    }
+
+    if (++acb->flush.num_finished == 2) {
+        finish_flush (acb);
+    }
+}
+
+/* Completion callback for the metadata-file half of an OP_FLUSH
+ * request; mirror of finish_flush_data().  Records the first error and
+ * completes the request once both halves have finished. */
+static void finish_flush_metadata (void *opaque, int ret)
+{
+    FvdAIOCB *acb = opaque;
+
+    QDEBUG ("FLUSH: acb%llu-%p  finish_flush_metadata ret=%d\n",
+            acb->uuid, acb, ret);
+
+    if (acb->flush.ret == 0) {
+        acb->flush.ret = ret;
+    }
+
+    /* This half is done; fvd_flush_cancel() must not cancel it again. */
+    acb->flush.metadata_acb = NULL;
+    acb->flush.num_finished++;
+    if (acb->flush.num_finished == 2) {
+        finish_flush (acb);
+    }
+}
+
+/* BlockDriver.bdrv_aio_flush handler.  When data and metadata share a
+ * single file, one flush of that file suffices and no ACB is needed.
+ * Otherwise both files are flushed in parallel; finish_flush() fires
+ * after the second completion.  Returns NULL if either sub-flush
+ * cannot be started (the already-started one is cancelled). */
+static BlockDriverAIOCB *fvd_aio_flush (BlockDriverState * bs,
+                                BlockDriverCompletionFunc * cb, void *opaque)
+{
+    BDRVFvdState *s = bs->opaque;
+    if (s->fvd_data == s->fvd_metadata) {
+        /* Single backing file: delegate directly. */
+        return bdrv_aio_flush (s->fvd_metadata, cb, opaque);
+    }
+
+    FvdAIOCB *acb = my_qemu_aio_get (&fvd_aio_pool, bs, cb, opaque);
+    if (!acb) {
+        return NULL;
+    }
+
+    acb->type = OP_FLUSH;
+    acb->flush.num_finished = 0;
+    acb->flush.ret = 0;
+    acb->flush.data_acb = bdrv_aio_flush (s->fvd_data, finish_flush_data, acb);
+    if (!acb->flush.data_acb) {
+        my_qemu_aio_release (acb);
+        return NULL;
+    }
+
+    acb->flush.metadata_acb = bdrv_aio_flush (s->fvd_metadata,
+                                              finish_flush_metadata, acb);
+    if (!acb->flush.metadata_acb) {
+        /* Roll back the data flush so the ACB can be freed safely. */
+        bdrv_aio_cancel (acb->flush.data_acb);
+        my_qemu_aio_release (acb);
+        return NULL;
+    }
+
+    QDEBUG ("FLUSH: acb%llu-%p  start\n", acb->uuid, acb);
+    return &acb->common;
+}
+
+/* Synchronous flush: flush the data file first, then the metadata file
+ * when it is a distinct BlockDriverState.  Returns 0 on success or the
+ * first bdrv_flush() error. */
+static int fvd_flush (BlockDriverState * bs)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    QDEBUG ("fvd_flush() invoked\n");
+
+    if (s->fvd_data) {
+        int ret = bdrv_flush (s->fvd_data);
+        if (ret) {
+            return ret;
+        }
+    }
+
+    /* Same file for data and metadata: nothing further to flush. */
+    return s->fvd_metadata == s->fvd_data ? 0 : bdrv_flush (s->fvd_metadata);
+}
+
+/* BlockDriver.bdrv_close handler: stop prefetching, cancel outstanding
+ * prefetch I/O, persist metadata, and free all resources owned by the
+ * driver state.  Every freed field is reset to NULL so a duplicate
+ * close cannot double-free. */
+static void fvd_close (BlockDriverState * bs)
+{
+    BDRVFvdState *s = bs->opaque;
+    FvdAIOCB *acb;
+    int i;
+
+    if (s->prefetch_state == PREFETCH_STATE_RUNNING) {
+        s->prefetch_state = PREFETCH_STATE_DISABLED;
+    }
+    if (s->prefetch_timer) {
+        qemu_del_timer (s->prefetch_timer);
+        qemu_free_timer (s->prefetch_timer);
+        s->prefetch_timer = NULL;
+    }
+
+    /* Clean up prefetch operations. */
+    if (s->prefetch_acb) {
+        for (i = 0; i < s->num_prefetch_slots; i++) {
+            if (s->prefetch_acb[i] != NULL) {
+                acb = s->prefetch_acb[i];
+                if (acb->copy.hd_acb) {
+                    bdrv_aio_cancel (acb->copy.hd_acb);
+                }
+                my_qemu_vfree (s->prefetch_acb[i]->copy.buf);
+                my_qemu_aio_release (s->prefetch_acb[i]);
+                s->prefetch_acb[i] = NULL;
+            }
+        }
+        my_qemu_free (s->prefetch_acb);
+        s->prefetch_acb = NULL;
+    }
+
+    /* Write metadata back before tearing down the bitmaps and table. */
+    flush_metadata_to_disk_on_exit (bs);
+
+    if (s->stale_bitmap) {
+        my_qemu_vfree (s->stale_bitmap);
+        /* fresh_bitmap may alias stale_bitmap; free only when distinct. */
+        if (s->fresh_bitmap != s->stale_bitmap) {
+            my_qemu_vfree (s->fresh_bitmap);
+        }
+        s->stale_bitmap = NULL;
+        s->fresh_bitmap = NULL;
+    }
+
+    if (s->table) {
+        my_qemu_vfree (s->table);
+        s->table = NULL;
+    }
+
+    /* fvd_metadata may be the same BlockDriverState as fvd_data; delete
+     * it separately only when it refers to a distinct file. */
+    if (s->fvd_metadata) {
+        if (s->fvd_metadata != s->fvd_data) {
+            bdrv_delete (s->fvd_metadata);
+        }
+        s->fvd_metadata = NULL;
+    }
+    if (s->fvd_data) {
+        bdrv_delete (s->fvd_data);
+        s->fvd_data = NULL;
+    }
+
+    if (s->add_storage_cmd) {
+        my_qemu_free (s->add_storage_cmd);
+        s->add_storage_cmd = NULL;
+    }
+#ifdef FVD_DEBUG
+    dump_resource_summary (s);
+#endif
+}
+
+/* BlockDriver.bdrv_probe handler: score how likely the buffer is an
+ * FVD image.  Returns 100 when the magic and version fields match,
+ * 0 otherwise.  The filename is not consulted. */
+static int fvd_probe (const uint8_t * buf, int buf_size, const char *filename)
+{
+    const FvdHeader *header = (const void *) buf;
+
+    /* Need at least the magic and version words to decide. */
+    if (buf_size < 2 * sizeof (uint32_t)) {
+        return 0;
+    }
+    if (le32_to_cpu (header->magic) != FVD_MAGIC
+        || le32_to_cpu (header->version) != FVD_VERSION) {
+        return 0;
+    }
+    return 100;
+}
+
+/* BlockDriver.bdrv_is_allocated handler.  Reports whether the sectors
+ * starting at sector_num are present in the FVD image (as opposed to
+ * the base image) and, via *pnum, the length of the run that shares
+ * that status. */
+static int fvd_is_allocated (BlockDriverState * bs, int64_t sector_num,
+                             int nb_sectors, int *pnum)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (s->prefetch_state == PREFETCH_STATE_FINISHED
+        || sector_num >= s->nb_sectors_in_base_img
+        || !fresh_bitmap_show_sector_in_base_img (sector_num, s)) {
+        /* For the three cases that data may be saved in the FVD data file, we
+         * still need to check the underlying storage because those data could
+         * be holes in a sparse image, due to the optimization of "free write
+         * to zero-filled blocks". See Section 3.3.3 of the FVD-cow paper.
+         * This also covers the case of no base image. */
+
+        if (!s->table) {
+            /* Non-compact image: ask the data file directly. */
+            return bdrv_is_allocated (s->fvd_data, s->data_offset + sector_num,
+                                      nb_sectors, pnum);
+        }
+
+        /* Use the table to figure it out. */
+        int64_t first_chunk = sector_num / s->chunk_size;
+        int64_t last_chunk = (sector_num + nb_sectors - 1) / s->chunk_size;
+        int allocated = !IS_EMPTY (s->table[first_chunk]);
+        int count;
+
+        if (first_chunk == last_chunk) {
+            /* All data in one chunk. */
+            *pnum = nb_sectors;
+            return allocated;
+        }
+
+        /* Data in the first chunk. */
+        count = s->chunk_size - (sector_num % s->chunk_size);
+
+        /* Full chunks: extend the run while the allocation status of
+         * each chunk matches that of the first chunk. */
+        first_chunk++;
+        while (first_chunk < last_chunk) {
+            if ((allocated && IS_EMPTY (s->table[first_chunk]))
+                || (!allocated && !IS_EMPTY (s->table[first_chunk]))) {
+                /* Status flips here; the run ends. */
+                *pnum = count;
+                return allocated;
+            }
+
+            count += s->chunk_size;
+            first_chunk++;
+        }
+
+        /* Data in the last chunk, which may be partial; include it only
+         * if its status matches the run's. */
+        if ((allocated && !IS_EMPTY (s->table[last_chunk]))
+            || (!allocated && IS_EMPTY (s->table[last_chunk]))) {
+            int nb = (sector_num + nb_sectors) % s->chunk_size;
+            count += nb ? nb : s->chunk_size;
+        }
+
+        *pnum = count;
+        return allocated;
+    }
+
+    /* Use the FVD metadata to find out sectors in the base image. */
+    int64_t end = sector_num + nb_sectors;
+    if (end > s->nb_sectors_in_base_img) {
+        end = s->nb_sectors_in_base_img;
+    }
+
+    /* Count consecutive sectors that still live in the base image. */
+    int64_t next = sector_num + 1;
+    while (next < end && fresh_bitmap_show_sector_in_base_img (next, s)) {
+        next++;
+    }
+
+    *pnum = next - sector_num;
+    return FALSE;
+}
+
+/* Print usage help for the 'update' command.  The two string pieces
+ * previously concatenated to "outputs ofthe"; keep a space at the
+ * junction. */
+static void update_usage (void)
+{
+    printf ("Usage: update <image_file> [attribute=val]\n       See outputs of "
+            "the 'info' command for all available attributes.\n");
+}
+
+/* BlockDriver.bdrv_get_info handler: print every FVD-specific header
+ * field to stdout (consumed by the 'info' command) and fill in the
+ * generic BlockDriverInfo fields.  Returns -1 if the header cannot be
+ * read, 0 otherwise. */
+static int fvd_get_info (BlockDriverState * bs, BlockDriverInfo * bdi)
+{
+    BDRVFvdState *s = bs->opaque;
+    FvdHeader header;
+
+    if (read_fvd_header (s, &header) < 0) {
+        return -1;
+    }
+
+    printf ("========= Begin of FVD specific information ==================\n");
+    printf ("magic\t\t\t\t\t\t%0X\n", header.magic);
+    printf ("version\t\t\t\t\t\t%d\n", header.version);
+    printf ("virtual_disk_size (bytes)\t\t\t%" PRId64 "\n",
+            header.virtual_disk_size);
+    printf ("disk_metadata_size (bytes)\t\t\t%" PRId64 "\n",
+            header.metadata_size);
+    if (header.data_file[0]) {
+        printf ("data_file\t\t\t\t\t%s\n", header.data_file);
+    }
+    if (header.data_file_fmt[0]) {
+        printf ("data_file_fmt\t\t\t\t%s\n", header.data_file_fmt);
+    }
+
+    /* Fields below only apply when a base image is configured. */
+    if (header.base_img[0] != 0) {
+        printf ("base_img\t\t\t\t\t%s\n", header.base_img);
+        printf ("all_data_in_fvd_img\t\t\t\t%s\n",
+                BOOL (header.all_data_in_fvd_img));
+        printf ("base_img_size (bytes)\t\t\t\t%" PRId64 "\n",
+                header.base_img_size);
+        printf ("bitmap_offset (bytes)\t\t\t\t%" PRId64 "\n",
+                header.bitmap_offset);
+        printf ("bitmap_size (bytes)\t\t\t\t%" PRId64 "\n", header.bitmap_size);
+        printf ("prefetch_profile_offset (bytes)\t\t\t%" PRId64 "\n",
+                header.prefetch_profile_offset);
+        printf ("prefetch_profile_entries\t\t\t%" PRId64 "\n",
+                header.prefetch_profile_entries);
+        printf ("prefetch_profile_entry_len_unit\t\t\t%d\n",
+                header.unit_of_PrefetchProfileEntry_len);
+        printf ("block_size\t\t\t\t\t%d\n", header.block_size);
+        printf ("copy_on_read\t\t\t\t\t%s\n", BOOL (header.copy_on_read));
+        printf ("max_outstanding_copy_on_read_data (bytes)\t%" PRId64 "\n",
+                header.max_outstanding_copy_on_read_data);
+        printf ("prefetch_start_delay (sec)\t\t\t%d\n",
+                header.prefetch_start_delay);
+        printf ("profile_directed_prefetch_start_delay (sec)\t%d\n",
+                header.profile_directed_prefetch_start_delay);
+        printf ("max_num_outstanding_prefetch_writes\t\t%d\n",
+                header.num_prefetch_slots);
+        printf ("bytes_per_prefetch\t\t\t\t%d\n", header.bytes_per_prefetch);
+        printf ("prefetch_over_threshold_throttle_time (ms)\t%d\n",
+                header.prefetch_throttle_time);
+        printf ("prefetch_read_throughput_measure_time (ms)\t%d\n",
+                header.prefetch_read_throughput_measure_time);
+        printf ("prefetch_write_throughput_measure_time (ms)\t%d\n",
+                header.prefetch_write_throughput_measure_time);
+        printf ("prefetch_min_read_throughput_threshold (KB/s)\t%d\n",
+                header.prefetch_min_read_throughput);
+        printf ("prefetch_min_write_throughput_threshold (KB/s)\t%d\n",
+                header.prefetch_min_write_throughput);
+        printf ("prefetch_max_read_throughput_threshold (KB/s)\t%d\n",
+                header.prefetch_max_read_throughput);
+        printf ("prefetch_max_write_throughput_threshold (KB/s)\t%d\n",
+                header.prefetch_max_write_throughput);
+        printf ("prefetch_perf_calc_alpha\t\t\t%d\n",
+                header.prefetch_perf_calc_alpha);
+        printf ("generate_prefetch_profile\t\t\t%s\n",
+                BOOL (header.generate_prefetch_profile));
+    }
+
+    printf ("need_zero_init\t\t\t\t\t%s\n", BOOL (header.need_zero_init));
+    printf ("compact_image\t\t\t\t\t%s\n", BOOL (header.compact_image));
+    /* Compact-image statistics come partly from the live state (s) and
+     * partly from the header. */
+    if (header.compact_image) {
+        printf ("data_storage (bytes)\t\t\t\t%" PRId64 "\n",
+                s->data_storage * 512);
+        printf ("chunk_size (bytes)\t\t\t\t%" PRId64 "\n", header.chunk_size);
+        printf ("used_chunks (bytes)\t\t\t\t%" PRId64 "\n",
+                s->used_storage * 512);
+        printf ("storage_grow_unit (bytes)\t\t\t%" PRId64 "\n",
+                header.storage_grow_unit);
+        printf ("table_offset (bytes)\t\t\t\t%" PRId64 "\n",
+                header.table_offset);
+        /* Recompute the table size the same way fvd_open() does. */
+        int64_t vsize = ROUND_UP (s->virtual_disk_size, s->chunk_size * 512);
+        int table_entries = vsize / (s->chunk_size * 512);
+        int64_t table_size = sizeof (uint32_t) * table_entries;
+        table_size = ROUND_UP (table_size, DEF_PAGE_SIZE);
+        printf ("table_size (bytes)\t\t\t\t%" PRId64 "\n", table_size);
+
+        if (header.add_storage_cmd[0] != 0) {
+            printf ("add_storage_cmd\t\t\t\t\t%s\n", header.add_storage_cmd);
+        }
+    }
+    printf ("clean_shutdown\t\t\t\t\t%s\n", BOOL (header.clean_shutdown));
+    if (header.journal_size > 0) {
+        printf ("journal_offset\t\t\t\t\t%" PRId64 "\n", header.journal_offset);
+        printf ("journal_size\t\t\t\t\t%" PRId64 "\n", header.journal_size);
+    }
+    printf ("========= End of FVD specific information ====================\n");
+
+    bdi->cluster_size = 0;
+    bdi->vm_state_offset = 0;
+    return 0;
+}
+
+/* BlockDriver.bdrv_has_zero_init handler: the zero-initialization
+ * guarantee of an FVD image is inherited from its underlying data
+ * file. */
+static int fvd_has_zero_init (BlockDriverState * bs)
+{
+    BDRVFvdState *state = bs->opaque;
+
+    return bdrv_has_zero_init (state->fvd_data);
+}
+
+/* Implement the 'update' command: parse "attribute=val" arguments and
+ * rewrite the corresponding fields of the FVD header.
+ *
+ * Fixes over the previous version:
+ *   - the return value of read_fvd_header() is now checked, so the
+ *     loop can no longer modify an uninitialized header on read error;
+ *   - an argument with an empty value ("size=") is rejected up front;
+ *     previously the "size" branch read val[len - 1] with len == 0,
+ *     an out-of-bounds access.
+ *
+ * Returns 0 on success, -1 on any parse or validation error.
+ */
+static int fvd_update (BlockDriverState * bs, int argc, char **argv)
+{
+    BDRVFvdState *s = bs->opaque;
+    FvdHeader header;
+    int i;
+
+    if (argc <= 0) {
+        update_usage ();
+        return -1;
+    }
+
+    if (strcmp (argv[0], "-h") == 0 || strcmp (argv[0], "--help") == 0
+        || strcmp (argv[0], "-o") == 0) {
+        update_usage ();
+        return 0;
+    }
+
+    if (read_fvd_header (s, &header) < 0) {
+        fprintf (stderr, "Error: failed to read the FVD header.\n");
+        return -1;
+    }
+
+    for (i = 0; i < argc; i++) {
+        char *attr = argv[i];
+        char *val = strchr (attr, '=');
+        if (val == NULL) {
+            fprintf (stderr, "Error: string '%s' is not in the format of "
+                     "'attribute=val' without spaces.\n", attr);
+            return -1;
+        }
+        *val = 0;
+        val++;
+
+        /* Reject "attr=" with nothing after the '='. */
+        if (*val == 0) {
+            fprintf (stderr, "Error: no value given for attribute '%s'.\n",
+                     attr);
+            return -1;
+        }
+
+        if (strcmp (attr, "size") == 0) {
+            int64_t new_size;
+            new_size = atoll (val);
+            int len = strlen (val);        /* len >= 1, checked above. */
+            if (val[len - 1] == 'G') {
+                new_size *= ((int64_t) 1024) * 1024 * 1024;
+            } else if (val[len - 1] == 'M') {
+                new_size *= ((int64_t) 1024) * 1024;
+            } else if (val[len - 1] == 'K') {
+                new_size *= ((int64_t) 1024);
+            } else if (val[len - 1] == 'B') {
+                /* No change to new_size as it is already in bytes. */
+            } else {
+                /* If no unit is specified, the default unit is KB. */
+                new_size *= ((int64_t) 1024);
+            }
+
+            if (new_size <= 0) {
+                fprintf (stderr, "Error: size %s is not positive.\n", val);
+                return -1;
+            }
+
+            new_size = ROUND_UP (new_size, 512);
+            if (new_size < header.virtual_disk_size) {
+                printf ("Warning: image's new size %" PRId64
+                        " is smaller than the original size %" PRId64
+                        ". Some image data will be truncated.\n",
+                        new_size, header.virtual_disk_size);
+            }
+            header.virtual_disk_size = new_size;
+            printf ("Image resized to %" PRId64 " bytes.\n", new_size);
+        } else if (strcmp (attr, "base_img") == 0) {
+            if (strlen (val) > 1023) {
+                fprintf (stderr, "Error: the new base image name is longer "
+                         "than 1023, which is not allowed.\n");
+                return -1;
+            }
+
+            memset (header.base_img, 0, 1024);
+            pstrcpy (header.base_img, 1024, val);
+            printf ("Backing file updated to '%s'.\n", val);
+        } else if (strcmp (attr, "data_file") == 0) {
+            if (strlen (val) > 1023) {
+                fprintf (stderr, "Error: the new data file name is longer "
+                         "than 1023, which is not allowed.\n");
+                return -1;
+            }
+
+            memset (header.data_file, 0, 1024);
+            pstrcpy (header.data_file, 1024, val);
+            printf ("Data file updated to '%s'.\n", val);
+        } else if (strcmp (attr, "need_zero_init") == 0) {
+            /* Any value other than "true"/"on" turns the flag off. */
+            if (strcasecmp (val, "true") == 0 || strcasecmp (val, "on") == 0) {
+                header.need_zero_init = TRUE;
+                printf ("need_zero_init is turned on for this disk.\n");
+            } else {
+                header.need_zero_init = FALSE;
+                printf ("need_zero_init is turned off for this disk.\n");
+            }
+        } else if (strcmp (attr, "copy_on_read") == 0) {
+            if (strcasecmp (val, "true") == 0 || strcasecmp (val, "on") == 0) {
+                header.copy_on_read = TRUE;
+                printf ("Copy on read is enabled for this disk.\n");
+            } else {
+                header.copy_on_read = FALSE;
+                printf ("Copy on read is disabled for this disk.\n");
+            }
+        } else if (strcmp (attr, "clean_shutdown") == 0) {
+            if (strcasecmp (val, "true") == 0 || strcasecmp (val, "on") == 0) {
+                header.clean_shutdown = TRUE;
+                printf ("clean_shutdown is manually set to true\n");
+            } else {
+                header.clean_shutdown = FALSE;
+                printf ("clean_shutdown is manually set to false\n");
+            }
+        } else if (strcmp (attr, "max_outstanding_copy_on_read_data") == 0) {
+            header.max_outstanding_copy_on_read_data = atoll (val);
+            if (header.max_outstanding_copy_on_read_data <= 0) {
+                fprintf (stderr, "Error: max_outstanding_copy_on_read_data "
+                         "must be positive while the provided value is %"
+                         PRId64 ".\n",
+                         header.max_outstanding_copy_on_read_data);
+                return -1;
+            }
+            printf ("max_outstanding_copy_on_read_data updated to %" PRId64
+                    ".\n", header.max_outstanding_copy_on_read_data);
+        } else if (strcmp (attr, "prefetch_start_delay") == 0) {
+            /* A negative delay is accepted: it disables prefetching. */
+            header.prefetch_start_delay = atoi (val);
+            if (header.prefetch_start_delay >= 0) {
+                printf ("Prefetch starting delay updated to %d seconds.\n",
+                        header.prefetch_start_delay);
+            } else {
+                printf ("Prefetch starting delay updated to %d seconds. "
+                        "Because of the negative value, prefetching is "
+                        "disabled for this image.\n",
+                        header.prefetch_start_delay);
+            }
+        } else if (strcmp (attr, "max_num_outstanding_prefetch_writes") == 0) {
+            header.num_prefetch_slots = atoi (val);
+            if (header.num_prefetch_slots < 1) {
+                fprintf (stderr, "Error: max_num_outstanding_prefetch_writes "
+                         "%d is not a positive integer.\n",
+                         header.num_prefetch_slots);
+                return -1;
+            }
+            printf ("max_num_outstanding_prefetch_writes updated to %d.\n",
+                    header.num_prefetch_slots);
+        } else if (strcmp (attr, "bytes_per_prefetch") == 0) {
+            header.bytes_per_prefetch = atoi (val);
+            if (header.bytes_per_prefetch < DEF_PAGE_SIZE) {
+                fprintf (stderr, "Error: bytes_per_prefetch cannot be smaller "
+                         "than %d.\n", DEF_PAGE_SIZE);
+                return -1;
+            }
+            printf ("bytes_per_prefetch updated to %d.\n",
+                    header.bytes_per_prefetch);
+        } else if (strcmp (attr, "prefetch_min_read_throughput_threshold")==0) {
+            header.prefetch_min_read_throughput = atoi (val);
+            printf ("prefetch_min_read_throughput_threshold updated to %d "
+                    "KB/s\n", header.prefetch_min_read_throughput);
+        } else if (strcmp (attr,"prefetch_min_write_throughput_threshold")==0) {
+            header.prefetch_min_write_throughput = atoi (val);
+            printf ("prefetch_min_write_throughput_threshold updated to %d "
+                    "KB/s\n", header.prefetch_min_write_throughput);
+        } else if (strcmp (attr, "prefetch_perf_calc_alpha") == 0) {
+            header.prefetch_perf_calc_alpha = atoi (val);
+            printf ("prefetch_perf_calc_alpha updated to %d\n",
+                    header.prefetch_perf_calc_alpha);
+        } else if (strcmp (attr, "prefetch_read_throughput_measure_time")==0) {
+            header.prefetch_read_throughput_measure_time = atoi (val);
+            printf ("prefetch_read_throughput_measure_time updated to %d ms\n",
+                    header.prefetch_read_throughput_measure_time);
+        } else if (strcmp (attr, "prefetch_write_throughput_measure_time")==0) {
+            header.prefetch_write_throughput_measure_time = atoi (val);
+            printf ("prefetch_write_throughput_measure_time updated to %d ms\n",
+                    header.prefetch_write_throughput_measure_time);
+        } else if (strcmp (attr, "prefetch_over_threshold_throttle_time")==0) {
+            header.prefetch_throttle_time = atoi (val);
+            if (header.prefetch_throttle_time > 0) {
+                printf ("prefetch_over_threshold_throttle_time updated to %d "
+                        "milliseconds.\n", header.prefetch_throttle_time);
+            } else {
+                printf ("prefetch_over_threshold_throttle_time updated to %d "
+                        "milliseconds. It is not positive and hence no "
+                        "throttling will be applied to prefetch.\n",
+                        header.prefetch_throttle_time);
+            }
+        } else {
+            fprintf (stderr, "Error: unknown setting '%s=%s'\n", attr, val);
+            return -1;
+        }
+    }
+
+    update_fvd_header (s, &header);
+    return 0;
+}
diff --git a/block/fvd-open.c b/block/fvd-open.c
new file mode 100644
index 0000000..9ca8e2e
--- /dev/null
+++ b/block/fvd-open.c
@@ -0,0 +1,446 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this module implements bdrv_file_open() for FVD.
+ *============================================================================*/
+
+static void init_prefetch_timer (BlockDriverState * bs, BDRVFvdState * s);
+static int init_data_file (BDRVFvdState * s, FvdHeader * header, int flags);
+static int init_bitmap (BlockDriverState * bs, BDRVFvdState * s,
+                        FvdHeader * header, const char *const filename);
+static int load_table (BDRVFvdState * s, FvdHeader * header,
+                       const char *const filename);
+static int init_journal (int read_only, BlockDriverState * bs,
+                         FvdHeader * header);
+static int init_compact_image (BDRVFvdState * s, FvdHeader * header,
+                               const char *const filename);
+
+/* Implements bdrv_open() for the FVD image format.
+ *
+ * Reads and validates the on-disk FVD header, initializes the runtime state
+ * in BDRVFvdState, and sets up the auxiliary structures: data file, bitmap,
+ * chunk table, journal, compact-image bookkeeping, and the delayed prefetch
+ * timer.
+ *
+ * Returns 0 on success or a negative value on failure.  On failure after the
+ * metadata file is opened, cleanup is delegated to fvd_close() via 'fail'. */
+static int fvd_open (BlockDriverState * bs, const char *filename, int flags)
+{
+    BDRVFvdState *s = bs->opaque;
+    int ret;
+    FvdHeader header;
+    BlockDriver *drv;
+
+    /* A trick to figure out whether it runs a qemu tool such as qemu-nbd. */
+    const int in_qemu_tool = (rt_clock == NULL);
+
+    const char * protocol = strchr (filename, ':');
+    if (protocol) {
+        /* NOTE(review): bdrv_find_protocol() may return NULL here, in which
+         * case bdrv_open() falls back to probing -- confirm this is intended
+         * rather than an error path. */
+        drv = bdrv_find_protocol (filename);
+        filename = protocol + 1;
+    }
+    else {
+        /* Use "raw" instead of "file" to allow storing the image on device. */
+        drv = bdrv_find_format ("raw");
+        if (!drv) {
+            fprintf (stderr, "Failed to find the block device driver\n");
+            return -EINVAL;
+        }
+    }
+
+    s->fvd_metadata = bdrv_new ("");
+    ret = bdrv_open (s->fvd_metadata, filename, flags, drv);
+    if (ret < 0) {
+        fprintf (stderr, "Failed to open %s\n", filename);
+        return ret;
+    }
+
+    /* Initialize so that jumping to 'fail' would do cleanup properly. */
+    s->stale_bitmap = NULL;
+    s->fresh_bitmap = NULL;
+    s->table = NULL;
+    s->outstanding_copy_on_read_data = 0;
+    QLIST_INIT (&s->write_locks);
+    QLIST_INIT (&s->copy_locks);
+    QLIST_INIT (&s->wait_for_journal);
+    s->ongoing_journal_updates = 0;
+    s->prefetch_acb = NULL;
+    s->add_storage_cmd = NULL;
+#ifdef FVD_DEBUG
+    s->total_copy_on_read_data = s->total_prefetch_data = 0;
+#endif
+
+    if (bdrv_pread (s->fvd_metadata, 0, &header, sizeof (header)) !=
+        sizeof (header)) {
+        fprintf (stderr, "Failed to read the header of %s\n", filename);
+        goto fail;
+    }
+
+    fvd_header_le_to_cpu (&header);
+
+    if (header.magic != FVD_MAGIC || header.version != FVD_VERSION) {
+        fprintf (stderr, "Incorrect magic number in the header of %s: "
+                 "magic=%0X version=%d expect_magic=%0X expect_version=%d\n",
+                 filename, header.magic, header.version, FVD_MAGIC,
+                 FVD_VERSION);
+        goto fail;
+    }
+    if (header.virtual_disk_size % 512 != 0) {
+        fprintf (stderr, "Disk size %"PRId64" in the header of %s is not "
+                 "a multiple of 512.\n", header.virtual_disk_size, filename);
+        goto fail;
+    }
+
+    /* Initialize the fields of BDRVFvdState. */
+    s->dirty_image = FALSE;
+    s->block_size = header.block_size / 512;    /* In sectors. */
+    s->bitmap_size = header.bitmap_size;
+    s->prefetch_error = FALSE;
+    s->prefetch_timer = NULL;
+    s->sectors_per_prefetch = (header.bytes_per_prefetch + 511) / 512;
+    s->prefetch_throttle_time = header.prefetch_throttle_time;
+    s->prefetch_perf_calc_alpha = header.prefetch_perf_calc_alpha / 100.0;
+    s->prefetch_read_throughput_measure_time =
+                        header.prefetch_read_throughput_measure_time;
+    s->prefetch_write_throughput_measure_time =
+                        header.prefetch_write_throughput_measure_time;
+
+    /* Convert KB/s to bytes/millisec. */
+    s->prefetch_min_read_throughput =
+            ((double) header.prefetch_min_read_throughput) * 1024.0 / 1000.0;
+    s->prefetch_min_write_throughput =
+            ((double) header.prefetch_min_write_throughput) * 1024.0 / 1000.0;
+
+    /* NOTE(review): this only warns (does not fail), and the message prints
+     * byte counts while labeling them as sector counts -- confirm intent. */
+    if (header.base_img[0] != 0 && s->sectors_per_prefetch%s->block_size != 0) {
+        fprintf (stderr, "sectors_per_prefetch (%d) is not a multiple of "
+                 "block_size (%d)\n",
+                 s->sectors_per_prefetch * 512, s->block_size * 512);
+    }
+    s->max_outstanding_copy_on_read_data =
+        header.max_outstanding_copy_on_read_data;
+    /* NOTE(review): the clamp below sets the value to block_size although the
+     * condition tests against block_size * 2 -- verify which is intended. */
+    if (s->max_outstanding_copy_on_read_data < header.block_size * 2) {
+        s->max_outstanding_copy_on_read_data = header.block_size;
+    }
+
+    if (header.num_prefetch_slots < 1) {
+        s->num_prefetch_slots = 1;
+    } else {
+        s->num_prefetch_slots = header.num_prefetch_slots;
+    }
+    if (in_qemu_tool) {
+        /* No prefetching in a qemu tool. */
+        s->prefetch_start_delay = -1;
+
+#ifndef SIMULATED_TEST_WITH_QEMU_IO
+        s->copy_on_read = FALSE;        /* No prefetching in a qemu tool. */
+#else
+        /* But allow debugging copy_on_read in qemu-io if configured. */
+        s->copy_on_read = header.copy_on_read;
+#endif
+    } else {
+        s->prefetch_start_delay = header.prefetch_start_delay;
+        s->copy_on_read = header.copy_on_read;
+    }
+    s->virtual_disk_size = header.virtual_disk_size;
+    s->bitmap_offset = header.bitmap_offset / 512;
+    s->nb_sectors_in_base_img = header.base_img_size / 512;
+    bs->total_sectors = s->virtual_disk_size / 512;
+
+    if (init_data_file (s, &header, flags)) {
+        goto fail;
+    }
+
+    if (init_bitmap (bs, s, &header, filename)) {
+        goto fail;
+    }
+
+    if (load_table (s, &header, filename)) {
+        goto fail;
+    }
+
+    const int read_only = !(flags & BDRV_O_RDWR);
+    if (init_journal (read_only, bs, &header)) {
+        goto fail;
+    }
+
+    /* This must be done after init_journal() because it may use metadata
+     * recovered from the journal. */
+    if (init_compact_image (s, &header, filename)) {
+        goto fail;
+    }
+
+    if (!read_only) {
+        /* This flag will be cleaned later when the image is shut down
+         * gracefully. */
+        update_clean_shutdown_flag (s, FALSE);
+    }
+    init_prefetch_timer (bs, s);
+
+    QDEBUG ("copy_on_read=%s block_size=%d journal_size=%" PRId64
+            " prefetching_delay=%d prefetch_slots=%d "
+            "prefetch_read_threshold_KB=%.0lf "
+            "prefetch_write_threshold_KB=%.0lf "
+            "prefetch_throttle_time=%d bytes_per_prefetch=%d "
+            "max_outstanding_copy_on_read_data=%"PRId64"\n",
+            BOOL (s->copy_on_read), s->block_size * 512,
+            s->journal_size * 512, s->prefetch_start_delay,
+            s->num_prefetch_slots,
+            s->prefetch_min_read_throughput * 1000.0 / 1024.0,
+            s->prefetch_min_write_throughput * 1000.0 / 1024.0,
+            s->prefetch_throttle_time, s->sectors_per_prefetch * 512,
+            s->max_outstanding_copy_on_read_data);
+
+    return 0;
+
+  fail:
+    fprintf (stderr, "Failed to open %s using the FVD format.\n", filename);
+    fvd_close (bs);
+    return -1;
+}
+
+/* Loads the chunk-mapping table of a compact image into s->table.
+ *
+ * No-op for non-compact images.  Returns 0 on success, -1 on I/O error.
+ * On failure the allocated s->table is not freed here; the caller's error
+ * path (fvd_close() via fvd_open's 'fail' label) is responsible for it. */
+static int load_table (BDRVFvdState * s, FvdHeader * header,
+                       const char *const filename)
+{
+    if (!header->compact_image) {
+        return 0;
+    }
+
+    /* Initialize the table. */
+    s->table_offset = header->table_offset / 512;
+    s->chunk_size = header->chunk_size / 512;
+    /* Round up so that a partial final chunk still gets a table entry. */
+    int64_t vsize = header->virtual_disk_size + header->chunk_size - 1;
+    int table_entries = vsize / header->chunk_size;
+    int64_t table_size = sizeof (uint32_t) * table_entries;
+    /* Pad to a page boundary for aligned metadata I/O. */
+    table_size = ROUND_UP (table_size, DEF_PAGE_SIZE);
+    s->table = my_qemu_blockalign (s->fvd_metadata, (size_t) table_size);
+
+    if (bdrv_pread (s->fvd_metadata, header->table_offset, s->table, table_size)
+        != table_size) {
+        fprintf (stderr, "Failed to read the table of %s\n", filename);
+        return -1;
+    }
+
+    return 0;
+}
+
+/* Initializes runtime state for a compact (sparsely-allocated) image.
+ *
+ * Scans the chunk table to compute how much storage is already in use,
+ * verifies the underlying file/device is large enough to hold it, and --
+ * when the image sits directly on a block or character device -- prepares
+ * the shell command used to grow the storage on demand.
+ *
+ * Must run after init_journal(), since the table may contain entries
+ * recovered from the journal.  Returns 0 on success, -1 on failure. */
+static int init_compact_image (BDRVFvdState * s, FvdHeader * header,
+                               const char *const filename)
+{
+    if (!header->compact_image) {
+        s->data_region_prepared = FALSE;
+        return 0;
+    }
+
+    /* Scan the table to find the max allocated chunk. */
+    int i;
+    uint32_t max_chunk = 0;
+    int empty_disk = TRUE;
+    int table_entries =
+        (int) (ROUND_UP (header->virtual_disk_size, header->chunk_size) /
+               header->chunk_size);
+    for (i = 0; i < table_entries; i++) {
+        if (!IS_EMPTY (s->table[i])) {
+            empty_disk = FALSE;
+            uint32_t id = READ_TABLE (s->table[i]);
+            if (id > max_chunk) {
+                max_chunk = id;
+            }
+        }
+    }
+    /* Chunk ids are zero-based; used storage extends one past the max id. */
+    if (!empty_disk) {
+        max_chunk++;
+    }
+    s->used_storage = max_chunk * s->chunk_size;
+    s->storage_grow_unit = header->storage_grow_unit / 512;
+
+    /* Check if the image is directly stored on a raw device, including
+     * logical volume. If so, figure out the size of the device. */
+    struct stat stat_buf;
+    if (stat (filename, &stat_buf) != 0) {
+        fprintf (stderr, "Failed to stat() %s\n", filename);
+        return -1;
+    }
+
+    /* Check how much storage space is already allocated. */
+    int64_t size = bdrv_getlength (s->fvd_data);
+    if (size < 0) {
+        fprintf (stderr, "Failed in bdrv_getlength(%s)\n", filename);
+        return -1;
+    }
+    const int64_t min_size = (s->data_offset + s->used_storage) * 512;
+    if (size < min_size) {
+        fprintf (stderr, "The size of device %s is not even big enough to "
+                 "store already allocated data.\n",
+                 filename);
+        return -1;
+    }
+
+    if (S_ISBLK (stat_buf.st_mode) || S_ISCHR (stat_buf.st_mode)) {
+        /* Initialize the command to grow storage space. */
+        char cmd[2048];
+        if (header->add_storage_cmd[0] == 0) {
+            s->add_storage_cmd = NULL;
+        } else {
+            if (strcmp (header->add_storage_cmd, "builtin:lvextend") == 0) {
+                /* Note the following:
+                 *     1. lvextend may generate warning messages like "File
+                 *     descriptor...leaked...", which is fine.  See the
+                 *     following from LVM manual: "On invocation, lvm requires
+                 *     that only the standard file descriptors stdin, stdout
+                 *     and stderr are available.  If others are found, they
+                 *     get closed and messages are issued warning about the
+                 *     leak."
+                 *     2. Instead of using the lvextend command line, one
+                 *     option is to use liblvm directly, which avoids creating
+                 *     a process to resize a LV.
+                 *     3. On Ubuntu, /bin/sh is linked to /bin/dash, which
+                 *     does not support ">&" for stdout and stderr
+                 *     redirection. */
+                snprintf (cmd, sizeof (cmd) - 1, "/sbin/lvextend -L+%" PRId64
+                          "B %s >/dev/null 2>/dev/null",
+                          header->storage_grow_unit,
+                          header->data_file[0] ? header->data_file : filename);
+            } else {
+                /* User-supplied grow command: "<cmd> <bytes> <device>". */
+                snprintf (cmd, sizeof (cmd) - 1, "%s %" PRId64
+                          " %s >/dev/null 2>/dev/null",
+                          header->add_storage_cmd, header->storage_grow_unit,
+                          header->data_file[0] ? header->data_file : filename);
+            }
+
+            int len = strlen (cmd);
+            s->add_storage_cmd = my_qemu_malloc (len + 1);
+            memcpy (s->add_storage_cmd, cmd, len + 1);
+        }
+    }
+
+    s->data_storage = size / 512 - s->data_offset;
+    s->fvd_data->growable = TRUE;
+    s->data_region_prepared = TRUE;
+
+    return 0;
+}
+
+/* Opens the data file of the FVD image.
+ *
+ * If the header names a separate data file, it is opened (with an explicit
+ * format driver when header->data_file_fmt is set, otherwise format-probed).
+ * Otherwise data and metadata share the same file and s->data_offset is set
+ * to the sector offset where the data region begins.
+ *
+ * Returns 0 on success; -1 or -EINVAL on failure. */
+static int init_data_file (BDRVFvdState * s, FvdHeader * header, int flags)
+{
+    int ret;
+
+    if (header->data_file[0]) {
+        /* Open a separate data file. */
+        s->data_offset = 0;
+        s->fvd_data = bdrv_new ("");
+        if (!s->fvd_data) {
+            fprintf (stderr, "Failed to create a new block device driver.\n");
+            return -1;
+        }
+
+        if (header->data_file_fmt[0] == 0) {
+            /* No format recorded: let bdrv_open() probe the format. */
+            ret = bdrv_open (s->fvd_data, header->data_file, flags, NULL);
+        } else {
+            BlockDriver *data_drv = bdrv_find_format (header->data_file_fmt);
+            if (!data_drv) {
+                fprintf (stderr, "Failed to find driver for image format "
+                         "'%s' of data file %s\n",
+                         header->data_file_fmt, header->data_file);
+                return -1;
+            }
+            ret = bdrv_open (s->fvd_data, header->data_file, flags, data_drv);
+        }
+        if (ret != 0) {
+            fprintf (stderr, "Failed to open data file %s\n",
+                     header->data_file);
+            return -1;
+        }
+    } else {
+        s->data_offset = header->metadata_size / 512;        /* In sectors. */
+        s->fvd_data = s->fvd_metadata;
+    }
+
+    if (header->need_zero_init && !bdrv_has_zero_init (s->fvd_data)) {
+        /* A trick to figure out whether it runs a qemu tool such as qemu-nbd.*/
+        const int in_qemu_tool = (rt_clock == NULL);
+        if (in_qemu_tool) {
+            /* Only give a warning to allow 'qemu-img update' to modify
+             * need_zero_init if the user manually zero-init the device. */
+            fprintf (stderr, "Warning: image needs zero_init but it is not "
+                     "supported by the storage media.\n");
+        } else {
+            fprintf (stderr, "Error: image needs zero_init but it is not "
+                     "supported by the storage media.\n");
+            return -EINVAL;
+        }
+    }
+
+    return 0;
+}
+
+/* Initializes the bitmaps that track which blocks live in the FVD image
+ * versus only in the base image.
+ *
+ * If the header records that all data is already in the FVD image, prefetch
+ * and copy-on-read are disabled and the backing-file reference is dropped so
+ * the image works even if the base image disappears.  Otherwise the stale
+ * bitmap is read from the metadata file; a separate fresh bitmap is
+ * allocated only when copy-on-read or prefetching may run (see Section
+ * 3.3.4 of the FVD-cow paper), else both pointers share one buffer.
+ *
+ * Returns 0 on success, -1 on failure. */
+static int init_bitmap (BlockDriverState * bs, BDRVFvdState * s,
+                        FvdHeader * header, const char *const filename)
+{
+    if (header->all_data_in_fvd_img) {
+        /* This also covers the case of no base image. */
+        s->prefetch_state = PREFETCH_STATE_FINISHED;
+        s->copy_on_read = FALSE;
+        s->prefetch_start_delay = -1;
+
+        if (bs->backing_file[0] != 0) {
+            /* No need to use the base image. It may operate without problem
+             * even if the base image is no longer accessible. */
+            bs->backing_file[0] = 0;
+        }
+    } else {
+        ASSERT (header->base_img[0] != 0);
+        pstrcpy (bs->backing_file, 1024, header->base_img);
+        /* Verify early that the base image is readable, so the failure
+         * surfaces at open time rather than at the first read. */
+        const int flags = O_RDONLY | O_BINARY | O_LARGEFILE;
+        int test_backing_fd = open (bs->backing_file, flags);
+        if (test_backing_fd < 0) {
+            fprintf (stderr, "Failed to open the base image %s for read.\n",
+                     bs->backing_file);
+            return -1;
+        }
+        close (test_backing_fd);
+
+        /* This will be enabled in init_prefetch() after a timer expires. */
+        s->prefetch_state = PREFETCH_STATE_DISABLED;
+
+        s->stale_bitmap = my_qemu_blockalign (s->fvd_metadata,
+                                              (size_t) s->bitmap_size);
+        if (bdrv_pread (s->fvd_metadata, header->bitmap_offset,
+                        s->stale_bitmap, s->bitmap_size) != s->bitmap_size) {
+            fprintf (stderr, "Failed to read the bitmap of %s.\n", filename);
+            return -1;
+        }
+
+        if (s->copy_on_read || (s->prefetch_state != PREFETCH_STATE_FINISHED &&
+                                s->prefetch_start_delay > 0)) {
+            /* Use two bitmaps only if copy_on_read or prefetching is enabled.
+             * See Section 3.3.4 of the FVD-cow paper. */
+            s->fresh_bitmap = my_qemu_blockalign (s->fvd_metadata,
+                                                  s->bitmap_size);
+            memcpy (s->fresh_bitmap, s->stale_bitmap, s->bitmap_size);
+        } else {
+            s->fresh_bitmap = s->stale_bitmap;
+        }
+    }
+
+    return 0;
+}
+
+/* Arms a one-shot timer that starts prefetching after the configured delay.
+ *
+ * Does nothing when running inside a qemu tool (unless built for simulated
+ * qemu-io testing), when prefetching has already finished, or when the
+ * start delay is non-positive (prefetching disabled). */
+static void init_prefetch_timer (BlockDriverState * bs, BDRVFvdState * s)
+{
+#ifndef SIMULATED_TEST_WITH_QEMU_IO
+    /* A trick to figure out whether it is running in a qemu tool. */
+    const int in_qemu_tool = (rt_clock == NULL);
+    if (in_qemu_tool) {
+        return;
+    }
+#endif
+
+    if (s->prefetch_state == PREFETCH_STATE_FINISHED ||
+        s->prefetch_start_delay <= 0) {
+        return;
+    }
+
+    /* Start prefetching after a delay. Times 1000 to convert sec to ms. */
+    int64_t expire = qemu_get_clock (rt_clock) + s->prefetch_start_delay * 1000;
+    s->prefetch_timer = qemu_new_timer (rt_clock, fvd_init_prefetch, bs);
+    qemu_mod_timer (s->prefetch_timer, expire);
+}
diff --git a/block/fvd-prefetch.c b/block/fvd-prefetch.c
new file mode 100644
index 0000000..0ad8a8e
--- /dev/null
+++ b/block/fvd-prefetch.c
@@ -0,0 +1,598 @@ 
+/*
+ * Copyright (c) 2010-2011 IBM
+ *
+ * Authors:
+ *         Chunqiang Tang <ctang@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.
+ * See the COPYING file in the top-level directory.
+ */
+
+/*=============================================================================
+ *  A short description: this FVD module implements the function of
+ *  prefetching data from the base image and storing it in the FVD image.
+ *============================================================================*/
+
+static void resume_prefetch (BlockDriverState * bs, int64_t current_time);
+static void do_next_prefetch_read (BlockDriverState * bs, int64_t current_time);
+
+/* Timer callback that starts prefetching data from the base image.
+ *
+ * Allocates one FvdAIOCB per prefetch slot (all-or-nothing: if any slot's
+ * AIOCB cannot be obtained, everything allocated so far is released and
+ * prefetching is abandoned with s->prefetch_error set), initializes each
+ * slot's buffer and iovec, repurposes s->prefetch_timer as the
+ * resume-after-throttle timer, resets the throughput-measurement state, and
+ * kicks off the first read.
+ *
+ * 'opaque' is the BlockDriverState passed to qemu_new_timer() in
+ * init_prefetch_timer(). */
+void fvd_init_prefetch (void *opaque)
+{
+    BlockDriverState * bs = opaque;
+    BDRVFvdState *s = bs->opaque;
+    FvdAIOCB *acb;
+    int i;
+
+    QDEBUG ("Start prefetching\n");
+
+    if (bdrv_find_format ("blksim") == NULL) {
+        /* In simulation mode, the random seed should not be initialized here.*/
+        srandom (time (NULL) + getpid () + getpid () * 987654 + random ());
+    }
+
+    s->prefetch_acb =
+        my_qemu_malloc (sizeof (FvdAIOCB *) * s->num_prefetch_slots);
+
+    for (i = 0; i < s->num_prefetch_slots; i++) {
+        acb = s->prefetch_acb[i] =
+            my_qemu_aio_get (&fvd_aio_pool, bs, null_prefetch_cb, NULL);
+
+        if (!acb) {
+            /* Roll back the slots acquired so far and give up entirely. */
+            s->prefetch_error = TRUE;
+            int j;
+            for (j = 0; j < i; j++) {
+                my_qemu_aio_release (s->prefetch_acb[j]);
+                s->prefetch_acb[j] = NULL;
+            }
+
+            my_qemu_free (s->prefetch_acb);
+            s->prefetch_acb = NULL;
+            fprintf (stderr,
+                     "qemu_aio_get() failed and cannot start prefetching.\n");
+            return;
+        }
+
+        acb->type = OP_COPY;
+    }
+
+    s->prefetch_state = PREFETCH_STATE_RUNNING;
+
+    /* Second pass: set up each slot's buffer, iovec, and lock bookkeeping.
+     * Done only after all AIOCBs were acquired successfully above. */
+    for (i = 0; i < s->num_prefetch_slots; i++) {
+        acb = s->prefetch_acb[i];
+        acb->copy.buffered_sector_begin = acb->copy.buffered_sector_end = 0;
+        QLIST_INIT (&acb->copy_lock.dependent_writes);
+        acb->copy_lock.next.le_prev = NULL;
+        acb->copy.hd_acb = NULL;
+        acb->sector_num = 0;
+        acb->nb_sectors = 0;
+        acb->copy.iov.iov_len = s->sectors_per_prefetch * 512;
+        acb->copy.buf = acb->copy.iov.iov_base =
+            my_qemu_blockalign (bs->backing_hd, acb->copy.iov.iov_len);
+        qemu_iovec_init_external (&acb->copy.qiov, &acb->copy.iov, 1);
+    }
+
+    /* The one-shot start timer has fired; recreate it with resume_prefetch()
+     * as the callback so it can be reused for throttling pauses. */
+    if (s->prefetch_timer) {
+        qemu_free_timer (s->prefetch_timer);
+        s->prefetch_timer =
+            qemu_new_timer (rt_clock, (QEMUTimerCB *) resume_prefetch, bs);
+    }
+
+    s->pause_prefetch_requested = FALSE;
+    s->unclaimed_prefetch_region_start = 0;
+    s->prefetch_read_throughput = -1;        /* Indicate not initialized. */
+    s->prefetch_write_throughput = -1;        /* Indicate not initialized. */
+    s->prefetch_read_time = 0;
+    s->prefetch_write_time = 0;
+    s->prefetch_data_read = 0;
+    s->prefetch_data_written = 0;
+    s->next_prefetch_read_slot = 0;
+    s->num_filled_prefetch_slots = 0;
+    s->prefetch_read_active = FALSE;
+
+    do_next_prefetch_read (bs, qemu_get_clock (rt_clock));
+}
+
+/* Suspends prefetching for a random duration in (0, prefetch_throttle_time]
+ * milliseconds.  Randomization keeps multiple contending FVD instances from
+ * all resuming at the same instant. */
+static void pause_prefetch (BDRVFvdState * s)
+{
+    const double fraction = random () / ((double) RAND_MAX);
+    const int64_t delay_ms =
+        1 + (int64_t) (fraction * s->prefetch_throttle_time);
+    QDEBUG ("Pause prefetch for %" PRId64 " milliseconds\n", delay_ms);
+    /* When the timer expires, it goes to resume_prefetch(). */
+    qemu_mod_timer (s->prefetch_timer, qemu_get_clock (rt_clock) + delay_ms);
+}
+
+/* Stops prefetching and releases all prefetch resources.
+ *
+ * final_state is PREFETCH_STATE_FINISHED or PREFETCH_STATE_DISABLED; a
+ * requested FINISHED is downgraded to DISABLED if an error occurred.  On a
+ * clean finish, both bitmaps are marked all-present, metadata is flushed,
+ * and all_data_in_fvd_img is recorded in the on-disk header so future opens
+ * can skip the base image entirely. */
+static void terminate_prefetch (BlockDriverState * bs, int final_state)
+{
+    BDRVFvdState *s = bs->opaque;
+    int i;
+
+    ASSERT (!s->prefetch_read_active && s->num_filled_prefetch_slots == 0);
+
+    /* Test the array pointer once, not on every iteration as before; this
+     * also avoids the per-slot frees ever running against a NULL array. */
+    if (s->prefetch_acb) {
+        for (i = 0; i < s->num_prefetch_slots; i++) {
+            my_qemu_vfree (s->prefetch_acb[i]->copy.buf);
+            my_qemu_aio_release (s->prefetch_acb[i]);
+            s->prefetch_acb[i] = NULL;
+        }
+        my_qemu_free (s->prefetch_acb);
+        s->prefetch_acb = NULL;
+    }
+
+    if (s->prefetch_timer) {
+        qemu_del_timer (s->prefetch_timer);
+        qemu_free_timer (s->prefetch_timer);
+        s->prefetch_timer = NULL;
+    }
+
+    if (final_state == PREFETCH_STATE_FINISHED) {
+        if (s->prefetch_error) {
+            s->prefetch_state = PREFETCH_STATE_DISABLED;
+        } else {
+            s->prefetch_state = PREFETCH_STATE_FINISHED;
+        }
+    } else {
+        s->prefetch_state = final_state;
+    }
+
+    if (s->prefetch_state == PREFETCH_STATE_FINISHED) {
+        QDEBUG ("FVD prefetching finished successfully.\n");
+
+        if (s->stale_bitmap) {
+            /* Mark every block as present in the FVD image. */
+            memset (s->stale_bitmap, 0xFF, s->bitmap_size);
+            if (s->fresh_bitmap && s->fresh_bitmap != s->stale_bitmap) {
+                memset (s->fresh_bitmap, 0xFF, s->bitmap_size);
+            }
+        }
+
+        /* Flush the table since its entries may be dirty due to 'soft-write'
+         * by prefetching or copy-on-read. */
+        flush_metadata_to_disk (bs);
+
+        /* Update the on-disk header. */
+        FvdHeader header;
+        read_fvd_header (s, &header);
+        header.all_data_in_fvd_img = TRUE;
+        update_fvd_header (s, &header);
+        s->copy_on_read = FALSE;
+    } else if (s->prefetch_state == PREFETCH_STATE_DISABLED) {
+        QDEBUG ("FVD disk prefetching disabled.\n");
+    }
+}
+
+/* Starts the next asynchronous read of base-image data into the current
+ * prefetch slot.
+ *
+ * Scans forward from s->unclaimed_prefetch_region_start for the next region
+ * still missing from the FVD image.  If the end of the base image is
+ * reached and no writes are pending, prefetching terminates as FINISHED.
+ * If starting the read fails, prefetching is disabled (and terminated
+ * immediately when no slot writes remain outstanding). */
+static void do_next_prefetch_read (BlockDriverState * bs, int64_t current_time)
+{
+    FvdAIOCB *acb;
+    BDRVFvdState *s = bs->opaque;
+    int64_t begin, end;
+
+    ASSERT (!s->prefetch_read_active
+            && s->num_filled_prefetch_slots < s->num_prefetch_slots
+            && !s->pause_prefetch_requested);
+
+    /* Find the next region to prefetch. */
+    begin = s->unclaimed_prefetch_region_start;
+    while (1) {
+        if (begin >= s->nb_sectors_in_base_img) {
+            /* Whole base image claimed; finish once all writes drain. */
+            s->unclaimed_prefetch_region_start = s->nb_sectors_in_base_img;
+            if (s->num_filled_prefetch_slots == 0) {
+                terminate_prefetch (bs, PREFETCH_STATE_FINISHED);
+            }
+            return;
+        }
+        end = begin + s->sectors_per_prefetch;
+        if (end > s->nb_sectors_in_base_img) {
+            end = s->nb_sectors_in_base_img;
+        }
+        if (find_region_in_base_img (s, &begin, &end)) {
+            break;
+        }
+        /* [begin,end) is already in the FVD image; skip past it. */
+        begin = end;
+    }
+
+    ASSERT (begin % s->block_size == 0
+            && (end % s->block_size == 0 || end == s->nb_sectors_in_base_img));
+
+    acb = s->prefetch_acb[s->next_prefetch_read_slot];
+    acb->copy.buffered_sector_begin = acb->sector_num = begin;
+    acb->copy.buffered_sector_end = s->unclaimed_prefetch_region_start = end;
+    acb->nb_sectors = end - begin;
+    acb->copy.qiov.size = acb->copy.iov.iov_len = acb->nb_sectors * 512;
+    acb->copy.iov.iov_base = acb->copy.buf;
+    acb->copy.last_prefetch_op_start_time = current_time;
+    acb->copy.hd_acb = bdrv_aio_readv (bs->backing_hd, acb->sector_num,
+                                       &acb->copy.qiov, acb->nb_sectors,
+                                       finish_prefetch_read, acb);
+
+
+    if (acb->copy.hd_acb == NULL) {
+        QDEBUG ("PREFETCH: error when starting read for sector_num=%" PRId64
+                " nb_sectors=%d\n", acb->sector_num, acb->nb_sectors);
+        s->prefetch_error = TRUE;
+        s->prefetch_state = PREFETCH_STATE_DISABLED;
+        if (s->num_filled_prefetch_slots == 0) {
+            terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+        }
+    } else {
+        s->prefetch_read_active = TRUE;
+        QDEBUG ("PREFETCH: start read for sector_num=%" PRId64
+                " nb_sectors=%d total_prefetched_bytes=%" PRId64 "\n",
+                acb->sector_num, acb->nb_sectors, s->total_prefetch_data);
+        /* NOTE(review): the counter is incremented after the QDEBUG above,
+         * so the logged total excludes the read just issued -- confirm this
+         * is intended. */
+#ifdef FVD_DEBUG
+    s->total_prefetch_data += acb->copy.iov.iov_len;
+#endif
+    }
+}
+
+static void finish_prefetch_write (void *opaque, int ret)
+{
+    FvdAIOCB *acb = (FvdAIOCB *) opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+    int64_t begin, end;
+    const int64_t current_time = qemu_get_clock (rt_clock);
+
+    ASSERT (acb->nb_sectors > 0 && s->num_filled_prefetch_slots > 0);
+
+    QLIST_REMOVE (acb, copy_lock.next);
+    restart_dependent_writes (acb);
+    acb->copy.hd_acb = NULL;
+    QLIST_INIT (&acb->copy_lock.dependent_writes);
+
+    if (ret != 0) {
+        QDEBUG ("PREFETCH: finished write with error for sector_num=%" PRId64
+                " nb_sectors=%d\n", acb->sector_num, acb->nb_sectors);
+        s->num_filled_prefetch_slots = 0;
+        s->prefetch_error = TRUE;
+        s->prefetch_state = PREFETCH_STATE_DISABLED;
+        if (!s->prefetch_read_active) {
+            terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+        }
+        return;
+    }
+
+    /* No need to update the on-disk bitmap or the stale bitmap. See Section
+     * 3.3.4 of the FVD-cow paper. */
+    update_fresh_bitmap (acb->sector_num, acb->nb_sectors, s);
+
+    const int64_t write_time =
+        current_time - acb->copy.last_prefetch_op_start_time;
+    s->prefetch_write_time += write_time;
+    s->prefetch_data_written += acb->nb_sectors * 512;
+
+    QDEBUG ("PREFETCH: write_finished  sector_num=%" PRId64
+            " nb_sectors=%d  write_time=%d (ms)\n", acb->sector_num,
+            acb->nb_sectors, (int) write_time);
+
+    /* Calculate throughput and determine if it needs to pause prefetching due
+     * to low throughput. */
+    if (s->prefetch_timer && s->prefetch_throttle_time > 0
+        && !s->pause_prefetch_requested
+        && s->prefetch_write_time > s->prefetch_write_throughput_measure_time) {
+        const double this_round_throughput =
+            s->prefetch_data_written / (double) s->prefetch_write_time;
+        if (s->prefetch_write_throughput < 0) {
+            /* Previously not initialized. */
+            s->prefetch_write_throughput = this_round_throughput;
+        } else {
+            s->prefetch_write_throughput =
+                s->prefetch_perf_calc_alpha * s->prefetch_write_throughput +
+                (1 - s->prefetch_perf_calc_alpha) * this_round_throughput;
+        }
+        if (s->prefetch_write_throughput < s->prefetch_min_write_throughput) {
+            QDEBUG ("PREFETCH: slow_write  this_write=%d (ms)  "
+                    "this_write_throughput=%.3lf (MB/s)   "
+                    "avg_write_throughput=%.3lf (MB/s)\n",
+                    (int) write_time,
+                    this_round_throughput / 1048576 * 1000,
+                    s->prefetch_write_throughput / 1048576 * 1000);
+
+            /* Make a randomized decision to pause prefetching. This avoids
+             * pausing all contending FVD drivers. See Section 3.4.2 of the
+             * FVD-cow paper. */
+            if (random () > (RAND_MAX / 2)) {
+                QDEBUG ("PREFETCH: pause requested.\n");
+                s->pause_prefetch_requested = TRUE;
+            } else {
+                QDEBUG ("PREFETCH: continue due to 50%% probability, despite "
+                        "slow write.\n");
+                s->prefetch_write_throughput = -1; /*Indicate not initialized.*/
+            }
+        } else {
+            QDEBUG ("PREFETCH: this_write_throughput=%.3lf (MB/s)   "
+                    "avg_write_throughput=%.3lf (MB/s)\n",
+                    this_round_throughput / 1048576 * 1000,
+                    s->prefetch_write_throughput / 1048576 * 1000);
+        }
+
+        /* Preparing for measuring the next round of throughput. */
+        s->prefetch_data_written = 0;
+        s->prefetch_write_time = 0;
+    }
+
+    /* Find in this prefetch slot the next section of prefetched but
+     * not-yet-written data. */
+    begin = acb->sector_num + acb->nb_sectors;
+    if (begin < acb->copy.buffered_sector_end) {
+        end = acb->copy.buffered_sector_end;
+        if (find_region_in_base_img (s, &begin, &end)) {
+            acb->sector_num = begin;
+            acb->nb_sectors = end - begin;
+            acb->copy.iov.iov_base = acb->copy.buf +
+                            (begin - acb->copy.buffered_sector_begin) * 512;
+            acb->copy.qiov.size = acb->copy.iov.iov_len = acb->nb_sectors * 512;
+            QDEBUG ("PREFETCH: write_data  sector_num=%" PRId64
+                    " nb_sectors=%d\n", acb->sector_num, acb->nb_sectors);
+            acb->copy.hd_acb = store_data (TRUE, acb, bs, acb->sector_num,
+                                           &acb->copy.qiov, acb->nb_sectors,
+                                           finish_prefetch_write, acb);
+            if (acb->copy.hd_acb == NULL) {
+                QDEBUG ("PREFETCH: error in starting bdrv_aio_writev().\n");
+                s->num_filled_prefetch_slots = 0;
+                s->prefetch_error = TRUE;
+                s->prefetch_state = PREFETCH_STATE_DISABLED;
+                if (!s->prefetch_read_active) {
+                    terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+                }
+            } else {
+                acb->copy_lock.begin = begin;
+                acb->copy_lock.end = end;
+                QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            }
+
+            return;
+        }
+    }
+
+    s->num_filled_prefetch_slots--;
+
+    if (s->prefetch_state == PREFETCH_STATE_DISABLED) {
+        if (s->num_filled_prefetch_slots == 0 && !s->prefetch_read_active) {
+            terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+        }
+        return;
+    }
+
+    if (begin >= s->nb_sectors_in_base_img) {
+        /* Prefetching finished. */
+        ASSERT (s->num_filled_prefetch_slots == 0 && !s->prefetch_read_active);
+        terminate_prefetch (bs, PREFETCH_STATE_FINISHED);
+        return;
+    }
+
+    if (s->pause_prefetch_requested) {
+        if (s->num_filled_prefetch_slots == 0) {
+            if (!s->prefetch_read_active) {
+                pause_prefetch (s);
+            } else {
+                QDEBUG ("PREFETCH: wait for the read operation to finish in "
+                        "order to pause prefetch.\n");
+            }
+            return;
+        }
+    }
+
+    /* Write out data in the next prefetched slot. */
+    while (s->num_filled_prefetch_slots > 0) {
+        int k = s->next_prefetch_read_slot - s->num_filled_prefetch_slots;
+        if (k < 0) {
+            k += s->num_prefetch_slots;
+        }
+        acb = s->prefetch_acb[k];
+
+        int64_t begin = acb->copy.buffered_sector_begin;
+        int64_t end = acb->copy.buffered_sector_end;
+        if (find_region_in_base_img (s, &begin, &end)) {
+            acb->copy.last_prefetch_op_start_time = current_time;
+            acb->sector_num = begin;
+            acb->nb_sectors = end - begin;
+            acb->copy.iov.iov_base =
+                acb->copy.buf + (begin - acb->copy.buffered_sector_begin) * 512;
+            acb->copy.qiov.size = acb->copy.iov.iov_len = acb->nb_sectors * 512;
+            QDEBUG ("PREFETCH: writes data: sector_num=%" PRId64
+                    " nb_sectors=%d\n", acb->sector_num, acb->nb_sectors);
+            acb->copy.hd_acb = store_data (TRUE, acb, bs, acb->sector_num,
+                                           &acb->copy.qiov, acb->nb_sectors,
+                                           finish_prefetch_write, acb);
+
+            if (acb->copy.hd_acb == NULL) {
+                QDEBUG ("PREFETCH: error cannot get a control block to write "
+                        "a prefetched block.\n");
+                s->prefetch_error = TRUE;
+                s->prefetch_state = PREFETCH_STATE_DISABLED;
+                s->num_filled_prefetch_slots = 0;
+                if (!s->prefetch_read_active) {
+                    terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+                }
+                return;
+            }
+
+            acb->copy_lock.begin = begin;
+            acb->copy_lock.end = end;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            break;
+        } else {
+            QDEBUG ("PREFETCH: discard prefetched data as they have been "
+                    "covered: sector_num=%" PRId64 " nb_sectors=%d\n",
+                    acb->sector_num, acb->nb_sectors);
+            s->num_filled_prefetch_slots--;
+        }
+    }
+
+    /* If the reader was stopped due to lack of slots, start the reader. */
+    if (!s->prefetch_read_active && !s->pause_prefetch_requested) {
+        do_next_prefetch_read (bs, current_time);
+    }
+}
+
+/*
+ * Completion callback for a prefetch read started by do_next_prefetch_read().
+ * On success it accounts the read into the current throughput-measurement
+ * window, may request pausing prefetch when measured throughput is low,
+ * hands the freshly filled slot to the writer (starting the writer if it is
+ * idle), and keeps the read pipeline going.  'ret' is the AIO completion
+ * status: 0 on success, non-zero on error.
+ */
+static void finish_prefetch_read (void *opaque, int ret)
+{
+    FvdAIOCB *acb = (FvdAIOCB *) opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVFvdState *s = bs->opaque;
+
+    ASSERT (s->prefetch_read_active && s->num_filled_prefetch_slots >= 0
+            && s->num_filled_prefetch_slots < s->num_prefetch_slots);
+
+    /* The read that just completed is no longer in flight. */
+    s->prefetch_read_active = FALSE;
+    acb->copy.hd_acb = NULL;
+
+    /* Prefetching was disabled while this read was in flight: shut down as
+     * soon as no slot still awaits its write-out. */
+    if (s->prefetch_state == PREFETCH_STATE_DISABLED) {
+        if (s->num_filled_prefetch_slots == 0) {
+            terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+        }
+        return;
+    }
+
+    /* Read error: mark prefetch as failed and disable it.  Outstanding
+     * write slots, if any, are left to drain before termination. */
+    if (ret != 0) {
+        QDEBUG ("PREFETCH: read_error  sector_num=%" PRId64 " nb_sectors=%d.\n",
+                acb->sector_num, acb->nb_sectors);
+        s->prefetch_error = TRUE;
+        s->prefetch_state = PREFETCH_STATE_DISABLED;
+        if (s->num_filled_prefetch_slots == 0) {
+            terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+        }
+        return;
+    }
+
+    /* Accumulate statistics for the current throughput-measurement window.
+     * NOTE(review): read_time appears to be in milliseconds (rt_clock),
+     * matching the "(ms)" in the debug output -- confirm against the
+     * qemu_get_clock(rt_clock) contract. */
+    const int64_t current_time = qemu_get_clock (rt_clock);
+    const int64_t read_time = current_time -
+                        acb->copy.last_prefetch_op_start_time;
+    s->prefetch_read_time += read_time;
+    s->prefetch_data_read += acb->nb_sectors * 512;
+
+    QDEBUG ("PREFETCH: read_finished  sector_num=%" PRId64
+            " nb_sectors=%d  read_time=%d (ms)\n", acb->sector_num,
+            acb->nb_sectors, (int) read_time);
+
+    /* Calculate throughput and determine if it needs to pause prefetching due
+     * to low throughput. */
+    if (s->prefetch_timer && s->prefetch_throttle_time > 0
+        && !s->pause_prefetch_requested
+        && s->prefetch_read_time > s->prefetch_read_throughput_measure_time) {
+        const double this_round_throughput =
+            s->prefetch_data_read / (double) s->prefetch_read_time;
+        if (s->prefetch_read_throughput < 0) {
+            /* Previously not initialized. */
+            s->prefetch_read_throughput = this_round_throughput;
+        } else {
+            /* Exponentially weighted moving average with weight
+             * prefetch_perf_calc_alpha given to the history. */
+            s->prefetch_read_throughput = s->prefetch_perf_calc_alpha *
+                s->prefetch_read_throughput +
+                (1 - s->prefetch_perf_calc_alpha) * this_round_throughput;
+        }
+        if (s->prefetch_read_throughput < s->prefetch_min_read_throughput) {
+            QDEBUG ("PREFETCH: slow_read read_time=%d (ms)   "
+                    "this_read_throughput=%.3lf (MB/s) "
+                    "avg_read_throughput=%.3lf (MB/s)\n",
+                    (int) read_time, this_round_throughput / 1048576 * 1000,
+                    s->prefetch_read_throughput / 1048576 * 1000);
+
+            /* Make a randomized decision to pause prefetching. This avoids
+             * pausing all contending FVD drivers. See Section 3.4.2 of the
+             * FVD-cow paper. */
+            if (random () > (RAND_MAX / 2)) {
+                QDEBUG ("PREFETCH: pause requested.\n");
+                s->pause_prefetch_requested = TRUE;
+            } else {
+                QDEBUG ("PREFETCH: continue due to 50%% probability, "
+                        "despite slow read.\n");
+                s->prefetch_read_throughput = -1; /*Indicate not initialized.*/
+            }
+        } else {
+            QDEBUG ("PREFETCH: this_read_throughput=%.3lf (MB/s)    "
+                    "avg_read_throughput=%.3lf (MB/s)\n",
+                    this_round_throughput / 1048576 * 1000,
+                    s->prefetch_read_throughput / 1048576 * 1000);
+        }
+
+        /* Preparing for measuring the next round of throughput. */
+        s->prefetch_data_read = 0;
+        s->prefetch_read_time = 0;
+    }
+
+    if (s->num_filled_prefetch_slots > 0) {
+        /* There is one ongoing write for prefetched data. This slot will be
+         * written out later. */
+        s->num_filled_prefetch_slots++;
+        s->next_prefetch_read_slot++;
+        if (s->next_prefetch_read_slot >= s->num_prefetch_slots) {
+            /* Slot indices wrap around the circular slot array. */
+            s->next_prefetch_read_slot = 0;
+        }
+    } else {
+        /* The writer is not active. Start the writer. */
+        int64_t begin = acb->copy.buffered_sector_begin;
+        int64_t end = acb->copy.buffered_sector_end;
+        /* Trim [begin, end) to the part still needing copy from the base
+         * image; concurrent guest writes may have covered some of it. */
+        if (find_region_in_base_img (s, &begin, &end)) {
+            acb->copy.last_prefetch_op_start_time = current_time;
+            acb->sector_num = begin;
+            acb->nb_sectors = end - begin;
+            acb->copy.iov.iov_base =
+                acb->copy.buf + (begin - acb->copy.buffered_sector_begin) * 512;
+            acb->copy.qiov.size = acb->copy.iov.iov_len = acb->nb_sectors * 512;
+            QDEBUG ("PREFETCH: writes_data sector_num=%" PRId64
+                    " nb_sectors=%d\n", acb->sector_num, acb->nb_sectors);
+            acb->copy.hd_acb = store_data (TRUE, acb, bs, acb->sector_num,
+                                           &acb->copy.qiov, acb->nb_sectors,
+                                           finish_prefetch_write, acb);
+
+            if (acb->copy.hd_acb == NULL) {
+                QDEBUG ("PREFETCH: error cannot get control block to write a "
+                        "prefetched block.\n");
+                s->prefetch_error = TRUE;
+                s->prefetch_state = PREFETCH_STATE_DISABLED;
+                if (s->num_filled_prefetch_slots == 0) {
+                    terminate_prefetch (bs, PREFETCH_STATE_DISABLED);
+                }
+                return;
+            }
+
+            /* Lock the copied region so overlapping guest writes are held
+             * off until finish_prefetch_write releases it. */
+            acb->copy_lock.begin = begin;
+            acb->copy_lock.end = end;
+            QLIST_INSERT_HEAD (&s->copy_locks, acb, copy_lock.next);
+            s->num_filled_prefetch_slots++;
+            s->next_prefetch_read_slot++;
+            if (s->next_prefetch_read_slot >= s->num_prefetch_slots) {
+                s->next_prefetch_read_slot = 0;
+            }
+        } else {
+            /* The current prefetch slot will be reused to prefetch the next
+             * bunch of data. */
+            QDEBUG ("PREFETCH: discard prefetched data as they have been "
+                    "covered: sector_num=%" PRId64 " nb_sectors=%d\n",
+                    acb->sector_num, acb->nb_sectors);
+        }
+    }
+
+    /* Keep the read pipeline going unless all slots are full or a pause
+     * has been requested. */
+    if (s->num_filled_prefetch_slots >= s->num_prefetch_slots) {
+        QDEBUG ("PREFETCH: halt read because no slot is available.\n");
+    } else {
+        if (s->pause_prefetch_requested) {
+            if (s->num_filled_prefetch_slots == 0) {
+                pause_prefetch (s);
+            }
+        } else {
+            do_next_prefetch_read (bs, current_time);
+        }
+    }
+}
+
+/*
+ * Resume prefetching after a throttling pause -- presumably invoked from
+ * the prefetch timer set up by pause_prefetch() (TODO confirm: callback is
+ * not visible in this chunk).  Resets all throughput-measurement state and
+ * restarts the read pipeline.  'current_time' is the caller's reading of
+ * rt_clock.  No-op unless prefetching is in the RUNNING state.
+ */
+static void resume_prefetch (BlockDriverState * bs, int64_t current_time)
+{
+    BDRVFvdState *s = bs->opaque;
+
+    if (s->prefetch_state != PREFETCH_STATE_RUNNING) {
+        return;
+    }
+
+    /* A pause only happens once all slots have drained and no read is
+     * in flight (see finish_prefetch_read / finish_prefetch_write). */
+    ASSERT (s->num_filled_prefetch_slots == 0 && !s->prefetch_read_active);
+    QDEBUG ("PREFETCH: resume.\n");
+
+    /* Start a fresh measurement window so the pre-pause (slow) throughput
+     * history does not immediately trigger another pause. */
+    s->pause_prefetch_requested = FALSE;
+    s->prefetch_read_throughput = -1;        /* Indicate not initialized. */
+    s->prefetch_write_throughput = -1;        /* Indicate not initialized. */
+    s->prefetch_read_time = 0;
+    s->prefetch_write_time = 0;
+    s->prefetch_data_read = 0;
+    s->prefetch_data_written = 0;
+
+    /* Use the caller-supplied timestamp; previously the 'current_time'
+     * parameter was unused and the clock was redundantly queried again. */
+    do_next_prefetch_read (bs, current_time);
+}