* Liang Li (liang.z...@intel.com) wrote: > At this point, multiple thread compression can't co-work with xbzrle. > > Signed-off-by: Liang Li <liang.z...@intel.com> > Signed-off-by: Yang Zhang <yang.z.zh...@intel.com> > --- > arch_init.c | 164 > +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- > 1 file changed, 157 insertions(+), 7 deletions(-) > > diff --git a/arch_init.c b/arch_init.c > index 0a575ed..4109ad7 100644 > --- a/arch_init.c > +++ b/arch_init.c > @@ -369,23 +369,43 @@ static QemuMutex *mutex; > static QemuCond *cond; > static QEMUFileOps *empty_ops; > static bool quit_thread; > +static int one_byte_count; > static decompress_param *decomp_param; > static QemuThread *decompress_threads; > > +static int do_compress_ram_page(compress_param *param); > + > static void *do_data_compress(void *opaque) > { > + compress_param *param = opaque; > while (!quit_thread) { > - > - /* To be done */ > - > + qemu_mutex_lock(¶m->mutex); > + while (param->state != START) { > + qemu_cond_wait(¶m->cond, ¶m->mutex); > + if (quit_thread) { > + break; > + } > + do_compress_ram_page(param); > + qemu_mutex_lock(mutex); > + param->state = DONE; > + qemu_cond_signal(cond); > + qemu_mutex_unlock(mutex); > + } > + qemu_mutex_unlock(¶m->mutex); > } > + > return NULL; > } > > static inline void terminate_compression_threads(void) > { > + int idx, thread_count; > + > + thread_count = migrate_compress_threads(); > quit_thread = true; > - /* To be done */ > + for (idx = 0; idx < thread_count; idx++) { > + qemu_cond_signal(&comp_param[idx].cond); > + } > } > > void migrate_compress_threads_join(MigrationState *s) > @@ -770,13 +790,142 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, > ram_addr_t offset, > return bytes_sent; > } > > +static int do_compress_ram_page(compress_param *param) > +{ > + int bytes_sent; > + int blen = COMPRESS_BUF_SIZE; > + int cont; > + uint8_t *p; > + RAMBlock *block = param->block; > + ram_addr_t offset = param->offset; > + > + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0; > + p = memory_region_get_ram_ptr(block->mr) + offset; > + > + bytes_sent = save_block_hdr(param->file, block, > + offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE); > + blen = migrate_qemu_add_compression_data(param->file, p, > + TARGET_PAGE_SIZE, migrate_compress_level()); > + bytes_sent += blen; > + atomic_inc(&acct_info.norm_pages); > + > + return bytes_sent; > +} > + > +static inline void start_compression(compress_param *param) > +{ > + qemu_mutex_lock(¶m->mutex); > + param->state = START; > + qemu_cond_signal(¶m->cond); > + qemu_mutex_unlock(¶m->mutex); > +} > + > + > +static uint64_t bytes_transferred; > + > +static void flush_compressed_data(QEMUFile *f) > +{ > + int idx, len, thread_count; > + > + if (!migrate_use_compression()) { > + return; > + } > + thread_count = migrate_compress_threads(); > + for (idx = 0; idx < thread_count; idx++) { > + if (comp_param[idx].state != DONE) { > + qemu_mutex_lock(mutex); > + while (comp_param[idx].state != DONE) { > + qemu_cond_wait(cond, mutex); > + } > + qemu_mutex_unlock(mutex); > + } > + len = migrate_qemu_flush(f, comp_param[idx].file); > + bytes_transferred += len; > + } > + if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) { > + bytes_transferred -= one_byte_count; > + one_byte_count = 0; > + } > +} > + > +static inline void set_compress_params(compress_param *param, > + RAMBlock *block, ram_addr_t offset) > +{ > + param->block = block; > + param->offset = offset; > +} > + > + > +static int compress_page_with_multi_thread(QEMUFile *f, > + RAMBlock *block, ram_addr_t offset) > +{ > + int idx, thread_count, bytes_sent = 0; > + > + thread_count = migrate_compress_threads(); > + qemu_mutex_lock(mutex); > + while (true) { > + for (idx = 0; idx < thread_count; idx++) { > + if (comp_param[idx].state == DONE) { > + bytes_sent = migrate_qemu_flush(f, comp_param[idx].file); > + set_compress_params(&comp_param[idx], > + block, offset); > + start_compression(&comp_param[idx]); > + if (bytes_sent == 0) { > + /* set bytes_sent to 1 in this case to prevent migration > + * from terminating, this 1 byte whill be added to > + * bytes_transferred later, minus 1 to keep the > + * bytes_transferred accurate */ > + bytes_sent = 1; > + if (bytes_transferred <= 0) { > + one_byte_count++; > + } else { > + bytes_transferred -= 1; > + } > + } > + break; > + } > + } > + if (bytes_sent > 0) { > + break; > + } else { > + qemu_cond_wait(cond, mutex); > + } > + } > + qemu_mutex_unlock(mutex); > + return bytes_sent; > +} > + > static int ram_save_compressed_page(QEMUFile *f, RAMBlock* block, > ram_addr_t offset, bool last_stage) > { > int bytes_sent = 0; > > - /* To be done*/ > - > + /* When starting the process of a new block, the first page of > + * the block should be sent out before other pages in the same > + * block, and all the pages in last block should have been sent > + * out, keeping this order is important.
Why? Is this just because of the 'cont' flag used to avoid sending the block names again? Dave > + */ > + if (block != last_sent_block) { > + flush_compressed_data(f); > + bytes_sent = save_zero_and_xbzrle_page(f, block, offset, > + last_stage, NULL); > + if (bytes_sent == -1) { > + set_compress_params(&comp_param[0], block, offset); > + /* Use the qemu thread to compress the data to make sure the > + * first page is sent out before other pages > + */ > + bytes_sent = do_compress_ram_page(&comp_param[0]); > + if (bytes_sent > 0) { > + migrate_qemu_flush(f, comp_param[0].file); > + } > + } > + } else { > + bytes_sent = save_zero_and_xbzrle_page(f, block, offset, > + last_stage, NULL); > + if (bytes_sent == -1) { > + bytes_sent = compress_page_with_multi_thread(f, block, offset); > + } > + } > return bytes_sent; > } > > @@ -834,7 +983,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool > last_stage) > return bytes_sent; > } > > -static uint64_t bytes_transferred; > > void acct_update_position(QEMUFile *f, size_t size, bool zero) > { > @@ -1043,6 +1191,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) > i++; > } > > + flush_compressed_data(f); > qemu_mutex_unlock_ramlist(); > > /* > @@ -1089,6 +1238,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) > bytes_transferred += bytes_sent; > } > > + flush_compressed_data(f); > ram_control_after_iterate(f, RAM_CONTROL_FINISH); > migration_end(); > > -- > 1.8.3.1 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK