* Liang Li (liang.z...@intel.com) wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
> 
> Signed-off-by: Liang Li <liang.z...@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>
> ---
>  arch_init.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 159 insertions(+), 8 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index eae082b..b8bdb16 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -364,16 +364,31 @@ static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
>  static bool quit_thread;
> +static int one_byte_count;

Please add a comment here about what one_byte_count is; it's not obvious,
but I can't think of a better name.
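Even something rough would help; here's my attempt, based only on reading
flush_compressed_data() and compress_page_with_multi_thread() below, so
please fix it up if I've misread the intent:

    /* Number of pages that were accounted for as a single byte in
     * bytes_transferred because the compressed data was still sitting in
     * the per-thread buffer when qemu_put_qemu_file() returned 0; these
     * extra bytes are subtracted again in flush_compressed_data() once
     * the real data has been counted, to keep the accounting accurate.
     */
    static int one_byte_count;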
>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
>  static uint8_t *compressed_data_buf;
> 
> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_thread) {
> -
> -        /* To be done */
> +    CompressParam *param = opaque;
> 
> +    while (!quit_thread) {

This is something I missed on 02/ - can you rename 'quit_thread' to
comp_quit_thread or something, so it's obvious which thread.

> +        qemu_mutex_lock(&param->mutex);
> +        while (!param->busy) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +            if (quit_thread) {
> +                break;
> +            }
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        do_compress_ram_page(param);
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);

This is interestingly different from your previous version; param->mutex
used to be held all of the time except during the cond_wait itself.

I'm also worried about the quit_thread behaviour; is there any guarantee
that 'terminate_compression_threads' is called while this code is in the
param->cond cond_wait?  If terminate_compression_threads was called while
the thread was busy, then the cond_signal on param->cond would be too early.
I'm thinking perhaps you need to check quit_thread before the cond_wait
as well?
(It's mostly error cases and migrate_cancel I'm worried about here).
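Roughly the shape I have in mind is below - just an untested sketch, still
using the current 'quit_thread' name; the only point is that the flag gets
looked at before the thread commits to sleeping in the cond_wait:

    while (!quit_thread) {
        qemu_mutex_lock(&param->mutex);
        /* don't go to sleep if a quit request arrived while we were busy
         * compressing the previous page */
        while (!param->busy && !quit_thread) {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
        qemu_mutex_unlock(&param->mutex);
        if (quit_thread) {
            break;
        }
        do_compress_ram_page(param);
        ...
    }

Since terminate_compression_threads() doesn't take param->mutex, I don't
think this closes the window completely, but at least a quit that has
already been flagged can't be missed.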
>      }
> 
>      return NULL;
> @@ -381,9 +396,13 @@ static void *do_data_compress(void *opaque)
> 
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_thread = true;
> +    int idx, thread_count;
> 
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_cond_signal(&comp_param[idx].cond);
> +    }
>  }
> 
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -764,12 +783,144 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
> 
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }
> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }
> +    if ((one_byte_count > 0) && (bytes_transferred > one_byte_count)) {
> +        bytes_transferred -= one_byte_count;
> +        one_byte_count = 0;
> +    }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);
> +                if (bytes_sent == 0) {
> +                    /* set bytes_sent to 1 in this case to prevent migration
> +                     * from terminating, this 1 byte whill be added to
> +                     * bytes_transferred later, minus 1 to keep the
> +                     * bytes_transferred accurate */
> +                    bytes_sent = 1;
> +                    if (bytes_transferred <= 0) {
> +                        one_byte_count++;
> +                    } else {
> +                        bytes_transferred -= 1;
> +                    }
> +                }
> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {
> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = 0;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> 
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    /* When starting the process of a new block, the first page of
> +     * the block should be sent out before other pages in the same
> +     * block, and all the pages in last block should have been sent
> +     * out, keeping this order is important, because the 'cont' flag
> +     * is used to avoid resending the block name.
> +     */
> +    if (block != last_sent_block) {
> +        flush_compressed_data(f);
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            set_compress_params(&comp_param[0], block, offset);
> +            /* Use the qemu thread to compress the data to make sure the
> +             * first page is sent out before other pages
> +             */
> +            bytes_sent = do_compress_ram_page(&comp_param[0]);
> +            if (bytes_sent > 0) {
> +                qemu_put_qemu_file(f, comp_param[0].file);
> +            }
> +        }
> +    } else {
> +        bytes_sent = save_zero_and_xbzrle_page(f, &p, block, offset,
> +                                               last_stage, NULL);
> +        if (bytes_sent == -1) {
> +            bytes_sent = compress_page_with_multi_thread(f, block, offset);
> +        }
> +    }
> 
>      return bytes_sent;
>  }
> @@ -828,8 +979,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
> 
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1037,6 +1186,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
> 
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
> 
>      /*
> @@ -1083,6 +1233,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
> 
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();
> 
> -- 
> 1.9.1
> 
-- 
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK