diff mbox

[v7,10/14] migration: Add the core code for decompression

Message ID 1428474011-30797-11-git-send-email-liang.z.li@intel.com
State New
Headers show

Commit Message

Li, Liang Z April 8, 2015, 6:20 a.m. UTC
Implement the core logic of multiple thread decompression,
the decompression can work now.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)

Comments

Juan Quintela April 8, 2015, 11:34 a.m. UTC | #1
Liang Li <liang.z.li@intel.com> wrote:
> Implement the core logic of multiple thread decompression,
> the decompression can work now.
>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 48 insertions(+), 3 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 8fb2ea4..b88e6b2 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -889,7 +889,6 @@ static inline void start_compression(CompressParam *param)
>      qemu_mutex_unlock(&param->mutex);
>  }
>  
> -
>  static uint64_t bytes_transferred;
>  
>  static void flush_compressed_data(QEMUFile *f)
> @@ -1458,8 +1457,28 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>  
>  static void *do_data_decompress(void *opaque)
>  {
> +    DecompressParam *param = opaque;
> +    size_t pagesize;
> +
>      while (true) {
> -        /* To be done */
> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->start && !param->quit) {
> +            qemu_cond_wait(&param->cond, &param->mutex);

We don't check here if we have received a quit


> +            pagesize = TARGET_PAGE_SIZE;
> +            /* uncompress() will return failed in some case, especially
> +             * when the page is dirted when doing the compression, it's
> +             * not a problem because the dirty page will be retransferred
> +             * and uncompress() won't break the data in other pages.
> +             */
> +            uncompress((Bytef *)param->des, &pagesize,
> +                       (const Bytef *)param->compbuf, param->len);
> +            param->start = false;
> +        }
> +        if (param->quit) {
> +            qemu_mutex_unlock(&param->mutex);
> +            break;
> +        }
> +        qemu_mutex_unlock(&param->mutex);
>      }
>  
>      return NULL;
> @@ -1489,6 +1508,12 @@ void migrate_decompress_threads_join(void)
>  
>      thread_count = migrate_decompress_threads();
>      for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&decomp_param[i].mutex);
> +        decomp_param[i].quit = true;
> +        qemu_cond_signal(&decomp_param[i].cond);

We set quit and send a signal to wakeup the thread, but it will try to
uncompress a page.  Shouldn't we change the position of the param->quit
check for exit?  I think it should be inside the loop.

