diff mbox

[v5,10/12] migration: Add the core code for decompression

Message ID 1423623986-590-11-git-send-email-liang.z.li@intel.com
State New
Headers show

Commit Message

Li, Liang Z Feb. 11, 2015, 3:06 a.m. UTC
Implement the core logic of multiple thread decompression,
the decompression can work now.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 50 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 48 insertions(+), 2 deletions(-)

Comments

Juan Quintela Feb. 11, 2015, 11:48 a.m. UTC | #1
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of multiple thread decompression,
> the decompression can work now.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>

I think it should be a good idea to use the same scheme for compression
and decompression.  So, if you decide to use my proposal for locking
there, please do the same here.

Later, Juan.
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index 12dfa34..f9e6a95 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -833,6 +833,13 @@  static inline void start_compression(CompressParam *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
+static inline void start_decompression(DecompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
 
 static uint64_t bytes_transferred;
 
@@ -1379,8 +1386,26 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    DecompressParam *param = opaque;
+    size_t pagesize;
+
     while (!quit_decomp_thread) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (!param->busy && !quit_decomp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            pagesize = TARGET_PAGE_SIZE;
+            if (!quit_decomp_thread) {
+                /* uncompress() will return failed in some case, especially
+                 * when the page is dirted when doing the compression, it's
+                 * not a problem because the dirty page will be retransferred
+                 * and uncompress() won't break the data in other pages.
+                 */
+                uncompress((Bytef *)param->des, &pagesize,
+                           (const Bytef *)param->compbuf, param->len);
+            }
+            param->busy = false;
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
 
     return NULL;
@@ -1411,6 +1436,11 @@  void migrate_decompress_threads_join(void)
     quit_decomp_thread = true;
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&decomp_param[i].mutex);
+        qemu_cond_signal(&decomp_param[i].cond);
+        qemu_mutex_unlock(&decomp_param[i].mutex);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1427,7 +1457,23 @@  void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!decomp_param[idx].busy) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                start_decompression(&decomp_param[idx]);
+                break;
+            }
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)