diff mbox

[v4,08/13] migration: Add the core code of multi-thread compression

Message ID 1422875149-13198-9-git-send-email-liang.z.li@intel.com
State New
Headers show

Commit Message

Li, Liang Z Feb. 2, 2015, 11:05 a.m. UTC
Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.

Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 159 insertions(+), 8 deletions(-)

Comments

Dr. David Alan Gilbert Feb. 6, 2015, 12:12 p.m. UTC | #1
* Liang Li (liang.z.li@intel.com) wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
> 
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 159 insertions(+), 8 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index eae082b..b8bdb16 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
>  static bool quit_thread;
> +static int one_byte_count;

Please add a comment here about what one_byte_count is; it's
not obvious, but I can't think of a better name

>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
>  
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_thread) {
> -
> -    /* To be done */
> +    CompressParam *param = opaque;
>  
> +    while (!quit_thread) {

This is something I missed on 02/   - can you rename 'quit_thread'
to comp_quit_thread or something, so it's obvious which thread.

> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->busy) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +            if (quit_thread) {
> +                break;
> +            }
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        do_compress_ram_page(param);
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);

This is interestingly different from your previous version; param->mutex
used to be held all of the time except during the cond_wait itself.

I'm also worried about the quit_thread behaviour;  is there
any guarantee that 'terminate_compression_threads' is called
while this code is in the param->cond cond_wait?   If terminate_compression_threads
was called while the thread was busy, then the cond_signal on param->cond
would be too early.  I'm thinking perhaps you need to check quit_thread before
the cond_wait as well?  (It's mostly error cases and migrate_cancel I'm worried
about here).

>      }
>  
>      return NULL;
> @@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_cond_signal(&comp_param[idx].cond);
> +    }
>  }
>  
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }
> +    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> +        bytes_transferred -= one_byte_count;
> +        one_byte_count = 0;
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                if (bytes_sent == 0) {
> +                    /* set bytes_sent to 1 in this case to prevent migration
> +                     * from terminating, this 1 byte whill be added to
> +                     * bytes_transferred later, minus 1 to keep the
> +                     * bytes_transferred accurate */
> +                    bytes_sent = 1;
> +                    if (bytes_transferred <= 0) {
> +                        one_byte_count++;
> +                    } else {
> +                        bytes_transferred -= 1;
> +                    }
> +                }
> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = 0;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
>  
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    /* When starting the process of a new block, the first page of
> +     * the block should be sent out before other pages in the same
> +     * block, and all the pages in last block should have been sent
> +     * out, keeping this order is important, because the 'cont' flag
> +     * is used to avoid resending the block name.
> +     */
> +    if (block != last_sent_block) {
> +        flush_compressed_data(f);
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            set_compress_params(&comp_param[0], block, offset);
> +            /* Use the qemu thread to compress the data to make sure the
> +             * first page is sent out before other pages
> +             */
> +            bytes_sent = do_compress_ram_page(&comp_param[0]);
> +            if (bytes_sent > 0) {
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +            }
> +        }
> +    } else {
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            bytes_sent = compress_page_with_multi_thread(f, block, offset);
> +        }
> +    }
>  
>      return bytes_sent;
>  }
> @@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
>  
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index eae082b..b8bdb16 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -364,16 +364,31 @@  static QemuCond *comp_done_cond;
 /* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 static bool quit_thread;
+static int one_byte_count;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
 static void *do_data_compress(void *opaque)
 {
-    while (!quit_thread) {
-
-    /* To be done */
+    CompressParam *param = opaque;
 
+    while (!quit_thread) {
+        qemu_mutex_lock(&param->mutex);
+        while (!param->busy) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            if (quit_thread) {
+                break;
+            }
+        }
+        qemu_mutex_unlock(&param->mutex);
+        do_compress_ram_page(param);
+        qemu_mutex_lock(comp_done_lock);
+        param->busy = false;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
     }
 
     return NULL;
@@ -381,9 +396,13 @@  static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    quit_thread = true;
+    int idx, thread_count;
 
-    /* To be done */
+    thread_count = migrate_compress_threads();
+    quit_thread = true;
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_cond_signal(&comp_param[idx].cond);
+    }
 }
 
 void migrate_compress_threads_join(MigrationState *s)
@@ -764,12 +783,144 @@  static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
     return bytes_sent;
 }
 
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, cont;
+    int blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+    p = memory_region_get_ram_ptr(block->mr) + offset;
+
+    bytes_sent = save_block_hdr(param->file, block, offset, cont,
+                                RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->busy = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        if (comp_param[idx].busy) {
+            qemu_mutex_lock(comp_done_lock);
+            while (comp_param[idx].busy) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        len = qemu_put_qemu_file(f, comp_param[idx].file);
+        bytes_transferred += len;
+    }
+    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
+        bytes_transferred -= one_byte_count;
+        one_byte_count = 0;
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset)
+{
+    int idx, thread_count, bytes_sent = 0;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (!comp_param[idx].busy) {
+                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                if (bytes_sent == 0) {
+                    /* set bytes_sent to 1 in this case to prevent migration
+                     * from terminating, this 1 byte whill be added to
+                     * bytes_transferred later, minus 1 to keep the
+                     * bytes_transferred accurate */
+                    bytes_sent = 1;
+                    if (bytes_transferred <= 0) {
+                        one_byte_count++;
+                    } else {
+                        bytes_transferred -= 1;
+                    }
+                }
+                break;
+            }
+        }
+        if (bytes_sent > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return bytes_sent;
+}
+
 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     ram_addr_t offset, bool last_stage)
 {
     int bytes_sent = 0;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+    /* When starting the process of a new block, the first page of
+     * the block should be sent out before other pages in the same
+     * block, and all the pages in last block should have been sent
+     * out, keeping this order is important, because the 'cont' flag
+     * is used to avoid resending the block name.
+     */
+    if (block != last_sent_block) {
+        flush_compressed_data(f);
+        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
+                                               last_stage, NULL);
+        if (bytes_sent == -1) {
+            set_compress_params(&comp_param[0], block, offset);
+            /* Use the qemu thread to compress the data to make sure the
+             * first page is sent out before other pages
+             */
+            bytes_sent = do_compress_ram_page(&comp_param[0]);
+            if (bytes_sent > 0) {
+                qemu_put_qemu_file(f, comp_param[0].file);
+            }
+        }
+    } else {
+        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
+                                               last_stage, NULL);
+        if (bytes_sent == -1) {
+            bytes_sent = compress_page_with_multi_thread(f, block, offset);
+        }
+    }
 
     return bytes_sent;
 }
@@ -828,8 +979,6 @@  static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1037,6 +1186,7 @@  static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -1083,6 +1233,7 @@  static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();