diff mbox

[v3,03/13] migration: Add the framework of muti-thread decompression

Message ID 1418347746-15829-4-git-send-email-liang.z.li@intel.com
State New
Headers show

Commit Message

Li, Liang Z Dec. 12, 2014, 1:28 a.m. UTC
Signed-off-by: Liang Li <liang.z.li@intel.com>
Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
---
 arch_init.c                   | 70 +++++++++++++++++++++++++++++++++++++++++++
 include/migration/migration.h |  4 +++
 migration.c                   | 15 ++++++++++
 3 files changed, 89 insertions(+)

Comments

Dr. David Alan Gilbert Jan. 23, 2015, 1:26 p.m. UTC | #1
* Liang Li (liang.z.li@intel.com) wrote:
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c                   | 70 +++++++++++++++++++++++++++++++++++++++++++
>  include/migration/migration.h |  4 +++
>  migration.c                   | 15 ++++++++++
>  3 files changed, 89 insertions(+)
> 
> diff --git a/arch_init.c b/arch_init.c
> index a988ec2..2f1d0c4 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -126,6 +126,7 @@ static uint64_t bitmap_sync_count;
>  #define RAM_SAVE_FLAG_CONTINUE 0x20
>  #define RAM_SAVE_FLAG_XBZRLE   0x40
>  /* 0x80 is reserved in migration.h start with 0x100 next */
> +#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  
>  static struct defconfig_file {
>      const char *filename;
> @@ -332,13 +333,26 @@ static uint64_t migration_dirty_pages;
>  static uint32_t last_version;
>  static bool ram_bulk_stage;
>  
> +/* using compressBound() to calculate the buffer size needed to save the
> + * compressed data, to support the maximun TARGET_PAGE_SIZE bytes of

Typo: 'maximun'->'maximum'

> + * data, we need more 15 bytes, use 16 to align the data.
> + */
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +
>  struct compress_param {
>      /* To be done */
>  };
>  typedef struct compress_param compress_param;
>  
> +struct decompress_param {
> +    /* To be done */
> +};
> +typedef struct decompress_param decompress_param;
> +
>  static compress_param *comp_param;
>  static bool quit_thread;
> +static decompress_param *decomp_param;
> +static QemuThread *decompress_threads;
>  
>  static void *do_data_compress(void *opaque)
>  {
> @@ -1124,10 +1138,54 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +static void *do_data_decompress(void *opaque)
> +{
> +    while (!quit_thread) {
> +        /* To be done */
> +    }
> +    return NULL;
> +}
> +
> +void migrate_decompress_threads_create(int count)
> +{
> +    int i;
> +
> +    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
> +    decomp_param = g_malloc0(sizeof(decompress_param) * count);

Again, g_new0 probably better.

> +    quit_thread = false;
> +    for (i = 0; i < count; i++) {
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +void migrate_decompress_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    quit_thread = true;
> +    thread_count = migrate_decompress_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(decompress_threads + i);
> +    }
> +    g_free(decompress_threads);
> +    g_free(decomp_param);
> +    decompress_threads = NULL;
> +    decomp_param = NULL;
> +}
> +
> +static void decompress_data_with_multi_threads(uint8_t *compbuf,
> +        void *host, int len)
> +{
> +    /* To be done */
> +}
> +
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  
>      seq_iter++;
>  
> @@ -1201,6 +1259,18 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
> +                ret = -EINVAL;
> +                break;
> +            }
> +
> +            len = qemu_get_be32(f);
> +            qemu_get_buffer(f, compbuf, len);

I think you need to check 'len' is sensible, if your stream goes wrong it
could end up being a negative or silly value.

> +            decompress_data_with_multi_threads(compbuf, host, len);
> +            break;
>          case RAM_SAVE_FLAG_XBZRLE:
>              host = host_from_stream_offset(f, addr, flags);
>              if (!host) {
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index daf6c81..0c4f21c 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -51,6 +51,7 @@ struct MigrationState
>      QEMUFile *file;
>      QemuThread *compress_thread;
>      int compress_thread_count;
> +    int decompress_thread_count;
>      int compress_level;
>  
>      int state;
> @@ -112,6 +113,8 @@ MigrationState *migrate_get_current(void);
>  
>  void migrate_compress_threads_create(MigrationState *s);
>  void migrate_compress_threads_join(MigrationState *s);
> +void migrate_decompress_threads_create(int count);
> +void migrate_decompress_threads_join(void);
>  uint64_t ram_bytes_remaining(void);
>  uint64_t ram_bytes_transferred(void);
>  uint64_t ram_bytes_total(void);
> @@ -164,6 +167,7 @@ int64_t xbzrle_cache_resize(int64_t new_size);
>  bool migrate_use_compression(void);
>  int migrate_compress_level(void);
>  int migrate_compress_threads(void);
> +int migrate_decompress_threads(void);
>  
>  void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
>  void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
> diff --git a/migration.c b/migration.c
> index 402daae..082ddb7 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -45,6 +45,7 @@ enum {
>  
>  /* Default compression thread count */
>  #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
> +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
>  /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
>  #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
>  
> @@ -66,6 +67,7 @@ MigrationState *migrate_get_current(void)
>          .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
>          .mbps = -1,
>          .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
> +        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
>          .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
>      };
>  
> @@ -123,10 +125,13 @@ static void process_incoming_migration_co(void *opaque)
>      } else {
>          runstate_set(RUN_STATE_PAUSED);
>      }
> +    migrate_decompress_threads_join();
>  }
>  
>  void process_incoming_migration(QEMUFile *f)
>  {
> +    int thread_count = migrate_decompress_threads();
> +    migrate_decompress_threads_create(thread_count);
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
>      int fd = qemu_get_fd(f);

The style rules need all of the declarations to be kept together;
so the 'migrate_decompress_threads_create' can't go in between.

> @@ -383,6 +388,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
>      int64_t xbzrle_cache_size = s->xbzrle_cache_size;
>      int compress_level = s->compress_level;
>      int compress_thread_count = s->compress_thread_count;
> +    int decompress_thread_count = s->decompress_thread_count;
>  
>      memcpy(enabled_capabilities, s->enabled_capabilities,
>             sizeof(enabled_capabilities));
> @@ -395,6 +401,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
>  
>      s->compress_level = compress_level;
>      s->compress_thread_count = compress_thread_count;
> +    s->decompress_thread_count = decompress_thread_count;
>      s->bandwidth_limit = bandwidth_limit;
>      s->state = MIG_STATE_SETUP;
>      trace_migrate_set_state(MIG_STATE_SETUP);
> @@ -591,6 +598,14 @@ int migrate_compress_threads(void)
>      return s->compress_thread_count;
>  }
>  
> +int migrate_decompress_threads(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->decompress_thread_count;
> +}
>  
>  int migrate_use_xbzrle(void)
>  {
> -- 
> 1.8.3.1

Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Eric Blake Jan. 23, 2015, 4:22 p.m. UTC | #2
On 12/11/2014 06:28 PM, Liang Li wrote:

In addition to David's catches:

s/muti/multi/ in the subject line.

Commit message feels sparse - is the one subject line really all we need
to know about this framework?

> Signed-off-by: Liang Li <liang.z.li@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com>
> ---
>  arch_init.c                   | 70 +++++++++++++++++++++++++++++++++++++++++++
>  include/migration/migration.h |  4 +++
>  migration.c                   | 15 ++++++++++
>  3 files changed, 89 insertions(+)
> 

>  
> +/* using compressBound() to calculate the buffer size needed to save the
> + * compressed data, to support the maximun TARGET_PAGE_SIZE bytes of
> + * data, we need more 15 bytes, use 16 to align the data.
> + */
> +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
> +

>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  {
>      int flags = 0, ret = 0;
>      static uint64_t seq_iter;
> +    int len = 0;
> +    uint8_t compbuf[COMPRESS_BUF_SIZE];
>  

Ouch - you stack-allocated more than a page of data.  This is in general
bad practice (and you should use the heap for any function that requires
more than 4k of data) because there are some architectures (cough:
windows) where exceeding the stack by more than a page risks silent
termination of the application rather than a graceful SIGSEGV (if you
can even call a stack overflow SIGSEGV graceful).  Especially true when
using helper threads, which typically have smaller stacks than the main
application.

>      seq_iter++;
>  
> @@ -1201,6 +1259,18 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>  
>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
> +        case RAM_SAVE_FLAG_COMPRESS_PAGE:
> +            host = host_from_stream_offset(f, addr, flags);
> +            if (!host) {
> +                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);

s/Illegal/Invalid/ (the user isn't breaking a law, merely a constraint).
diff mbox

Patch

diff --git a/arch_init.c b/arch_init.c
index a988ec2..2f1d0c4 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -126,6 +126,7 @@  static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -332,13 +333,26 @@  static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+/* using compressBound() to calculate the buffer size needed to save the
+ * compressed data, to support the maximun TARGET_PAGE_SIZE bytes of
+ * data, we need more 15 bytes, use 16 to align the data.
+ */
+#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
+
 struct compress_param {
     /* To be done */
 };
 typedef struct compress_param compress_param;
 
+struct decompress_param {
+    /* To be done */
+};
+typedef struct decompress_param decompress_param;
+
 static compress_param *comp_param;
 static bool quit_thread;
+static decompress_param *decomp_param;
+static QemuThread *decompress_threads;
 
 static void *do_data_compress(void *opaque)
 {
@@ -1124,10 +1138,54 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+static void *do_data_decompress(void *opaque)
+{
+    while (!quit_thread) {
+        /* To be done */
+    }
+    return NULL;
+}
+
+void migrate_decompress_threads_create(int count)
+{
+    int i;
+
+    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
+    decomp_param = g_malloc0(sizeof(decompress_param) * count);
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i, thread_count;
+
+    quit_thread = true;
+    thread_count = migrate_decompress_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+}
+
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+        void *host, int len)
+{
+    /* To be done */
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
 
     seq_iter++;
 
@@ -1201,6 +1259,18 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            qemu_get_buffer(f, compbuf, len);
+            decompress_data_with_multi_threads(compbuf, host, len);
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/include/migration/migration.h b/include/migration/migration.h
index daf6c81..0c4f21c 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -51,6 +51,7 @@  struct MigrationState
     QEMUFile *file;
     QemuThread *compress_thread;
     int compress_thread_count;
+    int decompress_thread_count;
     int compress_level;
 
     int state;
@@ -112,6 +113,8 @@  MigrationState *migrate_get_current(void);
 
 void migrate_compress_threads_create(MigrationState *s);
 void migrate_compress_threads_join(MigrationState *s);
+void migrate_decompress_threads_create(int count);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -164,6 +167,7 @@  int64_t xbzrle_cache_resize(int64_t new_size);
 bool migrate_use_compression(void);
 int migrate_compress_level(void);
 int migrate_compress_threads(void);
+int migrate_decompress_threads(void);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration.c b/migration.c
index 402daae..082ddb7 100644
--- a/migration.c
+++ b/migration.c
@@ -45,6 +45,7 @@  enum {
 
 /* Default compression thread count */
 #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */
 #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
 
@@ -66,6 +67,7 @@  MigrationState *migrate_get_current(void)
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
         .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
         .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
@@ -123,10 +125,13 @@  static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
 void process_incoming_migration(QEMUFile *f)
 {
+    int thread_count = migrate_decompress_threads();
+    migrate_decompress_threads_create(thread_count);
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
     int fd = qemu_get_fd(f);
 
@@ -383,6 +388,7 @@  static MigrationState *migrate_init(const MigrationParams *params)
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
     int compress_level = s->compress_level;
     int compress_thread_count = s->compress_thread_count;
+    int decompress_thread_count = s->decompress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -395,6 +401,7 @@  static MigrationState *migrate_init(const MigrationParams *params)
 
     s->compress_level = compress_level;
     s->compress_thread_count = compress_thread_count;
+    s->decompress_thread_count = decompress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -591,6 +598,14 @@  int migrate_compress_threads(void)
     return s->compress_thread_count;
 }
 
+int migrate_decompress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->decompress_thread_count;
+}
 
 int migrate_use_xbzrle(void)
 {