Patchwork [RFC,V6,24/33] qcow2: Integrate deduplication in qcow2_co_writev loop.

Submitter Benoît Canet
Date Feb. 6, 2013, 12:31 p.m.
Message ID <1360153926-9492-25-git-send-email-benoit@irqsave.net>
Permalink /patch/218636/
State New

Comments

Benoît Canet - Feb. 6, 2013, 12:31 p.m.
Signed-off-by: Benoit Canet <benoit@irqsave.net>
---
 block/qcow2.c |   87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 2 deletions(-)
Stefan Hajnoczi - Feb. 8, 2013, 10:43 a.m.
On Wed, Feb 06, 2013 at 01:31:57PM +0100, Benoît Canet wrote:
>      while (remaining_sectors != 0) {
>  
>          l2meta = NULL;
>  
>          trace_qcow2_writev_start_part(qemu_coroutine_self());
> +
> +        if (atomic_dedup_is_running && ds.nb_undedupable_sectors == 0) {
> +            /* Try to deduplicate as much clusters as possible */
> +            deduped_sectors_nr = qcow2_dedup(bs,
> +                                             &ds,
> +                                             sector_num,
> +                                             dedup_cluster_data,
> +                                             dedup_cluster_data_nr);
> +
> +            if (deduped_sectors_nr < 0) {
> +                goto fail;
> +            }
> +
> +            remaining_sectors -= deduped_sectors_nr;
> +            sector_num += deduped_sectors_nr;
> +            bytes_done += deduped_sectors_nr * 512;
> +
> +            /* no more data to write -> exit */
> +            if (remaining_sectors <= 0) {
> +                goto fail;
> +            }

goto fail?  Should this be "break" so we get ret = 0?
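Stefan's suggestion can be illustrated with a stripped-down standalone model of the loop (names and structure are simplified for illustration; this is not the real qcow2_co_writev()): leaving with break reaches the common exit path and returns the ret = 0 that was initialised, instead of jumping to the error label.

```c
#include <assert.h>

/* Minimal stand-in for the writev loop: once dedup consumes all
 * remaining sectors, leave the loop with break so the function
 * reaches the normal exit path with ret = 0. */
int writev_model(int remaining_sectors, int deduped_sectors_nr)
{
    int ret = 0;

    while (remaining_sectors != 0) {
        remaining_sectors -= deduped_sectors_nr;

        /* no more data to write -> leave the loop, not the error path */
        if (remaining_sectors <= 0) {
            break;
        }

        /* ... allocate clusters and write the remaining data ... */
        remaining_sectors = 0;
    }

    return ret;   /* 0 on the fully-deduplicated path */
}
```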

> +
> +            /* if we deduped something trace it */
> +            if (deduped_sectors_nr) {
> +                trace_qcow2_writev_done_part(qemu_coroutine_self(),
> +                                             deduped_sectors_nr);
> +                trace_qcow2_writev_start_part(qemu_coroutine_self());
> +            }
> +        }
> +
>          index_in_cluster = sector_num & (s->cluster_sectors - 1);
> -        n_end = index_in_cluster + remaining_sectors;
> +        n_end = atomic_dedup_is_running &&
> +                ds.nb_undedupable_sectors < remaining_sectors ?
> +                index_in_cluster + ds.nb_undedupable_sectors :
> +                index_in_cluster + remaining_sectors;

I find this expression hard to understand.

We only get here if ds.nb_undedupable_sectors > 0.  In other words, we
tried to dedup but failed, so we must write data into the image file.

Can we ensure that ds.nb_undedupable_sectors is limited to at most
remaining_sectors?  Then the expression becomes clearer:

if (atomic_dedup_is_running) {
    n_end = index_in_cluster + ds.nb_undedupable_sectors;
} else {
    n_end = index_in_cluster + remaining_sectors;
}

> +
>          if (s->crypt_method &&
>              n_end > QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors) {
>              n_end = QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors;
> @@ -852,6 +915,24 @@ static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
>                  cur_nr_sectors * 512);
>          }
>  
> +        /* Write the non duplicated clusters hashes to disk */
> +        if (atomic_dedup_is_running) {
> +            int count = cur_nr_sectors / s->cluster_sectors;
> +            int has_ending = ((cluster_offset >> 9) + index_in_cluster +
> +                             cur_nr_sectors) & (s->cluster_sectors - 1);
> +            count = index_in_cluster ? count + 1 : count;

An if statement is easier to read:

if (index_in_cluster) {
    count++;
}

> +            count = has_ending ? count + 1 : count;

Same here.
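Rewritten with if statements as suggested, the hash-count arithmetic reads as below. This is a self-contained model of the computation only: the real patch also folds (cluster_offset >> 9) into the partial-last-cluster test, but since that offset is cluster-aligned the mask discards it, so the simplified form should be equivalent.

```c
#include <assert.h>

/* Number of cluster hashes to store for a write of cur_nr_sectors
 * sectors: one per full cluster, plus one for a partial first
 * cluster and one for a partial last cluster. */
int hash_count(int index_in_cluster, int cur_nr_sectors, int cluster_sectors)
{
    int count = cur_nr_sectors / cluster_sectors;
    int has_ending = (index_in_cluster + cur_nr_sectors)
                     & (cluster_sectors - 1);

    if (index_in_cluster) {
        count++;              /* write starts mid-cluster */
    }
    if (has_ending) {
        count++;              /* write ends mid-cluster */
    }
    return count;
}
```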

> +            ret = qcow2_dedup_store_new_hashes(bs,
> +                                               &ds,
> +                                               count,
> +                                               sector_num,
> +                                               (cluster_offset >> 9));

Is it safe to store the new hashes before the data itself is written?
What happens if there is a power failure before the data is written?

> +            if (ret < 0) {
> +                goto fail;
> +            }
> +        }
> +
> +        BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);

Merge conflict?  BLKDBG_EVENT() is already called below.
Benoît Canet - Feb. 27, 2013, 2:31 p.m.
> I find this expression hard to understand.
> 
> We only get here if ds.nb_undedupable_sectors > 0.  In other words, we
> tried to dedup but failed, so we must write data into the image file.
> 
> Can we ensure that ds.nb_undedupable_sectors is limited to at most
> remaining_sectors?  Then the expression becomes clearer:

ds.nb_undedupable_sectors can be bigger than remaining_sectors if
dedup_cluster_data was completed at its beginning and/or at its ending in case of
an unaligned write.

Maybe I could grow/replace qiov at the beginning of writev to ingest the
completed data and increase remaining_sectors so it will always be >=
ds.nb_undedupable_sectors?

What do you think about this solution?

Best regards

Benoît
Stefan Hajnoczi - Feb. 28, 2013, 9:38 a.m.
On Wed, Feb 27, 2013 at 03:31:48PM +0100, Benoît Canet wrote:
> > I find this expression hard to understand.
> > 
> > We only get here if ds.nb_undedupable_sectors > 0.  In other words, we
> > tried to dedup but failed, so we must write data into the image file.
> > 
> > Can we ensure that ds.nb_undedupable_sectors is limited to at most
> > remaining_sectors?  Then the expression becomes clearer:
> 
> ds.nb_undedupable_sectors can be bigger than remaining_sectors if
> dedup_cluster_data was completed at its beginning and/or at its ending in case of
> an unaligned write.
> 
> Maybe I could grow/replace qiov at the beginning of writev to ingest the
> completed data and increase remaining_sectors so it will always be >=
> ds.nb_undedupable_sectors?
> 
> What do you think about this solution?

Sure, if the semantics of the rest of this function can cope with
extending qiov.

Stefan
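The situation Benoît describes, an unaligned request rounded out to whole clusters before hashing, can be sketched with plain sector arithmetic (helper name and signature are illustrative, not from the QEMU tree):

```c
#include <assert.h>
#include <stdint.h>

/* Round a guest request out to whole clusters, the way the dedup
 * code completes dedup_cluster_data by reading the missing head
 * and tail: the resulting span can be larger than the guest
 * request, which is why ds.nb_undedupable_sectors may exceed
 * remaining_sectors. */
void align_to_clusters(int64_t sector_num, int nb_sectors,
                       int cluster_sectors,
                       int64_t *aligned_start, int *aligned_nb)
{
    int64_t mask = cluster_sectors - 1;
    int64_t start = sector_num & ~mask;
    int64_t end = (sector_num + nb_sectors + mask) & ~mask;

    *aligned_start = start;
    *aligned_nb = (int)(end - start);
}
```

For example, with 64 KiB clusters (cluster_sectors = 128 sectors of 512 bytes), a 4-sector write at sector 10 expands to a full 128-sector cluster, so the dedup state covers 128 sectors while remaining_sectors is only 4.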

Patch

diff --git a/block/qcow2.c b/block/qcow2.c
index 9cbb2f0..f39b6d5 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -331,6 +331,7 @@  static int qcow2_open(BlockDriverState *bs, int flags)
     QCowHeader header;
     uint64_t ext_end;
 
+    s->has_dedup = false;
     ret = bdrv_pread(bs->file, 0, &header, sizeof(header));
     if (ret < 0) {
         goto fail;
@@ -793,13 +794,18 @@  static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster;
     int n_end;
-    int ret;
+    int ret = 0;
     int cur_nr_sectors; /* number of sectors in current iteration */
     uint64_t cluster_offset;
     QEMUIOVector hd_qiov;
     uint64_t bytes_done = 0;
     uint8_t *cluster_data = NULL;
     QCowL2Meta *l2meta = NULL;
+    uint8_t *dedup_cluster_data = NULL;
+    int dedup_cluster_data_nr;
+    int deduped_sectors_nr;
+    QCowDedupState ds;
+    bool atomic_dedup_is_running;
 
     trace_qcow2_writev_start_req(qemu_coroutine_self(), sector_num,
                                  remaining_sectors);
@@ -810,13 +816,70 @@  static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
 
     qemu_co_mutex_lock(&s->lock);
 
+    atomic_dedup_is_running = qcow2_dedup_is_running(bs);
+    if (atomic_dedup_is_running) {
+        QTAILQ_INIT(&ds.undedupables);
+        ds.phash.reuse = false;
+        ds.nb_undedupable_sectors = 0;
+        ds.nb_clusters_processed = 0;
+
+        /* if deduplication is on we make sure dedup_cluster_data
+         * contains a multiple of cluster size of data in order
+         * to compute the hashes
+         */
+        ret = qcow2_dedup_read_missing_and_concatenate(bs,
+                                                       qiov,
+                                                       sector_num,
+                                                       remaining_sectors,
+                                                       &dedup_cluster_data,
+                                                       &dedup_cluster_data_nr);
+
+        if (ret < 0) {
+            goto fail;
+        }
+    }
+
     while (remaining_sectors != 0) {
 
         l2meta = NULL;
 
         trace_qcow2_writev_start_part(qemu_coroutine_self());
+
+        if (atomic_dedup_is_running && ds.nb_undedupable_sectors == 0) {
+            /* Try to deduplicate as much clusters as possible */
+            deduped_sectors_nr = qcow2_dedup(bs,
+                                             &ds,
+                                             sector_num,
+                                             dedup_cluster_data,
+                                             dedup_cluster_data_nr);
+
+            if (deduped_sectors_nr < 0) {
+                goto fail;
+            }
+
+            remaining_sectors -= deduped_sectors_nr;
+            sector_num += deduped_sectors_nr;
+            bytes_done += deduped_sectors_nr * 512;
+
+            /* no more data to write -> exit */
+            if (remaining_sectors <= 0) {
+                goto fail;
+            }
+
+            /* if we deduped something trace it */
+            if (deduped_sectors_nr) {
+                trace_qcow2_writev_done_part(qemu_coroutine_self(),
+                                             deduped_sectors_nr);
+                trace_qcow2_writev_start_part(qemu_coroutine_self());
+            }
+        }
+
         index_in_cluster = sector_num & (s->cluster_sectors - 1);
-        n_end = index_in_cluster + remaining_sectors;
+        n_end = atomic_dedup_is_running &&
+                ds.nb_undedupable_sectors < remaining_sectors ?
+                index_in_cluster + ds.nb_undedupable_sectors :
+                index_in_cluster + remaining_sectors;
+
         if (s->crypt_method &&
             n_end > QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors) {
             n_end = QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors;
@@ -852,6 +915,24 @@  static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
                 cur_nr_sectors * 512);
         }
 
+        /* Write the non duplicated clusters hashes to disk */
+        if (atomic_dedup_is_running) {
+            int count = cur_nr_sectors / s->cluster_sectors;
+            int has_ending = ((cluster_offset >> 9) + index_in_cluster +
+                             cur_nr_sectors) & (s->cluster_sectors - 1);
+            count = index_in_cluster ? count + 1 : count;
+            count = has_ending ? count + 1 : count;
+            ret = qcow2_dedup_store_new_hashes(bs,
+                                               &ds,
+                                               count,
+                                               sector_num,
+                                               (cluster_offset >> 9));
+            if (ret < 0) {
+                goto fail;
+            }
+        }
+
+        BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
         qemu_co_mutex_unlock(&s->lock);
         BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
         trace_qcow2_writev_data(qemu_coroutine_self(),
@@ -883,6 +964,7 @@  static coroutine_fn int qcow2_co_writev(BlockDriverState *bs,
             l2meta = NULL;
         }
 
+        ds.nb_undedupable_sectors -= cur_nr_sectors;
         remaining_sectors -= cur_nr_sectors;
         sector_num += cur_nr_sectors;
         bytes_done += cur_nr_sectors * 512;
@@ -903,6 +985,7 @@  fail:
 
     qemu_iovec_destroy(&hd_qiov);
     qemu_vfree(cluster_data);
+    qemu_vfree(dedup_cluster_data);
     trace_qcow2_writev_done_req(qemu_coroutine_self(), ret);
 
     return ret;