
[v8,10/14] migration: Add the core code for decompression

Message ID: 1429089983-24644-11-git-send-email-liang.z.li@intel.com
State: New

Commit Message

Li, Liang Z April 15, 2015, 9:26 a.m. UTC
Implement the core logic of multi-threaded decompression;
decompression now works.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)
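
Not part of the diff below: the DecompressParam structure these hunks rely on
is introduced earlier in the series. A minimal sketch of what the decompress
threads assume it contains, inferred only from the fields used in this patch
(field order, exact types and comments are assumptions):

    typedef struct DecompressParam {
        bool start;          /* producer sets this once compbuf/des/len are valid */
        bool quit;           /* set at join time to terminate the worker thread   */
        QemuMutex mutex;     /* protects start/quit and the fields below          */
        QemuCond cond;       /* signalled whenever start or quit changes          */
        void *des;           /* destination host page to decompress into          */
        uint8_t *compbuf;    /* compressed input copied in by the producer        */
        int len;             /* number of valid bytes in compbuf                  */
    } DecompressParam;

Each page is handed to the first idle thread by setting start under the mutex;
the worker clears start again once the page has been decompressed.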

Patch

diff --git a/arch_init.c b/arch_init.c
index 8fb2ea4..a4020d5 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -889,7 +889,6 @@  static inline void start_compression(CompressParam *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
-
 static uint64_t bytes_transferred;
 
 static void flush_compressed_data(QEMUFile *f)
@@ -1458,8 +1457,28 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    DecompressParam *param = opaque;
+    size_t pagesize;
+
     while (true) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !param->quit) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (param->quit) {
+            qemu_mutex_unlock(&param->mutex);
+            break;
+        }
+        pagesize = TARGET_PAGE_SIZE;
+        /* uncompress() may fail in some cases, especially when the
+         * page was dirtied while being compressed.  That is not a
+         * problem because the dirty page will be retransferred, and
+         * uncompress() won't corrupt the data in other pages.
+         */
+        uncompress((Bytef *)param->des, &pagesize,
+                   (const Bytef *)param->compbuf, param->len);
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
     }
 
     return NULL;
@@ -1489,6 +1508,12 @@  void migrate_decompress_threads_join(void)
 
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&decomp_param[i].mutex);
+        decomp_param[i].quit = true;
+        qemu_cond_signal(&decomp_param[i].cond);
+        qemu_mutex_unlock(&decomp_param[i].mutex);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1505,7 +1530,27 @@  void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            qemu_mutex_lock(&decomp_param[idx].mutex);
+            if (!decomp_param[idx].start) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                decomp_param[idx].start = true;
+                qemu_cond_signal(&decomp_param[idx].cond);
+                qemu_mutex_unlock(&decomp_param[idx].mutex);
+                break;
+            }
+            qemu_mutex_unlock(&decomp_param[idx].mutex);
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
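
Not shown in the hunks above: the consumer side in ram_load() that feeds
decompress_data_with_multi_threads(). A rough sketch of that call site,
assuming a RAM_SAVE_FLAG_COMPRESS_PAGE flag and a preallocated compbuf, as
used elsewhere in this series:

    /* Sketch of the ram_load() dispatch for a compressed page. */
    if (flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
        int len = qemu_get_be32(f);                /* compressed length  */
        qemu_get_buffer(f, compbuf, len);          /* compressed payload */
        decompress_data_with_multi_threads(compbuf, host, len);
    }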