* Liang Li (liang.z...@intel.com) wrote: > Add the code to create and destroy the multiple threads those will be > used to do data decompression. Left some functions empty just to keep > clearness, and the code will be added later.
Reviewed-by: Dr. David Alan Gilbert <dgilb...@redhat.com> > > Signed-off-by: Liang Li <liang.z...@intel.com> > Signed-off-by: Yang Zhang <yang.z.zh...@intel.com> > --- > arch_init.c | 75 > +++++++++++++++++++++++++++++++++++++++++++ > include/migration/migration.h | 4 +++ > migration/migration.c | 16 +++++++++ > 3 files changed, 95 insertions(+) > > diff --git a/arch_init.c b/arch_init.c > index 1831f1a..ed34eb3 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -24,6 +24,7 @@ > #include <stdint.h> > #include <stdarg.h> > #include <stdlib.h> > +#include <zlib.h> > #ifndef _WIN32 > #include <sys/types.h> > #include <sys/mman.h> > @@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count; > #define RAM_SAVE_FLAG_CONTINUE 0x20 > #define RAM_SAVE_FLAG_XBZRLE 0x40 > /* 0x80 is reserved in migration.h start with 0x100 next */ > +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 > > static struct defconfig_file { > const char *filename; > @@ -337,8 +339,16 @@ struct CompressParam { > }; > typedef struct CompressParam CompressParam; > > +struct DecompressParam { > + /* To be done */ > +}; > +typedef struct DecompressParam DecompressParam; > + > static CompressParam *comp_param; > static bool quit_thread; > +static DecompressParam *decomp_param; > +static QemuThread *decompress_threads; > +static uint8_t *compressed_data_buf; > > static void *do_data_compress(void *opaque) > { > @@ -1128,10 +1138,58 @@ void ram_handle_compressed(void *host, uint8_t ch, > uint64_t size) > } > } > > +static void *do_data_decompress(void *opaque) > +{ > + while (!quit_thread) { > + /* To be done */ > + } > + > + return NULL; > +} > + > +void migrate_decompress_threads_create(int count) > +{ > + int i; > + > + decompress_threads = g_new0(QemuThread, count); > + decomp_param = g_new0(DecompressParam, count); > + compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); > + quit_thread = false; > + for (i = 0; i < count; i++) { > + qemu_thread_create(decompress_threads + i, "decompress", > + do_data_decompress, decomp_param + i, > + QEMU_THREAD_JOINABLE); > + } > +} > + > +void migrate_decompress_threads_join(void) > +{ > + int i, thread_count; > + > + quit_thread = true; > + thread_count = migrate_decompress_threads(); > + for (i = 0; i < thread_count; i++) { > + qemu_thread_join(decompress_threads + i); > + } > + g_free(decompress_threads); > + g_free(decomp_param); > + g_free(compressed_data_buf); > + decompress_threads = NULL; > + decomp_param = NULL; > + compressed_data_buf = NULL; > +} > + > +static void decompress_data_with_multi_threads(uint8_t *compbuf, > + void *host, int len) > +{ > + /* To be done */ > +} > + > static int ram_load(QEMUFile *f, void *opaque, int version_id) > { > int flags = 0, ret = 0; > static uint64_t seq_iter; > + int len = 0; > > seq_iter++; > > @@ -1208,6 +1266,23 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > > qemu_get_buffer(f, host, TARGET_PAGE_SIZE); > break; > + case RAM_SAVE_FLAG_COMPRESS_PAGE: > + host = host_from_stream_offset(f, addr, flags); > + if (!host) { > + error_report("Invalid RAM offset " RAM_ADDR_FMT, addr); > + ret = -EINVAL; > + break; > + } > + > + len = qemu_get_be32(f); > + if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { > + error_report("Invalid compressed data length: %d", len); > + ret = -EINVAL; > + break; > + } > + qemu_get_buffer(f, compressed_data_buf, len); > + decompress_data_with_multi_threads(compressed_data_buf, host, > len); > + break; > case RAM_SAVE_FLAG_XBZRLE: > host = host_from_stream_offset(f, addr, flags); > if (!host) { > diff --git a/include/migration/migration.h b/include/migration/migration.h > index daf6c81..0c4f21c 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -51,6 +51,7 @@ struct MigrationState > QEMUFile *file; > QemuThread *compress_thread; > int compress_thread_count; > + int decompress_thread_count; > int compress_level; > > int state; > @@ -112,6 +113,8 @@ MigrationState *migrate_get_current(void); > > void migrate_compress_threads_create(MigrationState *s); > void migrate_compress_threads_join(MigrationState *s); > +void migrate_decompress_threads_create(int count); > +void migrate_decompress_threads_join(void); > uint64_t ram_bytes_remaining(void); > uint64_t ram_bytes_transferred(void); > uint64_t ram_bytes_total(void); > @@ -164,6 +167,7 @@ int64_t xbzrle_cache_resize(int64_t new_size); > bool migrate_use_compression(void); > int migrate_compress_level(void); > int migrate_compress_threads(void); > +int migrate_decompress_threads(void); > > void ram_control_before_iterate(QEMUFile *f, uint64_t flags); > void ram_control_after_iterate(QEMUFile *f, uint64_t flags); > diff --git a/migration/migration.c b/migration/migration.c > index 309443e..a6f6e02 100644 > --- a/migration/migration.c > +++ b/migration/migration.c > @@ -45,6 +45,7 @@ enum { > > /* Default compression thread count */ > #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 > +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2 > /*0: means nocompress, 1: best speed, ... 9: best compress ratio */ > #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 > > @@ -66,6 +67,7 @@ MigrationState *migrate_get_current(void) > .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, > .mbps = -1, > .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, > + .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT, > .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, > }; > > @@ -123,12 +125,15 @@ static void process_incoming_migration_co(void *opaque) > } else { > runstate_set(RUN_STATE_PAUSED); > } > + migrate_decompress_threads_join(); > } > > void process_incoming_migration(QEMUFile *f) > { > Coroutine *co = qemu_coroutine_create(process_incoming_migration_co); > int fd = qemu_get_fd(f); > + int thread_count = migrate_decompress_threads(); > + migrate_decompress_threads_create(thread_count); > > assert(fd != -1); > qemu_set_nonblock(fd); > @@ -395,6 +400,7 @@ static MigrationState *migrate_init(const MigrationParams > *params) > int64_t xbzrle_cache_size = s->xbzrle_cache_size; > int compress_level = s->compress_level; > int compress_thread_count = s->compress_thread_count; > + int decompress_thread_count = s->decompress_thread_count; > > memcpy(enabled_capabilities, s->enabled_capabilities, > sizeof(enabled_capabilities)); > @@ -407,6 +413,7 @@ static MigrationState *migrate_init(const MigrationParams > *params) > > s->compress_level = compress_level; > s->compress_thread_count = compress_thread_count; > + s->decompress_thread_count = decompress_thread_count; > s->bandwidth_limit = bandwidth_limit; > s->state = MIG_STATE_SETUP; > trace_migrate_set_state(MIG_STATE_SETUP); > @@ -603,6 +610,15 @@ int migrate_compress_threads(void) > return s->compress_thread_count; > } > > +int migrate_decompress_threads(void) > +{ > + MigrationState *s; > + > + s = migrate_get_current(); > + > + return s->decompress_thread_count; > +} > + > int migrate_use_xbzrle(void) > { > MigrationState *s; > -- > 1.9.1 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK