* Liang Li (liang.z...@intel.com) wrote: > Add the code to create and destroy the multiple threads those will > be used to do data compression. Left some functions empty to keep > clearness, and the code will be added later. > > Signed-off-by: Liang Li <liang.z...@intel.com> > Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>
Reviewed-by: Dr. David Alan Gilbert <dgilb...@redhat.com> > --- > arch_init.c | 79 > ++++++++++++++++++++++++++++++++++++++++++- > include/migration/migration.h | 9 +++++ > migration/migration.c | 37 ++++++++++++++++++++ > 3 files changed, 124 insertions(+), 1 deletion(-) > > diff --git a/arch_init.c b/arch_init.c > index 89c8fa4..1831f1a 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -332,6 +332,68 @@ static uint64_t migration_dirty_pages; > static uint32_t last_version; > static bool ram_bulk_stage; > > +struct CompressParam { > + /* To be done */ > +}; > +typedef struct CompressParam CompressParam; > + > +static CompressParam *comp_param; > +static bool quit_thread; > + > +static void *do_data_compress(void *opaque) > +{ > + while (!quit_thread) { > + > + /* To be done */ > + > + } > + > + return NULL; > +} > + > +static inline void terminate_compression_threads(void) > +{ > + quit_thread = true; > + > + /* To be done */ > +} > + > +void migrate_compress_threads_join(MigrationState *s) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + terminate_compression_threads(); > + thread_count = migrate_compress_threads(); > + for (i = 0; i < thread_count; i++) { > + qemu_thread_join(s->compress_thread + i); > + } > + g_free(s->compress_thread); > + g_free(comp_param); > + s->compress_thread = NULL; > + comp_param = NULL; > +} > + > +void migrate_compress_threads_create(MigrationState *s) > +{ > + int i, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + quit_thread = false; > + thread_count = migrate_compress_threads(); > + s->compress_thread = g_new0(QemuThread, thread_count); > + comp_param = g_new0(CompressParam, thread_count); > + for (i = 0; i < thread_count; i++) { > + qemu_thread_create(s->compress_thread + i, "compress", > + do_data_compress, comp_param + i, > + QEMU_THREAD_JOINABLE); > + } > +} > + > /* Update the xbzrle cache to reflect a page that's been sent as all 0. 
> * The important thing is that a stale (not-yet-0'd) page be replaced > * by the new data. > @@ -645,6 +707,16 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, > ram_addr_t offset, > return bytes_sent; > } > > +static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block, > + ram_addr_t offset, bool last_stage) > +{ > + int bytes_sent = 0; > + > + /* To be done*/ > + > + return bytes_sent; > +} > + > /* > * ram_find_and_save_block: Finds a page to send and sends it to f > * > @@ -679,7 +751,12 @@ static int ram_find_and_save_block(QEMUFile *f, bool > last_stage) > ram_bulk_stage = false; > } > } else { > - bytes_sent = ram_save_page(f, block, offset, last_stage); > + if (migrate_use_compression()) { > + bytes_sent = ram_save_compressed_page(f, block, offset, > + last_stage); > + } else { > + bytes_sent = ram_save_page(f, block, offset, last_stage); > + } > > /* if page is unmodified, continue to the next */ > if (bytes_sent > 0) { > diff --git a/include/migration/migration.h b/include/migration/migration.h > index 3cb5ba8..daf6c81 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -49,6 +49,9 @@ struct MigrationState > QemuThread thread; > QEMUBH *cleanup_bh; > QEMUFile *file; > + QemuThread *compress_thread; > + int compress_thread_count; > + int compress_level; > > int state; > MigrationParams params; > @@ -107,6 +110,8 @@ bool migration_has_finished(MigrationState *); > bool migration_has_failed(MigrationState *); > MigrationState *migrate_get_current(void); > > +void migrate_compress_threads_create(MigrationState *s); > +void migrate_compress_threads_join(MigrationState *s); > uint64_t ram_bytes_remaining(void); > uint64_t ram_bytes_transferred(void); > uint64_t ram_bytes_total(void); > @@ -156,6 +161,10 @@ int64_t migrate_xbzrle_cache_size(void); > > int64_t xbzrle_cache_resize(int64_t new_size); > > +bool migrate_use_compression(void); > +int migrate_compress_level(void); > +int 
migrate_compress_threads(void); > + > void ram_control_before_iterate(QEMUFile *f, uint64_t flags); > void ram_control_after_iterate(QEMUFile *f, uint64_t flags); > void ram_control_load_hook(QEMUFile *f, uint64_t flags); > diff --git a/migration/migration.c b/migration/migration.c > index b3adbc6..309443e 100644 > --- a/migration/migration.c > +++ b/migration/migration.c > @@ -43,6 +43,11 @@ enum { > #define BUFFER_DELAY 100 > #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY) > > +/* Default compression thread count */ > +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 > +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */ > +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 > + > /* Migration XBZRLE default cache size */ > #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) > > @@ -60,6 +65,8 @@ MigrationState *migrate_get_current(void) > .bandwidth_limit = MAX_THROTTLE, > .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, > .mbps = -1, > + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, > + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, > }; > > return &current_migration; > @@ -302,6 +309,7 @@ static void migrate_fd_cleanup(void *opaque) > qemu_thread_join(&s->thread); > qemu_mutex_lock_iothread(); > > + migrate_compress_threads_join(s); > qemu_fclose(s->file); > s->file = NULL; > } > @@ -385,6 +393,8 @@ static MigrationState *migrate_init(const MigrationParams > *params) > int64_t bandwidth_limit = s->bandwidth_limit; > bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; > int64_t xbzrle_cache_size = s->xbzrle_cache_size; > + int compress_level = s->compress_level; > + int compress_thread_count = s->compress_thread_count; > > memcpy(enabled_capabilities, s->enabled_capabilities, > sizeof(enabled_capabilities)); > @@ -395,6 +405,8 @@ static MigrationState *migrate_init(const MigrationParams > *params) > sizeof(enabled_capabilities)); > s->xbzrle_cache_size = xbzrle_cache_size; > > + s->compress_level = compress_level; > + 
s->compress_thread_count = compress_thread_count; > s->bandwidth_limit = bandwidth_limit; > s->state = MIG_STATE_SETUP; > trace_migrate_set_state(MIG_STATE_SETUP); > @@ -567,6 +579,30 @@ bool migrate_zero_blocks(void) > return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; > } > > +bool migrate_use_compression(void) > +{ > + /* Disable compression before the patch series are applied */ > + return false; > +} > + > +int migrate_compress_level(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_level; > +} > + > +int migrate_compress_threads(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->compress_thread_count; > +} > + > int migrate_use_xbzrle(void) > { > MigrationState *s; > @@ -707,6 +743,7 @@ void migrate_fd_connect(MigrationState *s) > /* Notify before starting migration thread */ > notifier_list_notify(&migration_state_notifiers, s); > > + migrate_compress_threads_create(s); > qemu_thread_create(&s->thread, "migration", migration_thread, s, > QEMU_THREAD_JOINABLE); > } > -- > 1.9.1 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK