On 2014/11/6 19:08, Li Liang wrote: > Instead of sending the guest memory directly, this solution compress > the ram page before sending, after receiving, the data will be > decompressed. > This feature can help to reduce the data transferred about > 60%, this is very useful when the network bandwidth is limited, > and the migration time can also be reduced about 70%. The > feature is off by default, following the document > docs/multiple-compression-threads.txt for information to use it. > > Reviewed-by: Eric Blake <ebl...@redhat.com> > Signed-off-by: Li Liang <liang.z...@intel.com> > --- > arch_init.c | 435 > ++++++++++++++++++++++++++++++++++++++++-- > hmp-commands.hx | 56 ++++++ > hmp.c | 57 ++++++ > hmp.h | 6 + > include/migration/migration.h | 12 +- > include/migration/qemu-file.h | 1 + > migration.c | 99 ++++++++++ > monitor.c | 21 ++ > qapi-schema.json | 88 ++++++++- > qmp-commands.hx | 131 +++++++++++++ > 10 files changed, 890 insertions(+), 16 deletions(-) > > diff --git a/arch_init.c b/arch_init.c > index 88a5ba0..a27d87b 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -24,6 +24,7 @@ > #include <stdint.h> > #include <stdarg.h> > #include <stdlib.h> > +#include <zlib.h> > #ifndef _WIN32 > #include <sys/types.h> > #include <sys/mman.h> > @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count; > #define RAM_SAVE_FLAG_CONTINUE 0x20 > #define RAM_SAVE_FLAG_XBZRLE 0x40 > /* 0x80 is reserved in migration.h start with 0x100 next */ > +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 > > static struct defconfig_file { > const char *filename; > @@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages; > static uint32_t last_version; > static bool ram_bulk_stage; > > +#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16) > +#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16) > +struct MigBuf { > + int buf_index; > + uint8_t buf[MIG_BUF_SIZE]; > +}; > + > +typedef struct MigBuf MigBuf; > + > +static void migrate_put_byte(MigBuf *f, int v) > +{ > + f->buf[f->buf_index] = 
v; > + f->buf_index++; > +} > + > +static void migrate_put_be16(MigBuf *f, unsigned int v) > +{ > + migrate_put_byte(f, v >> 8); > + migrate_put_byte(f, v); > +} > + > +static void migrate_put_be32(MigBuf *f, unsigned int v) > +{ > + migrate_put_byte(f, v >> 24); > + migrate_put_byte(f, v >> 16); > + migrate_put_byte(f, v >> 8); > + migrate_put_byte(f, v); > +} > + > +static void migrate_put_be64(MigBuf *f, uint64_t v) > +{ > + migrate_put_be32(f, v >> 32); > + migrate_put_be32(f, v); > +} > + > +static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size) > +{ > + int l; > + > + while (size > 0) { > + l = MIG_BUF_SIZE - f->buf_index; > + if (l > size) { > + l = size; > + } > + memcpy(f->buf + f->buf_index, buf, l); > + f->buf_index += l; > + buf += l; > + size -= l; > + } > +} > + > +static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block, > + ram_addr_t offset, int cont, int flag) > +{ > + size_t size; > + > + migrate_put_be64(f, offset | cont | flag); > + size = 8; > + > + if (!cont) { > + migrate_put_byte(f, strlen(block->idstr)); > + migrate_put_buffer(f, (uint8_t *)block->idstr, > + strlen(block->idstr)); > + size += 1 + strlen(block->idstr); > + } > + return size; > +} > + > +static int migrate_qemu_add_compress(MigBuf *f, const uint8_t *p, > + int size, int level) > +{ > + uLong blen = COMPRESS_BUF_SIZE; > + if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p, > + size, level) != Z_OK) { > + error_report("Compress Failed!\n"); > + return 0; > + } > + migrate_put_be32(f, blen); > + f->buf_index += blen; > + return blen + sizeof(int); > +} > + > +enum { > + COM_DONE = 0, > + COM_START, > +}; > + > +static int compress_thread_count; > +static int decompress_thread_count; > + > +struct compress_param { > + int state; > + MigBuf migbuf; > + RAMBlock *block; > + ram_addr_t offset; > + bool last_stage; > + int ret; > + int bytes_sent; > + uint8_t *p; > + int cont; > + bool bulk_stage; > +}; > + > +typedef struct compress_param 
compress_param; > +compress_param *comp_param; > + > +struct decompress_param { > + int state; > + void *des; > + uint8 compbuf[COMPRESS_BUF_SIZE]; > + int len; > +}; > +typedef struct decompress_param decompress_param; > + > +static decompress_param *decomp_param; > +bool incomming_migration_done; > +static bool quit_thread; > + > +static int save_compress_ram_page(compress_param *param); > + > + > +static void *do_data_compress(void *opaque) > +{ > + compress_param *param = opaque; > + while (!quit_thread) { > + if (param->state == COM_START) { > + save_compress_ram_page(param); > + param->state = COM_DONE; > + } else { > + g_usleep(1); > + } > + } > + > + return NULL; > +} > + > + > +void migrate_compress_threads_join(MigrationState *s) > +{ > + int i; > + if (!migrate_use_compress()) { > + return; > + } > + quit_thread = true; > + for (i = 0; i < compress_thread_count; i++) { > + qemu_thread_join(s->compress_thread + i); > + } > + g_free(s->compress_thread); > + g_free(comp_param); > + s->compress_thread = NULL; > + comp_param = NULL; > +} > + > +void migrate_compress_threads_create(MigrationState *s) > +{ > + int i; > + if (!migrate_use_compress()) { > + return; > + } > + quit_thread = false; > + compress_thread_count = s->compress_thread_count; > + s->compress_thread = g_malloc0(sizeof(QemuThread) > + * s->compress_thread_count); > + comp_param = g_malloc0(sizeof(compress_param) * > s->compress_thread_count); > + for (i = 0; i < s->compress_thread_count; i++) { > + qemu_thread_create(s->compress_thread + i, "compress", > + do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); > + > + } > +} > + > /* Update the xbzrle cache to reflect a page that's been sent as all 0. > * The important thing is that a stale (not-yet-0'd) page be replaced > * by the new data. 
> @@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t > current_addr) > > #define ENCODING_FLAG_XBZRLE 0x1 > > -static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data, > +static int save_xbzrle_page(void *f, uint8_t **current_data, > ram_addr_t current_addr, RAMBlock *block, > - ram_addr_t offset, int cont, bool last_stage) > + ram_addr_t offset, int cont, bool last_stage, > + bool save_to_buf) > { > int encoded_len = 0, bytes_sent = -1; > uint8_t *prev_cached_page; > @@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t > **current_data, > } > > /* Send XBZRLE based compressed page */ > - bytes_sent = save_block_hdr(f, block, offset, cont, > RAM_SAVE_FLAG_XBZRLE); > - qemu_put_byte(f, ENCODING_FLAG_XBZRLE); > - qemu_put_be16(f, encoded_len); > - qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len); > + if (save_to_buf) { > + bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset, > + cont, RAM_SAVE_FLAG_XBZRLE); > + migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE); > + migrate_put_be16((MigBuf *)f, encoded_len); > + migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len); > + } else { > + bytes_sent = save_block_hdr((QEMUFile *)f, block, offset, > + cont, RAM_SAVE_FLAG_XBZRLE); > + qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE); > + qemu_put_be16((QEMUFile *)f, encoded_len); > + qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len); > + } > bytes_sent += encoded_len + 1 + 2; > acct_info.xbzrle_pages++; > acct_info.xbzrle_bytes += bytes_sent; > @@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, > ram_addr_t offset, > xbzrle_cache_zero_page(current_addr); > } else if (!ram_bulk_stage && migrate_use_xbzrle()) { > bytes_sent = save_xbzrle_page(f, &p, current_addr, block, > - offset, cont, last_stage); > + offset, cont, last_stage, false); > if (!last_stage) { > /* Can't send this cached data async, since the cache page > * might get updated before it gets to the wire > @@ 
-635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, > ram_addr_t offset, > return bytes_sent; > } > > +static int save_compress_ram_page(compress_param *param) > +{ > + int bytes_sent = param->bytes_sent; > + int blen = COMPRESS_BUF_SIZE; > + int cont = param->cont; > + uint8_t *p = param->p; > + int ret = param->ret; > + RAMBlock *block = param->block; > + ram_addr_t offset = param->offset; > + bool last_stage = param->last_stage; > + /* In doubt sent page as normal */ > + XBZRLE_cache_lock(); > + ram_addr_t current_addr = block->offset + offset; > + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { > + if (ret != RAM_SAVE_CONTROL_DELAYED) { > + if (bytes_sent > 0) { > + atomic_inc(&acct_info.norm_pages); > + } else if (bytes_sent == 0) { > + atomic_inc(&acct_info.dup_pages); > + } > + } > + } else if (is_zero_range(p, TARGET_PAGE_SIZE)) { > + atomic_inc(&acct_info.dup_pages); > + bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, > cont, > + RAM_SAVE_FLAG_COMPRESS); > + migrate_put_byte(&param->migbuf, 0); > + bytes_sent++; > + /* Must let xbzrle know, otherwise a previous (now 0'd) cached > + * page would be stale > + */ > + xbzrle_cache_zero_page(current_addr); > + } else if (!param->bulk_stage && migrate_use_xbzrle()) { > + bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, > block, > + offset, cont, last_stage, true); > + } > + XBZRLE_cache_unlock(); > + /* XBZRLE overflow or normal page */ > + if (bytes_sent == -1) { > + bytes_sent = migrate_save_block_hdr(&param->migbuf, block, > + offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE); > + blen = migrate_qemu_add_compress(&param->migbuf, p, > + TARGET_PAGE_SIZE, migrate_compress_level()); > + bytes_sent += blen; > + atomic_inc(&acct_info.norm_pages); > + } > + return bytes_sent; > +} > + > +static uint64_t bytes_transferred; > + > +static void flush_compressed_data(QEMUFile *f) > +{ > + int idx; > + if (!migrate_use_compress()) { > + return; > + } > + > + for (idx = 0; idx < compress_thread_count; 
idx++) { > + while (comp_param[idx].state != COM_DONE) { > + g_usleep(0); > + } > + if (comp_param[idx].migbuf.buf_index > 0) { > + qemu_put_buffer(f, comp_param[idx].migbuf.buf, > + comp_param[idx].migbuf.buf_index); > + bytes_transferred += comp_param[idx].migbuf.buf_index; > + comp_param[idx].migbuf.buf_index = 0; > + } > + } > +} > + > +static inline void set_common_compress_params(compress_param *param, > + int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset, > + bool last_stage, int cont, uint8_t *p, bool bulk_stage) > +{ > + param->ret = ret; > + param->bytes_sent = bytes_sent; > + param->block = block; > + param->offset = offset; > + param->last_stage = last_stage; > + param->cont = cont; > + param->p = p; > + param->bulk_stage = bulk_stage; > +} > + > /* > * ram_find_and_save_block: Finds a page to send and sends it to f > * > @@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool > last_stage) > bool complete_round = false; > int bytes_sent = 0; > MemoryRegion *mr; > + int cont, idx, ret, len = -1; > + uint8_t *p; > > if (!block) > block = QTAILQ_FIRST(&ram_list.blocks); > @@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool > last_stage) > block = QTAILQ_FIRST(&ram_list.blocks); > complete_round = true; > ram_bulk_stage = false; > + if (migrate_use_xbzrle()) { > + /* terminate the used thread at this point*/ > + flush_compressed_data(f); > + quit_thread = true; > + } > } > } else { > - bytes_sent = ram_save_page(f, block, offset, last_stage); > - > - /* if page is unmodified, continue to the next */ > - if (bytes_sent > 0) { > - last_sent_block = block; > - break; > + if (!migrate_use_compress()) { > + bytes_sent = ram_save_page(f, block, offset, last_stage); > + /* if page is unmodified, continue to the next */ > + if (bytes_sent > 0) { > + last_sent_block = block; > + break; > + } > + } else { > + cont = (block == last_sent_block) ? 
RAM_SAVE_FLAG_CONTINUE : > 0; > + p = memory_region_get_ram_ptr(block->mr) + offset; > + ret = ram_control_save_page(f, block->offset, > + offset, TARGET_PAGE_SIZE, &len); > + if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) { > + if (cont == 0) { > + flush_compressed_data(f); > + } > + set_common_compress_params(&comp_param[0], > + ret, len, block, offset, last_stage, cont, > + p, ram_bulk_stage); > + bytes_sent = save_compress_ram_page(&comp_param[0]); > + if (bytes_sent > 0) { > + qemu_put_buffer(f, comp_param[0].migbuf.buf, > + comp_param[0].migbuf.buf_index); > + comp_param[0].migbuf.buf_index = 0; > + last_sent_block = block; > + break; > + } > + } else { > +retry: > + for (idx = 0; idx < compress_thread_count; idx++) { > + if (comp_param[idx].state == COM_DONE) { > + bytes_sent = comp_param[idx].migbuf.buf_index; > + if (bytes_sent == 0) { > + set_common_compress_params(&comp_param[idx], > + ret, len, block, offset, last_stage, > + cont, p, ram_bulk_stage); > + comp_param[idx].state = COM_START; > + bytes_sent = 1; > + bytes_transferred -= 1; > + break; > + } else if (bytes_sent > 0) { > + qemu_put_buffer(f, > comp_param[idx].migbuf.buf, > + comp_param[idx].migbuf.buf_index); > + comp_param[idx].migbuf.buf_index = 0; > + set_common_compress_params(&comp_param[idx], > + ret, len, block, offset, last_stage, > + cont, p, ram_bulk_stage); > + comp_param[idx].state = COM_START; > + break; > + } > + } > + } > + if (idx < compress_thread_count) { > + last_sent_block = block; > + break; > + } else { > + g_usleep(0); > + goto retry; > + } > + } > } > } > } > @@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool > last_stage) > return bytes_sent; > } > > -static uint64_t bytes_transferred; > > void acct_update_position(QEMUFile *f, size_t size, bool zero) > { > @@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) > i++; > } > > + flush_compressed_data(f); > qemu_mutex_unlock_ramlist(); > > /* > @@ -938,6 +1266,7 @@ 
static int ram_save_complete(QEMUFile *f, void *opaque) > bytes_transferred += bytes_sent; > } > > + flush_compressed_data(f); > ram_control_after_iterate(f, RAM_CONTROL_FINISH); > migration_end(); > > @@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, > uint64_t size) > } > } > > +QemuThread *decompress_threads; > + > +static void *do_data_decompress(void *opaque) > +{ > + decompress_param *param = opaque; > + while (incomming_migration_done == false) { > + if (param->state == COM_START) { > + uLong pagesize = TARGET_PAGE_SIZE; > + if (uncompress((Bytef *)param->des, &pagesize, > + (const Bytef *)param->compbuf, param->len) != Z_OK) { > + error_report("Uncompress Failed!\n"); > + break; > + } > + param->state = COM_DONE; > + } else { > + if (quit_thread) { > + break; > + } > + g_usleep(1); > + } > + } > + return NULL; > +} > + > +void migrate_decompress_threads_create(int count) > +{ > + int i; > + decompress_thread_count = count; > + decompress_threads = g_malloc0(sizeof(QemuThread) * count); > + decomp_param = g_malloc0(sizeof(decompress_param) * count); > + quit_thread = false; > + for (i = 0; i < count; i++) { > + qemu_thread_create(decompress_threads + i, "decompress", > + do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE); > + } > +} > + > +void migrate_decompress_threads_join(void) > +{ > + int i; > + for (i = 0; i < decompress_thread_count; i++) { > + qemu_thread_join(decompress_threads + i); > + } > + g_free(decompress_threads); > + g_free(decomp_param); > + decompress_threads = NULL; > + decomp_param = NULL; > +} > + > static int ram_load(QEMUFile *f, void *opaque, int version_id) > { > int flags = 0, ret = 0; > static uint64_t seq_iter; > + int len = 0; > + uint8_t compbuf[COMPRESS_BUF_SIZE]; > > seq_iter++; > > @@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > ram_handle_compressed(host, ch, TARGET_PAGE_SIZE); > break; > case RAM_SAVE_FLAG_PAGE: > + quit_thread = true; > host = 
host_from_stream_offset(f, addr, flags); > if (!host) { > error_report("Illegal RAM offset " RAM_ADDR_FMT, addr); > @@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > > qemu_get_buffer(f, host, TARGET_PAGE_SIZE); > break; > + case RAM_SAVE_FLAG_COMPRESS_PAGE: > + host = host_from_stream_offset(f, addr, flags); > + if (!host) { > + error_report("Illegal RAM offset " RAM_ADDR_FMT, addr); > + ret = -EINVAL; > + break; > + } > + > + len = qemu_get_be32(f); > + qemu_get_buffer(f, compbuf, len); > + int idx; > +retry: > + for (idx = 0; idx < decompress_thread_count; idx++) { > + if (decomp_param[idx].state == COM_DONE) { > + memcpy(decomp_param[idx].compbuf, compbuf, len); > + decomp_param[idx].des = host; > + decomp_param[idx].len = len; > + decomp_param[idx].state = COM_START; > + break; > + } > + } > + if (idx == decompress_thread_count) { > + g_usleep(0); > + goto retry; > + } > + break; > case RAM_SAVE_FLAG_XBZRLE: > host = host_from_stream_offset(f, addr, flags); > if (!host) { > diff --git a/hmp-commands.hx b/hmp-commands.hx > index e37bc8b..8b93bed 100644 > --- a/hmp-commands.hx > +++ b/hmp-commands.hx > @@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle > migrations. > ETEXI > > { > + .name = "migrate_set_compress_level", > + .args_type = "value:i", > + .params = "value", > + .help = "set compress level for compress migrations," > + "the level is a number between 0 and 9, 0 stands for " > + "no compression.\n" > + "1 stands for the fast compress speed while 9 stands > for" > + "the highest compress ratio.", > + .mhandler.cmd = hmp_migrate_set_compress_level, > + }, > + > +STEXI > +@item migrate_set_compress_level @var{value} > +@findex migrate_set_compress_level > +Set compress level to @var{value} for compress migrations. > +ETEXI > + > + { > + .name = "migrate_set_compress_threads", > + .args_type = "value:i", > + .params = "value", > + .help = "set compress thread count for migrations. 
" > + "a proper thread count will accelerate the migration > speed," > + "the threads should be between 1 and the CPUS of your > system", > + .mhandler.cmd = hmp_migrate_set_compress_threads, > + }, > + > +STEXI > +@item migrate_set_compress_threads @var{value} > +@findex migrate_set_compress_threads > +Set compress threads to @var{value} for compress migrations. > +ETEXI > + > + { > + .name = "migrate_set_decompress_threads", > + .args_type = "value:i", > + .params = "value", > + .help = "set decompress thread count for migrations. " > + "a proper thread count will accelerate the migration > speed," > + "the threads should be between 1 and the CPUS of your > system", > + .mhandler.cmd = hmp_migrate_set_decompress_threads, > + }, > + > +STEXI > +@item migrate_set_decompress_threads @var{value} > +@findex migrate_set_decompress_threads > +Set decompress threads to @var{value} for compress migrations. > +ETEXI > + > + { > .name = "migrate_set_speed", > .args_type = "value:o", > .params = "value", > @@ -1766,6 +1816,12 @@ show migration status > show current migration capabilities > @item info migrate_cache_size > show current migration XBZRLE cache size > +@item info migrate_compress_level > +show current migration compress level > +@item info migrate_compress_threads > +show current migration compress threads > +@item info migrate_decompress_threads > +show current migration decompress threads > @item info balloon > show balloon information > @item info qtree > diff --git a/hmp.c b/hmp.c > index 63d7686..b1936a3 100644 > --- a/hmp.c > +++ b/hmp.c > @@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const > QDict *qdict) > qmp_query_migrate_cache_size(NULL) >> 10); > } > > +void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict) > +{ > + monitor_printf(mon, "compress level: %" PRId64 "\n", > + qmp_query_migrate_compress_level(NULL)); > +} > + > +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict) > +{ > + 
monitor_printf(mon, "compress threads: %" PRId64 "\n", > + qmp_query_migrate_compress_threads(NULL)); > +} > + > +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict) > +{ > + monitor_printf(mon, "decompress threads: %" PRId64 "\n", > + qmp_query_migrate_decompress_threads(NULL)); > +} > + > void hmp_info_cpus(Monitor *mon, const QDict *qdict) > { > CpuInfoList *cpu_list, *cpu; > @@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const > QDict *qdict) > } > } > > +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict) > +{ > + int64_t value = qdict_get_int(qdict, "value"); > + Error *err = NULL; > + > + qmp_migrate_set_compress_level(value, &err); > + if (err) { > + monitor_printf(mon, "%s\n", error_get_pretty(err)); > + error_free(err); > + return; > + } > +} > + > +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict) > +{ > + int64_t value = qdict_get_int(qdict, "value"); > + Error *err = NULL; > + > + qmp_migrate_set_compress_threads(value, &err); > + if (err) { > + monitor_printf(mon, "%s\n", error_get_pretty(err)); > + error_free(err); > + return; > + } > +} > + > +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict) > +{ > + int64_t value = qdict_get_int(qdict, "value"); > + Error *err = NULL; > + > + qmp_migrate_set_decompress_threads(value, &err); > + if (err) { > + monitor_printf(mon, "%s\n", error_get_pretty(err)); > + error_free(err); > + return; > + } > +} > + > void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict) > { > int64_t value = qdict_get_int(qdict, "value"); > diff --git a/hmp.h b/hmp.h > index 4bb5dca..b348806 100644 > --- a/hmp.h > +++ b/hmp.h > @@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict); > void hmp_info_migrate(Monitor *mon, const QDict *qdict); > void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict); > void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict); > +void 
hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict); > +void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict); > +void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict); > void hmp_info_cpus(Monitor *mon, const QDict *qdict); > void hmp_info_block(Monitor *mon, const QDict *qdict); > void hmp_info_blockstats(Monitor *mon, const QDict *qdict); > @@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict > *qdict); > void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict); > void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict); > void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict); > +void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict); > +void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict); > +void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict); > void hmp_set_password(Monitor *mon, const QDict *qdict); > void hmp_expire_password(Monitor *mon, const QDict *qdict); > void hmp_eject(Monitor *mon, const QDict *qdict); > diff --git a/include/migration/migration.h b/include/migration/migration.h > index 3cb5ba8..03c8e0d 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -49,6 +49,9 @@ struct MigrationState > QemuThread thread; > QEMUBH *cleanup_bh; > QEMUFile *file; > + QemuThread *compress_thread; > + int compress_thread_count; > + int compress_level; > > int state; > MigrationParams params; > @@ -64,6 +67,7 @@ struct MigrationState > int64_t dirty_sync_count; > }; > > +extern bool incomming_migration_done; > void process_incoming_migration(QEMUFile *f); > > void qemu_start_incoming_migration(const char *uri, Error **errp); > @@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *); > bool migration_has_failed(MigrationState *); > MigrationState *migrate_get_current(void); > > +void migrate_compress_threads_create(MigrationState *s); > +void 
migrate_compress_threads_join(MigrationState *s); > +void migrate_decompress_threads_create(int count); > +void migrate_decompress_threads_join(void); > uint64_t ram_bytes_remaining(void); > uint64_t ram_bytes_transferred(void); > uint64_t ram_bytes_total(void); > @@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason); > > bool migrate_rdma_pin_all(void); > bool migrate_zero_blocks(void); > - > +bool migrate_use_compress(void); > bool migrate_auto_converge(void); > > int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen, > @@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t > *dst, int dlen); > > int migrate_use_xbzrle(void); > int64_t migrate_xbzrle_cache_size(void); > +int migrate_compress_level(void); > +int migrate_compress_threads(void); > > int64_t xbzrle_cache_resize(int64_t new_size); > > diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h > index 401676b..431e6cc 100644 > --- a/include/migration/qemu-file.h > +++ b/include/migration/qemu-file.h > @@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer > *input); > int qemu_get_fd(QEMUFile *f); > int qemu_fclose(QEMUFile *f); > int64_t qemu_ftell(QEMUFile *f); > +uint64_t qemu_add_compress(QEMUFile *f, const uint8_t *p, int size); > void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size); > void qemu_put_byte(QEMUFile *f, int v); > /* > diff --git a/migration.c b/migration.c > index c49a05a..716de97 100644 > --- a/migration.c > +++ b/migration.c > @@ -46,6 +46,12 @@ enum { > /* Migration XBZRLE default cache size */ > #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) > > +/* Migration compress default thread count */ > +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 > +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2 > +/*0: means nocompress, 1: best speed, ... 
9: best compress ratio */ > +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 > + > static NotifierList migration_state_notifiers = > NOTIFIER_LIST_INITIALIZER(migration_state_notifiers); > > @@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void) > .bandwidth_limit = MAX_THROTTLE, > .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, > .mbps = -1, > + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, > + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, > }; > > return &current_migration; > @@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque) > error_report("load of migration failed: %s", strerror(-ret)); > exit(EXIT_FAILURE); > } > + incomming_migration_done = true; > qemu_announce_self(); > > /* Make sure all file formats flush their mutable metadata */ > @@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque) > } else { > runstate_set(RUN_STATE_PAUSED); > } > + migrate_decompress_threads_join(); > } > > +static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT; > void process_incoming_migration(QEMUFile *f) > { > + incomming_migration_done = false; > + migrate_decompress_threads_create(uncompress_thread_count); > Coroutine *co = qemu_coroutine_create(process_incoming_migration_co); > int fd = qemu_get_fd(f); > > @@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque) > qemu_thread_join(&s->thread); > qemu_mutex_lock_iothread(); > > + migrate_compress_threads_join(s); > qemu_fclose(s->file); > s->file = NULL; > } > @@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams > *params) > int64_t bandwidth_limit = s->bandwidth_limit; > bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; > int64_t xbzrle_cache_size = s->xbzrle_cache_size; > + int compress_level = s->compress_level; > + int compress_thread_count = s->compress_thread_count; > > memcpy(enabled_capabilities, s->enabled_capabilities, > sizeof(enabled_capabilities)); > @@ -383,6 +399,8 @@ static MigrationState 
*migrate_init(const MigrationParams > *params) > sizeof(enabled_capabilities)); > s->xbzrle_cache_size = xbzrle_cache_size; > > + s->compress_level = compress_level; > + s->compress_thread_count = compress_thread_count; > s->bandwidth_limit = bandwidth_limit; > s->state = MIG_STATE_SETUP; > trace_migrate_set_state(MIG_STATE_SETUP); > @@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp) > return migrate_xbzrle_cache_size(); > } > > +void qmp_migrate_set_compress_level(int64_t value, Error **errp) > +{ > + MigrationState *s = migrate_get_current(); > + > + if (value > 9 || value < 0) { > + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level", > + "is invalid, please input a integer between 0 and 9. "); > + return; > + } > + > + s->compress_level = value; > +} > + > +int64_t qmp_query_migrate_compress_level(Error **errp) > +{ > + return migrate_compress_level(); > +} > + > +void qmp_migrate_set_compress_threads(int64_t value, Error **errp) > +{ > + MigrationState *s = migrate_get_current(); > + > + if (value > 255 || value < 1) { > + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread > count", > + "is invalid, please input a integer between 1 and 255. "); > + return; > + } > + > + s->compress_thread_count = value; > +} > + > +void qmp_migrate_set_decompress_threads(int64_t value, Error **errp) > +{ > + > + if (value > 255 || value < 1) { > + error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread > count", > + "is invalid, please input a integer between 1 and 255. 
"); > + return; > + } > + > + uncompress_thread_count = value; > +} > + > +int64_t qmp_query_migrate_compress_threads(Error **errp) > +{ > + return migrate_compress_threads(); > +} > + > +int64_t qmp_query_migrate_decompress_threads(Error **errp) > +{ > + return uncompress_thread_count; > +} > + > void qmp_migrate_set_speed(int64_t value, Error **errp) > { > MigrationState *s; > @@ -555,6 +626,33 @@ bool migrate_zero_blocks(void) > return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; > } > > +bool migrate_use_compress(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS]; > +} > + > +int migrate_compress_level(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_level; > +} > + > +int migrate_compress_threads(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_thread_count; > +} > + > int migrate_use_xbzrle(void) > { > MigrationState *s; > @@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s) > > qemu_thread_create(&s->thread, "migration", migration_thread, s, > QEMU_THREAD_JOINABLE); > + migrate_compress_threads_create(s);
Don't always create the compress threads. It may be better to write: if (!migrate_use_xbzrle()) { migrate_compress_threads_create(s); } BTW, this patch is too big to review. Splitting it into several smaller patches would be welcome. > } > diff --git a/monitor.c b/monitor.c > index 905d8cf..365547e 100644 > --- a/monitor.c > +++ b/monitor.c > @@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = { > .mhandler.cmd = hmp_info_migrate_cache_size, > }, > { > + .name = "migrate_compress_level", > + .args_type = "", > + .params = "", > + .help = "show current migration compress level", > + .mhandler.cmd = hmp_info_migrate_compress_level, > + }, > + { > + .name = "migrate_compress_threads", > + .args_type = "", > + .params = "", > + .help = "show current migration compress thread count", > + .mhandler.cmd = hmp_info_migrate_compress_threads, > + }, > + { > + .name = "migrate_decompress_threads", > + .args_type = "", > + .params = "", > + .help = "show current migration decompress thread count", > + .mhandler.cmd = hmp_info_migrate_decompress_threads, > + }, > + { > .name = "balloon", > .args_type = "", > .params = "", > diff --git a/qapi-schema.json b/qapi-schema.json > index 24379ab..71a9e0f 100644 > --- a/qapi-schema.json > +++ b/qapi-schema.json > @@ -491,13 +491,17 @@ > # to enable the capability on the source VM. The feature is > disabled by > # default. (since 1.6) > # > +# @compress: Using the multiple compression threads to accelerate live > migration. > +# This feature can help to reduce the migration traffic, by sending > +# compressed pages. The feature is disabled by default. (since 2.3) > +# > # @auto-converge: If enabled, QEMU will automatically throttle down the guest > # to speed up convergence of RAM migration. 
(since 1.6) > # > # Since: 1.2 > ## > { 'enum': 'MigrationCapability', > - 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] } > + 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', > 'compress'] } > > ## > # @MigrationCapabilityStatus > @@ -1382,6 +1386,88 @@ > { 'command': 'query-migrate-cache-size', 'returns': 'int' } > > ## > +# @migrate-set-compress-level > +# > +# Set compress level > +# > +# @value: compress level int > +# > +# The compress level will be an integer between 0 and 9. > +# The compress level can be modified before and during ongoing migration > +# > +# Returns: nothing on success > +# > +# Since: 2.3 > +## > +{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} } > + > +## > +# @query-migrate-compress-level > +# > +# query compress level > +# > +# Returns: compress level int > +# > +# Since: 2.3 > +## > +{ 'command': 'query-migrate-compress-level', 'returns': 'int' } > + > +## > +# @migrate-set-compress-threads > +# > +# Set compress threads > +# > +# @value: compress threads int > +# > +# The compress thread count is an integer between 1 and 255. > +# The compress level can be modified only before migration > +# > +# Returns: nothing on success > +# > +# Since: 2.3 > +## > +{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} } > + > +## > +# @query-migrate-compress-threads > +# > +# query compress threads > +# > +# Returns: compress threads int > +# > +# Since: 2.3 > +## > +{ 'command': 'query-migrate-compress-threads', 'returns': 'int' } > + > +## > +## > +# @migrate-set-decompress-threads > +# > +# Set decompress threads > +# > +# @value: decompress threads int > +# > +# The decompress thread count is an integer between 1 and 255. 
> +# The decompress level can be modified only before migration > +# > +# Returns: nothing on success > +# > +# Since: 2.3 > +## > +{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} } > + > +## > +# @query-migrate-decompress-threads > +# > +# query decompress threads > +# > +# Returns: decompress threads int > +# > +# Since: 2.3 > +## > +{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' } > + > +## > # @ObjectPropertyInfo: > # > # @name: the name of the property > diff --git a/qmp-commands.hx b/qmp-commands.hx > index 1abd619..b60fdab 100644 > --- a/qmp-commands.hx > +++ b/qmp-commands.hx > @@ -705,7 +705,138 @@ Example: > <- { "return": 67108864 } > > EQMP > +{ > + .name = "migrate-set-compress-level", > + .args_type = "value:i", > + .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level, > + }, > + > +SQMP > +migrate-set-compress-level > +---------------------- > + > +Set compress level to be used by compress migration, the compress level is > an integer > +between 0 and 9 > + > +Arguments: > + > +- "value": compress level (json-int) > + > +Example: > + > +-> { "execute": "migrate-set-compress-level", "arguments": { "value": > 536870912 } } > +<- { "return": {} } > + > +EQMP > + { > + .name = "query-migrate-compress-level", > + .args_type = "", > + .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level, > + }, > + > +SQMP > +query-migrate-compress-level > +------------------------ > + > +Show compress level to be used by compress migration > + > +returns a json-object with the following information: > +- "size" : json-int > + > +Example: > + > +-> { "execute": "query-migrate-compress-level" } > +<- { "return": 67108864 } > + > +EQMP > +{ > + .name = "migrate-set-compress-threads", > + .args_type = "value:i", > + .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads, > + }, > + > +SQMP > +migrate-set-compress-threads > +---------------------- > + > +Set compress thread count to be used by 
compress migration, the compress > thread count is an integer > +between 1 and 255 > + > +Arguments: > + > +- "value": compress threads (json-int) > + > +Example: > + > +-> { "execute": "migrate-set-compress-threads", "arguments": { "value": > 536870912 } } > +<- { "return": {} } > + > +EQMP > + { > + .name = "query-migrate-compress-threads", > + .args_type = "", > + .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads, > + }, > + > +SQMP > +query-migrate-compress-threads > +------------------------ > + > +Show compress thread count to be used by compress migration > + > +returns a json-object with the following information: > +- "size" : json-int > + > +Example: > + > +-> { "execute": "query-migrate-compress-threads" } > +<- { "return": 67108864 } > + > +EQMP > +{ > + .name = "migrate-set-decompress-threads", > + .args_type = "value:i", > + .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads, > + }, > + > +SQMP > +migrate-set-decompress-threads > +---------------------- > + > +Set decompress thread count to be used by compress migration, the decompress > thread count is an integer > +between 1 and 255 > + > +Arguments: > + > +- "value": decompress threads (json-int) > + > +Example: > + > +-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": > 536870912 } } > +<- { "return": {} } > > +EQMP > + { > + .name = "query-migrate-decompress-threads", > + .args_type = "", > + .mhandler.cmd_new = > qmp_marshal_input_query_migrate_decompress_threads, > + }, > + > +SQMP > +query-migrate-decompress-threads > +------------------------ > + > +Show decompress thread count to be used by compress migration > + > +returns a json-object with the following information: > +- "size" : json-int > + > +Example: > + > +-> { "execute": "query-migrate-compress-threads" } > +<- { "return": 67108864 } > + > +EQMP > { > .name = "migrate_set_speed", > .args_type = "value:o",