@@ -24,6 +24,7 @@
#include <stdint.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <zlib.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/mman.h>
@@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count; #define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
static struct defconfig_file {
const char *filename;
@@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages; static uint32_t last_version; static bool ram_bulk_stage;
+#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
+#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
+
+/*
+ * Per-worker staging buffer: a compression thread builds wire-format
+ * migration data here, and the migration thread later flushes it into
+ * the QEMUFile (see flush_compressed_data()).  Sized for one compressed
+ * page plus block-header/slop overhead.
+ */
+struct MigBuf {
+    int buf_index;
+    uint8_t buf[MIG_BUF_SIZE];
+};
+
+typedef struct MigBuf MigBuf;
+
+/* Append one byte to the staging buffer.  No overflow check: callers
+ * must guarantee the data fits in MIG_BUF_SIZE. */
+static void migrate_put_byte(MigBuf *f, int v) {
+    f->buf[f->buf_index] = v;
+    f->buf_index++;
+}
+
+/* Append a 16-bit value in big-endian (network) byte order. */
+static void migrate_put_be16(MigBuf *f, unsigned int v) {
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+/* Append a 32-bit value in big-endian byte order. */
+static void migrate_put_be32(MigBuf *f, unsigned int v) {
+    migrate_put_byte(f, v >> 24);
+    migrate_put_byte(f, v >> 16);
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+/* Append a 64-bit value in big-endian byte order. */
+static void migrate_put_be64(MigBuf *f, uint64_t v) {
+    migrate_put_be32(f, v >> 32);
+    migrate_put_be32(f, v);
+}
+
+/*
+ * Append 'size' bytes to the staging buffer.
+ *
+ * NOTE(review): unlike qemu_put_buffer() there is no flush path here.
+ * If the buffer is already full (buf_index == MIG_BUF_SIZE) the loop
+ * copies 0 bytes each iteration and spins forever -- callers must
+ * guarantee the data fits; TODO confirm all callers do.
+ */
+static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
+{
+    int l;
+
+    while (size > 0) {
+        l = MIG_BUF_SIZE - f->buf_index;
+        if (l > size) {
+            l = size;
+        }
+        memcpy(f->buf + f->buf_index, buf, l);
+        f->buf_index += l;
+        buf += l;
+        size -= l;
+    }
+}
+
+/*
+ * Write a RAM page header into the staging buffer: (offset|cont|flag)
+ * as a be64, and -- only when this is the first page from this block
+ * (cont == 0) -- the block id string preceded by its 1-byte length.
+ * Returns the number of bytes written.
+ */
+static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
+        ram_addr_t offset, int cont, int flag) {
+    size_t size;
+
+    migrate_put_be64(f, offset | cont | flag);
+    size = 8;
+
+    if (!cont) {
+        /* compute strlen() once instead of three times */
+        size_t idlen = strlen(block->idstr);
+
+        migrate_put_byte(f, idlen);
+        migrate_put_buffer(f, (uint8_t *)block->idstr, idlen);
+        size += 1 + idlen;
+    }
+    return size;
+}
+
+/*
+ * Compress the page at 'p' into the staging buffer.  Wire format is a
+ * 4-byte big-endian length followed by the zlib stream.  Returns the
+ * number of bytes added to the buffer, or 0 on compression failure
+ * (the buffer is left untouched in that case; callers should treat 0
+ * as an error, since the page header has already been emitted).
+ */
+static int migrate_qemu_add_compress(MigBuf *f, const uint8_t *p,
+        int size, int level)
+{
+    uLong blen = COMPRESS_BUF_SIZE;
+    /* Reserve exactly 4 bytes for the length field written by
+     * migrate_put_be32() below; the old sizeof(int) would desynchronize
+     * the stream on any ABI where int is not 4 bytes. */
+    if (compress2(f->buf + f->buf_index + 4, &blen, (Bytef *)p,
+                  size, level) != Z_OK) {
+        error_report("compress failed!");
+        return 0;
+    }
+    migrate_put_be32(f, blen);
+    f->buf_index += blen;
+    return blen + 4;
+}
+
+/* Worker slot states: COM_DONE = idle / result ready to be flushed,
+ * COM_START = work has been queued for the worker. */
+enum {
+    COM_DONE = 0,
+    COM_START,
+};
+
+static int compress_thread_count = 1;
+static int decompress_thread_count = 1;
+
+/* One unit of work handed from the migration thread to a compression
+ * worker; migbuf holds the resulting wire-format data. */
+struct compress_param {
+    int state;
+    MigBuf migbuf;
+    RAMBlock *block;
+    ram_addr_t offset;
+    bool last_stage;
+    int ret;
+    int bytes_sent;
+    uint8_t *p;
+    int cont;
+    bool bulk_stage;
+};
+
+typedef struct compress_param compress_param;
+/* NOTE(review): only used in this file -- could be static like
+ * decomp_param below. */
+compress_param *comp_param;
+
+/* One unit of work for a decompression worker on the incoming side. */
+struct decompress_param {
+    int state;
+    void *des;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];  /* was 'uint8': not a C type */
+    int len;
+};
+typedef struct decompress_param decompress_param;
+
+static decompress_param *decomp_param;
+/* sic: spelling kept -- declared extern in migration.h under this name */
+bool incomming_migration_done;
+static bool quit_thread;
+
+static int save_compress_ram_page(compress_param *param);
+
+
+/*
+ * Compression worker thread: busy-polls its compress_param slot for
+ * work queued by the migration thread (state == COM_START), builds the
+ * compressed wire data into param->migbuf, then marks the slot
+ * COM_DONE so the migration thread can flush it.
+ *
+ * NOTE(review): 'state' and 'quit_thread' are plain (non-atomic)
+ * fields shared across threads with no barriers -- presumably relies
+ * on strongly-ordered hardware; verify or use atomics/cond-vars.
+ */
+static void *do_data_compress(void *opaque) {
+    compress_param *param = opaque;
+    while (!quit_thread) {
+        if (param->state == COM_START) {
+            save_compress_ram_page(param);
+            param->state = COM_DONE;
+        } else {
+            g_usleep(1);
+        }
+    }
+
+    return NULL;
+}
+
+
+/*
+ * Stop and reap all compression worker threads, then free the thread
+ * array and the shared comp_param slots.  Called from the migration
+ * cleanup path (migrate_fd_cleanup).  No-op unless the 'compress'
+ * capability is enabled.
+ */
+void migrate_compress_threads_join(MigrationState *s) {
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = true;
+    for (i = 0; i < compress_thread_count; i++) {
+        qemu_thread_join(s->compress_thread + i);
+    }
+    g_free(s->compress_thread);
+    g_free(comp_param);
+    s->compress_thread = NULL;
+    comp_param = NULL;
+}
+
+/*
+ * Spawn s->compress_thread_count joinable worker threads, each bound
+ * to one comp_param[] slot.  g_malloc0 leaves every slot in state
+ * COM_DONE (== 0), i.e. idle.  No-op unless the 'compress' capability
+ * is enabled.
+ */
+void migrate_compress_threads_create(MigrationState *s) {
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = false;
+    compress_thread_count = s->compress_thread_count;
+    s->compress_thread = g_malloc0(sizeof(QemuThread)
+        * s->compress_thread_count);
+    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
+    for (i = 0; i < s->compress_thread_count; i++) {
+        qemu_thread_create(s->compress_thread + i, "compress",
+            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
+
+    }
+}
+
/* Update the xbzrle cache to reflect a page that's been sent as all 0.
* The important thing is that a stale (not-yet-0'd) page be replaced
* by the new data.
@@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
#define ENCODING_FLAG_XBZRLE 0x1
-static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
+static int save_xbzrle_page(void *f, uint8_t **current_data,
ram_addr_t current_addr, RAMBlock *block,
- ram_addr_t offset, int cont, bool last_stage)
+ ram_addr_t offset, int cont, bool last_stage,
+ bool save_to_buf)
{
int encoded_len = 0, bytes_sent = -1;
uint8_t *prev_cached_page;
@@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
}
/* Send XBZRLE based compressed page */
- bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
- qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
- qemu_put_be16(f, encoded_len);
- qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
+ if (save_to_buf) {
+ bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
+ cont, RAM_SAVE_FLAG_XBZRLE);
+ migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
+ migrate_put_be16((MigBuf *)f, encoded_len);
+ migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
+ } else {
+ bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
+ cont, RAM_SAVE_FLAG_XBZRLE);
+ qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
+ qemu_put_be16((QEMUFile *)f, encoded_len);
+ qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
+ }
bytes_sent += encoded_len + 1 + 2;
acct_info.xbzrle_pages++;
acct_info.xbzrle_bytes += bytes_sent; @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
xbzrle_cache_zero_page(current_addr);
} else if (!ram_bulk_stage && migrate_use_xbzrle()) {
bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
- offset, cont, last_stage);
+ offset, cont, last_stage, false);
if (!last_stage) {
/* Can't send this cached data async, since the cache page
* might get updated before it gets to the wire @@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
return bytes_sent;
}
+/*
+ * Build the wire data for one dirty RAM page into param->migbuf.
+ *
+ * Mirrors ram_save_page(): accounts pages already handled by
+ * ram_control_save_page() (RDMA), emits zero pages as
+ * RAM_SAVE_FLAG_COMPRESS plus a single 0 byte, tries xbzrle when it is
+ * enabled and the bulk stage is over, and otherwise emits a
+ * zlib-compressed page under RAM_SAVE_FLAG_COMPRESS_PAGE.  Returns the
+ * number of bytes produced (or the pass-through value for
+ * RDMA-handled pages).
+ *
+ * Fixed here: the original patch text had '&param' mangled into the
+ * HTML entity sequence "&para;m" at four call sites, and declared
+ * current_addr after statements.
+ */
+static int save_compress_ram_page(compress_param *param) {
+    int bytes_sent = param->bytes_sent;
+    int blen = COMPRESS_BUF_SIZE;
+    int cont = param->cont;
+    uint8_t *p = param->p;
+    int ret = param->ret;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+    bool last_stage = param->last_stage;
+    ram_addr_t current_addr = block->offset + offset;
+
+    /* In doubt sent page as normal */
+    XBZRLE_cache_lock();
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            /* the transport handled the page; only fix up accounting */
+            if (bytes_sent > 0) {
+                atomic_inc(&acct_info.norm_pages);
+            } else if (bytes_sent == 0) {
+                atomic_inc(&acct_info.dup_pages);
+            }
+        }
+    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        /* zero page: header plus one literal 0 byte */
+        atomic_inc(&acct_info.dup_pages);
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset,
+            cont, RAM_SAVE_FLAG_COMPRESS);
+        migrate_put_byte(&param->migbuf, 0);
+        bytes_sent++;
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
+         */
+        xbzrle_cache_zero_page(current_addr);
+    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
+        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr,
+            block, offset, cont, last_stage, true);
+    }
+    XBZRLE_cache_unlock();
+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);
+    }
+    return bytes_sent;
+}
+
+static uint64_t bytes_transferred;
+
+/*
+ * Wait for every compression worker to go idle, then push each
+ * worker's staged bytes into the migration stream and reset its
+ * buffer.  Must run on the migration thread, which is the only writer
+ * of 'f'.  No-op unless the 'compress' capability is enabled.
+ */
+static void flush_compressed_data(QEMUFile *f) {
+    int idx;
+    if (!migrate_use_compress()) {
+        return;
+    }
+
+    for (idx = 0; idx < compress_thread_count; idx++) {
+        /* busy-wait until this worker finishes its current page */
+        while (comp_param[idx].state != COM_DONE) {
+            g_usleep(0);
+        }
+        if (comp_param[idx].migbuf.buf_index > 0) {
+            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                comp_param[idx].migbuf.buf_index);
+            bytes_transferred += comp_param[idx].migbuf.buf_index;
+            comp_param[idx].migbuf.buf_index = 0;
+        }
+    }
+}
+
+/* Fill one worker slot with the per-page inputs for
+ * save_compress_ram_page(); the caller hands the slot to a worker by
+ * setting param->state = COM_START afterwards (or calls
+ * save_compress_ram_page() synchronously itself). */
+static inline void set_common_compress_params(compress_param *param,
+    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
+    bool last_stage, int cont, uint8_t *p, bool bulk_stage) {
+    param->ret = ret;
+    param->bytes_sent = bytes_sent;
+    param->block = block;
+    param->offset = offset;
+    param->last_stage = last_stage;
+    param->cont = cont;
+    param->p = p;
+    param->bulk_stage = bulk_stage;
+}
+
/*
* ram_find_and_save_block: Finds a page to send and sends it to f
*
@@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
bool complete_round = false;
int bytes_sent = 0;
MemoryRegion *mr;
+ int cont, idx, ret, len = -1;
+ uint8_t *p;
if (!block)
block = QTAILQ_FIRST(&ram_list.blocks); @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
block = QTAILQ_FIRST(&ram_list.blocks);
complete_round = true;
ram_bulk_stage = false;
+            if (migrate_use_xbzrle()) {
+                /* Once the bulk stage ends with xbzrle enabled, every
+                 * page takes the synchronous comp_param[0] path below,
+                 * so the async workers are no longer needed: flush what
+                 * they still have buffered and terminate them.
+                 * (flush_compressed_data() is itself a no-op unless the
+                 * compress capability is on, so gating on xbzrle here
+                 * is deliberate, if subtle.) */
+                flush_compressed_data(f);
+                quit_thread = true;
+            }
}
} else {
- bytes_sent = ram_save_page(f, block, offset, last_stage);
-
- /* if page is unmodified, continue to the next */
- if (bytes_sent > 0) {
- last_sent_block = block;
- break;
+ if (!migrate_use_compress()) {
+ bytes_sent = ram_save_page(f, block, offset, last_stage);
+ /* if page is unmodified, continue to the next */
+ if (bytes_sent > 0) {
+ last_sent_block = block;
+ break;
+ }
+ } else {
+ cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+ p = memory_region_get_ram_ptr(block->mr) + offset;
+ ret = ram_control_save_page(f, block->offset,
+ offset, TARGET_PAGE_SIZE, &len);
+ if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
+ if (cont == 0) {
+ flush_compressed_data(f);
+ }
+ set_common_compress_params(&comp_param[0],
+ ret, len, block, offset, last_stage, cont,
+ p, ram_bulk_stage);
+ bytes_sent = save_compress_ram_page(&comp_param[0]);
+ if (bytes_sent > 0) {
+ qemu_put_buffer(f, comp_param[0].migbuf.buf,
+ comp_param[0].migbuf.buf_index);
+ comp_param[0].migbuf.buf_index = 0;
+ last_sent_block = block;
+ break;
+ }
+ } else {
+retry:
+ for (idx = 0; idx < compress_thread_count; idx++) {
+ if (comp_param[idx].state == COM_DONE) {
+ bytes_sent = comp_param[idx].migbuf.buf_index;
+ if (bytes_sent == 0) {
+ set_common_compress_params(&comp_param[idx],
+ ret, len, block, offset, last_stage,
+ cont, p, ram_bulk_stage);
+ comp_param[idx].state = COM_START;
+ bytes_sent = 1;
+ bytes_transferred -= 1;
+ break;
+ } else if (bytes_sent > 0) {
+ qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+ comp_param[idx].migbuf.buf_index);
+ comp_param[idx].migbuf.buf_index = 0;
+ set_common_compress_params(&comp_param[idx],
+ ret, len, block, offset, last_stage,
+ cont, p, ram_bulk_stage);
+ comp_param[idx].state = COM_START;
+ break;
+ }
+ }
+ }
+ if (idx < compress_thread_count) {
+ last_sent_block = block;
+ break;
+ } else {
+ g_usleep(0);
+ goto retry;
+ }
+ }
}
}
}
@@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
return bytes_sent;
}
-static uint64_t bytes_transferred;
void acct_update_position(QEMUFile *f, size_t size, bool zero) { @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
i++;
}
+ flush_compressed_data(f);
qemu_mutex_unlock_ramlist();
/*
@@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
bytes_transferred += bytes_sent;
}
+ flush_compressed_data(f);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
migration_end();
@@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
+QemuThread *decompress_threads;
+
+/*
+ * Decompression worker: busy-polls its slot for a compressed page
+ * queued by ram_load(), inflates compbuf straight into the guest page
+ * at 'des', then marks the slot idle.  Exits once the incoming
+ * migration completes or quit_thread is set.
+ *
+ * NOTE(review): on uncompress() failure the thread just breaks out,
+ * leaving the guest page stale while the migration still appears to
+ * succeed -- this should propagate a fatal error instead.  The
+ * state/len/des fields are shared without atomics or barriers; verify
+ * the ordering assumptions.
+ */
+static void *do_data_decompress(void *opaque) {
+    decompress_param *param = opaque;
+    while (incomming_migration_done == false) {
+        if (param->state == COM_START) {
+            uLong pagesize = TARGET_PAGE_SIZE;
+            if (uncompress((Bytef *)param->des, &pagesize,
+                (const Bytef *)param->compbuf, param->len) != Z_OK) {
+                printf("uncompress failed!\n");
+                break;
+            }
+            param->state = COM_DONE;
+        } else {
+            if (quit_thread) {
+                break;
+            }
+            g_usleep(1);
+        }
+    }
+    return NULL;
+}
+
+/*
+ * Spawn 'count' decompression workers for the incoming side, each
+ * bound to one decomp_param[] slot (zero-initialized => COM_DONE/idle).
+ * Called from process_incoming_migration() before the stream is read.
+ */
+void migrate_decompress_threads_create(int count) {
+    int i;
+    decompress_thread_count = count;
+    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
+    decomp_param = g_malloc0(sizeof(decompress_param) * count);
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
+    }
+}
+
+/* Reap the decompression workers and free their state.  Assumes the
+ * workers have already been told to exit (incomming_migration_done or
+ * quit_thread set), otherwise this blocks forever. */
+void migrate_decompress_threads_join(void)
+{
+    int i;
+    for (i = 0; i < decompress_thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+}
+
static int ram_load(QEMUFile *f, void *opaque, int version_id) {
int flags = 0, ret = 0;
static uint64_t seq_iter;
+ int len = 0;
+ uint8_t compbuf[COMPRESS_BUF_SIZE];
seq_iter++;
@@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
break;
case RAM_SAVE_FLAG_PAGE:
+ quit_thread = true;
host = host_from_stream_offset(f, addr, flags);
if (!host) {
error_report("Illegal RAM offset " RAM_ADDR_FMT, addr); @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            /* 'len' comes straight from the (untrusted) migration
+             * stream; without this bound a malicious source overflows
+             * the on-stack compbuf[COMPRESS_BUF_SIZE] buffer. */
+            if (len < 0 || len > COMPRESS_BUF_SIZE) {
+                error_report("Invalid compressed page length %d", len);
+                ret = -EINVAL;
+                break;
+            }
+            qemu_get_buffer(f, compbuf, len);
+            int idx;
+retry:
+            /* hand the page to the first idle decompression worker;
+             * spin until one becomes free */
+            for (idx = 0; idx < decompress_thread_count; idx++) {
+                if (decomp_param[idx].state == COM_DONE) {
+                    memcpy(decomp_param[idx].compbuf, compbuf, len);
+                    decomp_param[idx].des = host;
+                    decomp_param[idx].len = len;
+                    decomp_param[idx].state = COM_START;
+                    break;
+                }
+            }
+            if (idx == decompress_thread_count) {
+                g_usleep(0);
+                goto retry;
+            }
+            break;
case RAM_SAVE_FLAG_XBZRLE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {
@@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle migrations.
ETEXI
{
+ .name = "migrate_set_compress_level",
+ .args_type = "value:i",
+ .params = "value",
+ .help = "set compress level for compress migrations,"
+ "the level is a number between 0 and 9, 0 stands for "
+ "no compression.\n"
+ "1 stands for the fast compress speed while 9 stands for"
+ "the highest compress ratio.",
+ .mhandler.cmd = hmp_migrate_set_compress_level,
+ },
+
+STEXI
+@item migrate_set_compress_level @var{value}
+@findex migrate_set_compress_level
+Set compress level to @var{value} for compress migrations.
+ETEXI
+
+ {
+ .name = "migrate_set_compress_threads",
+ .args_type = "value:i",
+ .params = "value",
+ .help = "set compress thread count for migrations. "
+ "a proper thread count will accelerate the migration speed,"
+ "the threads should be between 1 and the CPUS of your system",
+ .mhandler.cmd = hmp_migrate_set_compress_threads,
+ },
+
+STEXI
+@item migrate_set_compress_threads @var{value}
+@findex migrate_set_compress_threads
+Set compress threads to @var{value} for compress migrations.
+ETEXI
+
+ {
+ .name = "migrate_set_decompress_threads",
+ .args_type = "value:i",
+ .params = "value",
+ .help = "set decompress thread count for migrations. "
+ "a proper thread count will accelerate the migration speed,"
+ "the threads should be between 1 and the CPUS of your system",
+ .mhandler.cmd = hmp_migrate_set_decompress_threads,
+ },
+
+STEXI
+@item migrate_set_decompress_threads @var{value}
+@findex migrate_set_decompress_threads
+Set decompress threads to @var{value} for compress migrations.
+ETEXI
+
+ {
.name = "migrate_set_speed",
.args_type = "value:o",
.params = "value",
@@ -1766,6 +1816,12 @@ show migration status show current migration capabilities @item info migrate_cache_size show current migration XBZRLE cache size
+@item info migrate_compress_level
+show current migration compress level
+@item info migrate_compress_threads
+show current migration compress threads
+@item info migrate_decompress_threads
+show current migration decompress threads
@item info balloon
show balloon information
@item info qtree
@@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict)
qmp_query_migrate_cache_size(NULL) >> 10); }
+/* HMP: "info migrate_compress_level" -- print the configured level. */
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress level: %" PRId64 "\n",
+                   qmp_query_migrate_compress_level(NULL));
+}
+
+/* HMP: "info migrate_compress_threads". */
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress threads: %" PRId64 "\n",
+                   qmp_query_migrate_compress_threads(NULL));
+}
+
+/* HMP: "info migrate_decompress_threads". */
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
+                   qmp_query_migrate_decompress_threads(NULL));
+}
+
+
void hmp_info_cpus(Monitor *mon, const QDict *qdict) {
CpuInfoList *cpu_list, *cpu;
@@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict)
}
}
+/* HMP wrapper: forward to the QMP setter and print any error. */
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_level(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+/* HMP wrapper: forward to the QMP setter and print any error. */
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+/* HMP wrapper: forward to the QMP setter and print any error. */
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_decompress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+
void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict) {
int64_t value = qdict_get_int(qdict, "value"); diff --git a/hmp.h b/hmp.h index 4bb5dca..b348806 100644
@@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict); void hmp_info_migrate(Monitor *mon, const QDict *qdict); void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict); void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict
+*qdict); void hmp_info_migrate_decompress_threads(Monitor *mon, const
+QDict *qdict);
void hmp_info_cpus(Monitor *mon, const QDict *qdict); void hmp_info_block(Monitor *mon, const QDict *qdict); void hmp_info_blockstats(Monitor *mon, const QDict *qdict); @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict *qdict); void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict); void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict); void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict
+*qdict); void hmp_migrate_set_decompress_threads(Monitor *mon, const
+QDict *qdict);
void hmp_set_password(Monitor *mon, const QDict *qdict); void hmp_expire_password(Monitor *mon, const QDict *qdict); void hmp_eject(Monitor *mon, const QDict *qdict); diff --git a/include/migration/migration.h b/include/migration/migration.h index 3cb5ba8..03c8e0d 100644
@@ -49,6 +49,9 @@ struct MigrationState
QemuThread thread;
QEMUBH *cleanup_bh;
QEMUFile *file;
+ QemuThread *compress_thread;
+ int compress_thread_count;
+ int compress_level;
int state;
MigrationParams params;
@@ -64,6 +67,7 @@ struct MigrationState
int64_t dirty_sync_count;
};
+extern bool incomming_migration_done;
void process_incoming_migration(QEMUFile *f);
void qemu_start_incoming_migration(const char *uri, Error **errp); @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *); bool migration_has_failed(MigrationState *); MigrationState *migrate_get_current(void);
+void migrate_compress_threads_create(MigrationState *s); void
+migrate_compress_threads_join(MigrationState *s); void
+migrate_decompress_threads_create(int count); void
+migrate_decompress_threads_join(void);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void);
uint64_t ram_bytes_total(void);
@@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
bool migrate_rdma_pin_all(void);
bool migrate_zero_blocks(void);
-
+bool migrate_use_compress(void);
bool migrate_auto_converge(void);
int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen, @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t *dst, int dlen);
int migrate_use_xbzrle(void);
int64_t migrate_xbzrle_cache_size(void);
+int migrate_compress_level(void);
+int migrate_compress_threads(void);
int64_t xbzrle_cache_resize(int64_t new_size);
@@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input); int qemu_get_fd(QEMUFile *f); int qemu_fclose(QEMUFile *f); int64_t qemu_ftell(QEMUFile *f);
+uint64_t qemu_add_compress(QEMUFile *f, const uint8_t *p, int size);
void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size); void qemu_put_byte(QEMUFile *f, int v);
/*
@@ -46,6 +46,12 @@ enum {
/* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
+/* Migration compress default thread counts */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
+/* 0 means no compression, 1: best speed, ... 9: best compress ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
.bandwidth_limit = MAX_THROTTLE,
.xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
.mbps = -1,
+ .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+ .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
};
    return &current_migration;
@@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
error_report("load of migration failed: %s", strerror(-ret));
exit(EXIT_FAILURE);
}
+ incomming_migration_done = true;
qemu_announce_self();
/* Make sure all file formats flush their mutable metadata */ @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
} else {
runstate_set(RUN_STATE_PAUSED);
}
+ migrate_decompress_threads_join();
}
+static int uncompress_thread_count =
+DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
void process_incoming_migration(QEMUFile *f) {
+ incomming_migration_done = false;
+ migrate_decompress_threads_create(uncompress_thread_count);
Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
int fd = qemu_get_fd(f);
@@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
qemu_thread_join(&s->thread);
qemu_mutex_lock_iothread();
+ migrate_compress_threads_join(s);
qemu_fclose(s->file);
s->file = NULL;
}
@@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
int64_t bandwidth_limit = s->bandwidth_limit;
bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+ int compress_level = s->compress_level;
+ int compress_thread_count = s->compress_thread_count;
memcpy(enabled_capabilities, s->enabled_capabilities,
sizeof(enabled_capabilities)); @@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams *params)
sizeof(enabled_capabilities));
s->xbzrle_cache_size = xbzrle_cache_size;
+ s->compress_level = compress_level;
+ s->compress_thread_count = compress_thread_count;
s->bandwidth_limit = bandwidth_limit;
s->state = MIG_STATE_SETUP;
trace_migrate_set_state(MIG_STATE_SETUP);
@@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
return migrate_xbzrle_cache_size(); }
+/* QMP: set the zlib level used by the outgoing compression threads.
+ * Valid range 0..9 (0 = no compression, 9 = best ratio). */
+void qmp_migrate_set_compress_level(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 9 || value < 0) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
+                  "is invalid, please input an integer between 0 and 9. ");
+        return;
+    }
+
+    s->compress_level = value;
+}
+
+int64_t qmp_query_migrate_compress_level(Error **errp)
+{
+    return migrate_compress_level();
+}
+
+/* QMP: set the outgoing compression thread count (1..255); takes
+ * effect on the next migration. */
+void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
+                  "is invalid, please input an integer between 1 and 255. ");
+        return;
+    }
+
+    s->compress_thread_count = value;
+}
+
+/* QMP: set the incoming decompression thread count (1..64).  The old
+ * error message wrongly said "compress thread count". */
+void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
+{
+    if (value > 64 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE,
+                  "decompress thread count",
+                  "is invalid, please input an integer between 1 and 64. ");
+        return;
+    }
+
+    uncompress_thread_count = value;
+}
+
+int64_t qmp_query_migrate_compress_threads(Error **errp)
+{
+    return migrate_compress_threads();
+}
+
+int64_t qmp_query_migrate_decompress_threads(Error **errp)
+{
+    return uncompress_thread_count;
+}
+
+
void qmp_migrate_set_speed(int64_t value, Error **errp) {
MigrationState *s;
@@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
}
+/* True when the 'compress' migration capability has been enabled. */
+bool migrate_use_compress(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
+}
+
+/* Currently configured zlib compression level (0..9). */
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_level;
+}
+
+/* Currently configured outgoing compression thread count. */
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_thread_count;
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;
@@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
qemu_thread_create(&s->thread, "migration", migration_thread, s,
QEMU_THREAD_JOINABLE);
+ migrate_compress_threads_create(s);
}
@@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
.mhandler.cmd = hmp_info_migrate_cache_size,
},
{
+ .name = "migrate_compress_level",
+ .args_type = "",
+ .params = "",
+ .help = "show current migration compress level",
+ .mhandler.cmd = hmp_info_migrate_compress_level,
+ },
+ {
+ .name = "migrate_compress_threads",
+ .args_type = "",
+ .params = "",
+ .help = "show current migration compress thread count",
+ .mhandler.cmd = hmp_info_migrate_compress_threads,
+ },
+ {
+ .name = "migrate_decompress_threads",
+ .args_type = "",
+ .params = "",
+ .help = "show current migration decompress thread count",
+ .mhandler.cmd = hmp_info_migrate_decompress_threads,
+ },
+ {
.name = "balloon",
.args_type = "",
.params = "",
@@ -497,7 +497,7 @@
# Since: 1.2
##
{ 'enum': 'MigrationCapability',
- 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+ 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
+ 'compress'] }
##
# @MigrationCapabilityStatus
@@ -1382,6 +1382,88 @@
{ 'command': 'query-migrate-cache-size', 'returns': 'int' }
##
+# @migrate-set-compress-level
+#
+# Set compress level
+#
+# @value: compress level int
+#
+# The compress level will be an integer between 0 and 9.
+# The compress level can be modified before and during ongoing migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-level
+#
+# query compress level
+#
+# Returns: compress level int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
+
+##
+# @migrate-set-compress-threads
+#
+# Set compress threads
+#
+# @value: compress threads int
+#
+# The compress thread count is an integer between 1 and 255.
+# The compress thread count can be modified only before migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-threads
+#
+# query compress threads
+#
+# Returns: compress threads int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
+
+##
+# @migrate-set-decompress-threads
+#
+# Set decompress threads
+#
+# @value: decompress threads int
+#
+# The decompress thread count is an integer between 1 and 64.
+# The decompress thread count can be modified only before migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-decompress-threads
+#
+# query decompress threads
+#
+# Returns: decompress threads int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
+
+##
# @ObjectPropertyInfo:
#
# @name: the name of the property
@@ -705,7 +705,138 @@ Example:
<- { "return": 67108864 }
EQMP
+{
+ .name = "migrate-set-compress-level",
+ .args_type = "value:i",
+ .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
+ },
+