> +        qemu_mutex_unlock(&decomp_param[i].mutex);
> +    }
> +    for (i = 0; i < thread_count; i++) {
>          qemu_thread_join(decompress_threads + i);
>          qemu_mutex_destroy(&decomp_param[i].mutex);
>          qemu_cond_destroy(&decomp_param[i].cond);
> @@ -1505,7 +1530,27 @@ void migrate_decompress_threads_join(void)
>  static void decompress_data_with_multi_threads(uint8_t *compbuf,
>                                                 void *host, int len)
>  {
> -    /* To be done */
> +    int idx, thread_count;
> +
> +    thread_count = migrate_decompress_threads();
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            qemu_mutex_lock(&decomp_param[idx].mutex);
> +            if (!decomp_param[idx].start) {
> +                memcpy(decomp_param[idx].compbuf, compbuf, len);
> +                decomp_param[idx].des = host;
> +                decomp_param[idx].len = len;
> +                decomp_param[idx].start = true;
> +                qemu_cond_signal(&decomp_param[idx].cond);
> +                qemu_mutex_unlock(&decomp_param[idx].mutex);
> +                break;
> +            }
> +            qemu_mutex_unlock(&decomp_param[idx].mutex);
> +        }
> +        if (idx < thread_count) {
> +            break;
> +        }
> +    }
>  }
>  
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
Li, Liang Z April 8, 2015, 2 p.m. UTC | #2
> > @@ -889,7 +889,6 @@ static inline void
> start_compression(CompressParam *param)
> >      qemu_mutex_unlock(&param->mutex);  }
> >
> > -
> >  static uint64_t bytes_transferred;
> >
> >  static void flush_compressed_data(QEMUFile *f) @@ -1458,8 +1457,28
> @@
> > void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
> >
> >  static void *do_data_decompress(void *opaque)  {
> > +    DecompressParam *param = opaque;
> > +    size_t pagesize;
> > +
> >      while (true) {
> > -        /* To be done */
> > +        qemu_mutex_lock(&param->mutex);
> > +        while (!param->start && !param->quit) {
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> 
> We don't check here if we have received a quit
> 
> 
> > +            pagesize = TARGET_PAGE_SIZE;
> > +            /* uncompress() will return failed in some case, especially
> > +             * when the page is dirted when doing the compression, it's
> > +             * not a problem because the dirty page will be retransferred
> > +             * and uncompress() won't break the data in other pages.
> > +             */
> > +            uncompress((Bytef *)param->des, &pagesize,
> > +                       (const Bytef *)param->compbuf, param->len);
> > +            param->start = false;
> > +        }
> > +        if (param->quit) {
> > +            qemu_mutex_unlock(&param->mutex);
> > +            break;
> > +        }
> > +        qemu_mutex_unlock(&param->mutex);
> >      }
> >
> >      return NULL;
> > @@ -1489,6 +1508,12 @@ void migrate_decompress_threads_join(void)
> >
> >      thread_count = migrate_decompress_threads();
> >      for (i = 0; i < thread_count; i++) {
> > +        qemu_mutex_lock(&decomp_param[i].mutex);
> > +        decomp_param[i].quit = true;
> > +        qemu_cond_signal(&decomp_param[i].cond);
> 
> We set quit and send a signal to wakeup the thread, but it will try to
> uncompress a page.  Shouldn't we change the position of the param->quit
> check for exit?  I think it should be inside the loop.

Yes, you are right. if (param->quit)  should be put in front of uncompress().
 I will change it in the final version. (I really hope the next version is the final version :) )
Juan Quintela April 8, 2015, 2:48 p.m. UTC | #3
"Li, Liang Z" <liang.z.li@intel.com> wrote:
>> > @@ -889,7 +889,6 @@ static inline void
>> start_compression(CompressParam *param)
>> >      qemu_mutex_unlock(&param->mutex);  }
>> >
>> > -
>> >  static uint64_t bytes_transferred;
>> >
>> >  static void flush_compressed_data(QEMUFile *f) @@ -1458,8 +1457,28
>> @@
>> > void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>> >
>> >  static void *do_data_decompress(void *opaque)  {
>> > +    DecompressParam *param = opaque;
>> > +    size_t pagesize;
>> > +
>> >      while (true) {
>> > -        /* To be done */
>> > +        qemu_mutex_lock(&param->mutex);
>> > +        while (!param->start && !param->quit) {
>> > +            qemu_cond_wait(&param->cond, &param->mutex);
>> 
>> We don't check here if we have received a quit
>> 
>> 
>> > +            pagesize = TARGET_PAGE_SIZE;
>> > +            /* uncompress() will return failed in some case, especially
>> > +             * when the page is dirted when doing the compression, it's
>> > +             * not a problem because the dirty page will be retransferred
>> > +             * and uncompress() won't break the data in other pages.
>> > +             */
>> > +            uncompress((Bytef *)param->des, &pagesize,
>> > +                       (const Bytef *)param->compbuf, param->len);
>> > +            param->start = false;
>> > +        }
>> > +        if (param->quit) {
>> > +            qemu_mutex_unlock(&param->mutex);
>> > +            break;
>> > +        }
>> > +        qemu_mutex_unlock(&param->mutex);
>> >      }
>> >
>> >      return NULL;
>> > @@ -1489,6 +1508,12 @@ void migrate_decompress_threads_join(void)
>> >
>> >      thread_count = migrate_decompress_threads();
>> >      for (i = 0; i < thread_count; i++) {
>> > +        qemu_mutex_lock(&decomp_param[i].mutex);
>> > +        decomp_param[i].quit = true;
>> > +        qemu_cond_signal(&decomp_param[i].cond);
>> 
>> We set quit and send a signal to wakeup the thread, but it will try to
>> uncompress a page.  Shouldn't we change the position of the param->quit
>> check for exit?  I think it should be inside the loop.
>
> Yes, you are right. if (param->quit)  should be put in front of uncompress().
>  I will change it in the final version. (I really hope the next
> version is the final version :) )

I am sorry this took so long :p

Later, Juan.
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index 8fb2ea4..b88e6b2 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -889,7 +889,6 @@  static inline void start_compression(CompressParam *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
-
 static uint64_t bytes_transferred;
 
 static void flush_compressed_data(QEMUFile *f)
@@ -1458,8 +1457,28 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    DecompressParam *param = opaque;
+    size_t pagesize;
+
     while (true) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !param->quit) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            pagesize = TARGET_PAGE_SIZE;
+            /* uncompress() will return failed in some case, especially
+             * when the page is dirted when doing the compression, it's
+             * not a problem because the dirty page will be retransferred
+             * and uncompress() won't break the data in other pages.
+             */
+            uncompress((Bytef *)param->des, &pagesize,
+                       (const Bytef *)param->compbuf, param->len);
+            param->start = false;
+        }
+        if (param->quit) {
+            qemu_mutex_unlock(&param->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
 
     return NULL;
@@ -1489,6 +1508,12 @@  void migrate_decompress_threads_join(void)
 
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&decomp_param[i].mutex);
+        decomp_param[i].quit = true;
+        qemu_cond_signal(&decomp_param[i].cond);
+        qemu_mutex_unlock(&decomp_param[i].mutex);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1505,7 +1530,27 @@  void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
                                                void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            qemu_mutex_lock(&decomp_param[idx].mutex);
+            if (!decomp_param[idx].start) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                decomp_param[idx].start = true;
+                qemu_cond_signal(&decomp_param[idx].cond);
+                qemu_mutex_unlock(&decomp_param[idx].mutex);
+                break;
+            }
+            qemu_mutex_unlock(&decomp_param[idx].mutex);
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)