Liang Li <liang.z...@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z...@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
>  static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
> +
> +/* one_byte_count is used to count the bytes that are added to
> + * bytes_transferred but not actually transferred; at the proper
> + * time, we should subtract one_byte_count from bytes_transferred
> + * to make bytes_transferred accurate.
> + */
> +static int one_byte_count;

With the changes proposed previously to ram_save_compressed_page() this
shouldn't be needed.  It can return 0 now.

> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> -
> -        /* To be done */
> +    CompressParam *param = opaque;
>
> +    while (!quit_comp_thread) {
> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check quit_comp_thread in case
> +         * terminate_compression_threads() is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while (!quit_comp_thread); re-checking it here makes
> +         * sure the compression thread terminates as expected.
> +         */
> +        while (!param->busy && !quit_comp_thread) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>
>      return NULL;
> @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
>
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>
>  void migrate_compress_threads_join(MigrationState *s)
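As an aside, the wait loop above is the classic "test the predicate
again with the lock held" pattern.  Stripped of the QEMU wrappers it
boils down to something like this (plain pthreads, made-up names,
untested):

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static bool busy, quit;

    static void *worker(void *opaque)
    {
        for (;;) {
            pthread_mutex_lock(&lock);
            /* 'quit' may have been set (and the condition signalled)
             * between the last unlocked check and taking the lock, so
             * it has to be tested again here, or the thread would sleep
             * forever on a wakeup that already happened. */
            while (!busy && !quit) {
                pthread_cond_wait(&cond, &lock); /* unlocks while asleep */
            }
            pthread_mutex_unlock(&lock);
            if (quit) {
                break;
            }
            /* ... compress one page outside the lock ... */
        }
        return NULL;
    }

It also shows why terminate_compression_threads() takes
comp_param[idx].mutex around the signal: holding the lock means the
signal cannot fire in the window between the worker's predicate check
and its qemu_cond_wait().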
> @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
>      return bytes_sent;
>  }
>
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }

If we arrive here because quit_comp_thread == true, shouldn't we skip
the qemu_put_qemu_file()?
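I.e., something like this in front of the flush of each thread's buffer
(untested, just to illustrate what I mean):

    if (quit_comp_thread) {
        break;   /* terminating: don't push this thread's buffer
                  * into the migration stream */
    }
    len = qemu_put_qemu_file(f, comp_param[idx].file);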
> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }

[remove one_byte stuff here]

> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);

[remove stuff here]

> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {

Change this to:

    if (bytes_sent >= 0) {

> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = -1;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +    int cont;
>
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_sent == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in the last block should have been
> +         * sent out; keeping this order is important, because the 'cont'
> +         * flag is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make
> +                 * sure the first page is sent out before other pages
> +                 */
> +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> +                if (bytes_sent > 0) {

This test is not needed: assert(bytes_sent > 0).  How could it be zero
or negative here?  So we have to always call qemu_put_qemu_file(), no?

> +                    qemu_put_qemu_file(f, comp_param[0].file);
> +                }
> +            }
> +        } else {
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                bytes_sent = compress_page_with_multi_thread(f, block,
> +                                                             offset);
> +            }
> +        }
> +    }
>
>      return bytes_sent;
>  }
> @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
>      return bytes_sent;
>  }
>
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>
>      /*
> @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();

I think this would make the code work, but not the locking.  You are
using here:

    quit_comp_thread: global, and it is not completely clear what
                      protects it
    comp_done_lock:   global
    comp_done_cond:   global
    param[i].busy:    I would suggest renaming it to pending_work
    param[i].mutex:
    param[i].cond:    thread is waiting for work

Issues:

param->busy is protected by param->mutex in do_data_compress() and
start_compression(), but in flush_compressed_data() and
compress_page_with_multi_thread() it is protected by comp_done_lock.

At this point, I would suggest just dropping param[i].mutex and using
comp_done_lock everywhere.  We can make the locking more granular later
if needed, but first let's get it correct.

The code basically does (forgetting termination and locking):

each compression_thread():

    while(1) {
        while(!work_to_do)
            wait_for_work
        do_work
    }

And the main thread does:

    while(1) {
        foreach compression_thread {
            if thread free {
                put it to work
                break;
            }
            wait_for_thread_to_finish
        }
    }

Notice how we walk over all the threads each time that we need to do
anything.  Perhaps the code would be simpler if we put the data that
needs to be compressed in a global variable and changed this to:

compression_thread:

    while(1) {
        while(!work_to_do)
            wait_for_work
        pick work from global variable
        wakeup main thread
        do_work
    }

main thread:

    put work on global variable
    while(nobody_picked_the_work) {
        signal all threads
        wait for a compression thread to take the work
    }

Why?  Because then we only have one global mutex and two condition
variables, with clear semantics:

- the lock protects the two condition variables and the global variable
  holding the work
- one condition where the threads wait for work
- one condition where the main thread waits for a worker to be ready

As we would need to take the lock every single time to put the work in
the global variable, to wait, or to pick up the work, we can drop all
the:

    if (!foo) {
        mutex_lock
        if (!foo)      /* this time with the lock held */
            ....
    }

Sorry for the very long mail.  If it makes you feel better, this is the
second time that I have written it, because in the 1st version my
locking proposal didn't work correctly.

What do you think?

Later, Juan.
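PS: In case it helps, here is roughly the shape I have in mind, written
against plain pthreads instead of the QEMU wrappers, with the work item
reduced to a bare pointer and all the names made up.  A sketch to show
the locking, not a drop-in patch:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t work_avail = PTHREAD_COND_INITIALIZER; /* workers wait */
    static pthread_cond_t work_taken = PTHREAD_COND_INITIALIZER; /* main waits */
    static void *pending_work;  /* the global work slot, protected by 'lock' */
    static bool quit;

    static void *compression_thread(void *opaque)
    {
        void *work;

        pthread_mutex_lock(&lock);
        while (!quit) {
            while (!pending_work && !quit) {
                pthread_cond_wait(&work_avail, &lock);
            }
            if (quit) {
                break;
            }
            work = pending_work;              /* pick work from the slot */
            pending_work = NULL;
            pthread_cond_signal(&work_taken); /* wake up the main thread */
            pthread_mutex_unlock(&lock);
            /* ... compress 'work' outside the lock ... */
            pthread_mutex_lock(&lock);
        }
        pthread_mutex_unlock(&lock);
        return NULL;
    }

    static void put_work(void *work)          /* called by the main thread */
    {
        pthread_mutex_lock(&lock);
        pending_work = work;
        pthread_cond_signal(&work_avail);     /* one item, one wakeup is enough */
        while (pending_work && !quit) {       /* until a worker picks it up */
            pthread_cond_wait(&work_taken, &lock);
        }
        pthread_mutex_unlock(&lock);
    }

    static void terminate_workers(void)
    {
        pthread_mutex_lock(&lock);
        quit = true;
        pthread_cond_broadcast(&work_avail);  /* release sleeping workers */
        pthread_cond_broadcast(&work_taken);  /* and a possibly waiting main */
        pthread_mutex_unlock(&lock);
    }

Everything happens under the one lock, so none of the unlocked
pre-checks are needed, and 'quit' is unambiguously protected too.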