> -----Original Message-----
> From: Juan Quintela [mailto:quint...@redhat.com]
> Sent: Wednesday, February 11, 2015 7:45 PM
> To: Li, Liang Z
> Cc: qemu-devel@nongnu.org; ebl...@redhat.com; amit.s...@redhat.com;
> lcapitul...@redhat.com; arm...@redhat.com; dgilb...@redhat.com; Zhang,
> Yang Z
> Subject: Re: [v5 08/12] migration: Add the core code of multi-thread
> compression
>
> Liang Li <liang.z...@intel.com> wrote:
> > Implement the core logic of the multiple thread compression. At this
> > point, multiple thread compression can't co-work with xbzrle yet.
> >
> > Signed-off-by: Liang Li <liang.z...@intel.com>
> > Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>
> >
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
> >  static QemuCond *comp_done_cond;
> >  /* The empty QEMUFileOps will be used by file in CompressParam */
> >  static const QEMUFileOps empty_ops = { };
> > +
> > +/* one_byte_count is used to count the bytes that are added to
> > + * bytes_transferred but not actually transferred; at the proper
> > + * time, we should subtract one_byte_count from bytes_transferred
> > + * to make bytes_transferred accurate.
> > + */
> > +static int one_byte_count;
>
> With the changes proposed previously to ram_save_compressed_page()
> this shouldn't be needed. It can return 0 now.
>
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> >  static void *do_data_compress(void *opaque)
> >  {
> > -    while (!quit_comp_thread) {
> > -
> > -        /* To be done */
> > +    CompressParam *param = opaque;
> >
> > +    while (!quit_comp_thread) {
> > +        qemu_mutex_lock(&param->mutex);
> > +        /* Re-check quit_comp_thread in case
> > +         * terminate_compression_threads is called just before
> > +         * qemu_mutex_lock(&param->mutex) and after
> > +         * while (!quit_comp_thread); re-checking it here makes
> > +         * sure the compression thread terminates as expected.
> > +         */
> > +        while (!param->busy && !quit_comp_thread) {
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        qemu_mutex_unlock(&param->mutex);
> > +        if (!quit_comp_thread) {
> > +            do_compress_ram_page(param);
> > +        }
> > +        qemu_mutex_lock(comp_done_lock);
> > +        param->busy = false;
> > +        qemu_cond_signal(comp_done_cond);
> > +        qemu_mutex_unlock(comp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
> >
> >  static inline void terminate_compression_threads(void)
> >  {
> > -    quit_comp_thread = true;
> > +    int idx, thread_count;
> >
> > -    /* To be done */
> > +    thread_count = migrate_compress_threads();
> > +    quit_comp_thread = true;
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> > +        qemu_cond_signal(&comp_param[idx].cond);
> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
> > +    }
> >  }
> >
> >  void migrate_compress_threads_join(MigrationState *s)
> > @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
> >      return bytes_sent;
> >  }
> >
> > +static int do_compress_ram_page(CompressParam *param)
> > +{
> > +    int bytes_sent, cont;
> > +    int blen;
> > +    uint8_t *p;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    p = memory_region_get_ram_ptr(block->mr) + offset;
> > +
> > +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> > +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> > +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> > +                                     migrate_compress_level());
> > +    bytes_sent += blen;
> > +    atomic_inc(&acct_info.norm_pages);
> > +
> > +    return bytes_sent;
> > +}
> > +
> > +static inline void start_compression(CompressParam *param)
> > +{
> > +    qemu_mutex_lock(&param->mutex);
> > +    param->busy = true;
> > +    qemu_cond_signal(&param->cond);
> > +    qemu_mutex_unlock(&param->mutex);
> > +}
> > +
> > +
> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f)
> > +{
> > +    int idx, len, thread_count;
> > +
> > +    if (!migrate_use_compression()) {
> > +        return;
> > +    }
> > +    thread_count = migrate_compress_threads();
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        if (comp_param[idx].busy) {
> > +            qemu_mutex_lock(comp_done_lock);
> > +            while (comp_param[idx].busy && !quit_comp_thread) {
> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +            }
> > +            qemu_mutex_unlock(comp_done_lock);
> > +        }
>
> If we arrive here because quit_comp_thread == true, shouldn't we skip
> the qemu_put_qemu_file()?
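>
> I.e. something like this (an untested sketch):
>
>     if (!quit_comp_thread) {
>         /* only flush output that a worker actually finished producing */
>         len = qemu_put_qemu_file(f, comp_param[idx].file);
>         bytes_transferred += len;
>     }
>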
> > +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> > +        bytes_transferred += len;
> > +    }
>
> [remove one_byte stuff here]
>
> > +}
> > +
> > +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> > +                                       ram_addr_t offset)
> > +{
> > +    param->block = block;
> > +    param->offset = offset;
> > +}
> > +
> > +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> > +                                           ram_addr_t offset)
> > +{
> > +    int idx, thread_count, bytes_sent = 0;
> > +
> > +    thread_count = migrate_compress_threads();
> > +    qemu_mutex_lock(comp_done_lock);
> > +    while (true) {
> > +        for (idx = 0; idx < thread_count; idx++) {
> > +            if (!comp_param[idx].busy) {
> > +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> > +                set_compress_params(&comp_param[idx], block, offset);
> > +                start_compression(&comp_param[idx]);
>
> [remove stuff here]
>
> > +                break;
> > +            }
> > +        }
> > +        if (bytes_sent > 0) {
>
> Change this to:
>
>     if (bytes_sent >= 0) {
>
> > +            break;
> > +        } else {
> > +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +        }
> > +    }
> > +    qemu_mutex_unlock(comp_done_lock);
> > +
> > +    return bytes_sent;
> > +}
> > +
> >  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> >                                      ram_addr_t offset, bool last_stage)
> >  {
> >      int bytes_sent = -1;
> > +    MemoryRegion *mr = block->mr;
> > +    uint8_t *p;
> > +    int ret;
> > +    int cont;
> >
> > -    /* To be done*/
> > +    p = memory_region_get_ram_ptr(mr) + offset;
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    ret = ram_control_save_page(f, block->offset,
> > +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> > +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > +            if (bytes_sent > 0) {
> > +                acct_info.norm_pages++;
> > +            } else if (bytes_sent == 0) {
> > +                acct_info.dup_pages++;
> > +            }
> > +        }
> > +    } else {
> > +        /* When starting the process of a new block, the first page of
> > +         * the block should be sent out before other pages in the same
> > +         * block, and all the pages in the last block should have been
> > +         * sent out; keeping this order is important, because the 'cont'
> > +         * flag is used to avoid resending the block name.
> > +         */
> > +        if (block != last_sent_block) {
> > +            flush_compressed_data(f);
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                set_compress_params(&comp_param[0], block, offset);
> > +                /* Use the qemu thread to compress the data to make
> > +                 * sure the first page is sent out before other pages
> > +                 */
> > +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> > +                if (bytes_sent > 0) {
>
> This test is not needed:
>
>     assert(bytes_sent > 0);
>
> or how can it be zero or negative here? So we have to always call
> qemu_put_qemu_file(), no?
>
> > +                    qemu_put_qemu_file(f, comp_param[0].file);
> > +                }
> > +            }
> > +        } else {
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                bytes_sent = compress_page_with_multi_thread(f, block, offset);
> > +            }
> > +        }
> > +    }
> >
> >      return bytes_sent;
> >  }
> > @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
> >      return bytes_sent;
> >  }
> >
> > -static uint64_t bytes_transferred;
> > -
> >  void acct_update_position(QEMUFile *f, size_t size, bool zero)
> >  {
> >      uint64_t pages = size / TARGET_PAGE_SIZE;
> > @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> >          i++;
> >      }
> >
> > +    flush_compressed_data(f);
> >      qemu_mutex_unlock_ramlist();
> >
> >      /*
> > @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> >          bytes_transferred += bytes_sent;
> >      }
> >
> > +    flush_compressed_data(f);
> >      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> >      migration_end();
>
>
> I think this would make the code work, but not the locking. You are
> using here:
>
> quit_comp_thread: global, and not completely clear what protects it
> comp_done_lock:   global
> comp_done_cond:   global
>
> param[i].busy:  I would suggest renaming to pending_work
> param[i].mutex:
> param[i].cond:
>     the thread is waiting for work
>
>
> Issues:
>
> param->busy is protected in do_data_compress() and start_compression()
> by param->mutex, but in flush_compressed_data() and
> compress_page_with_multi_thread() it is protected by comp_done_lock.
>
> At this point, I would suggest just dropping param[i].mutex and using
> comp_done_lock everywhere. We can make the locking more granular later
> if needed, but first let's get it correct.
>
> The code basically does (forgetting termination and locking):
>
> each compression_thread:
>
>     while(1) {
>         while(!work_to_do)
>             wait_for_work
>         do_work
>     }
>
> And the main thread does:
>
>     while(1) {
>         foreach compression_thread {
>             if thread free {
>                 put it to work
>                 break;
>             }
>             wait_for_thread_to_finish
>         }
>     }
>
> Notice how we are walking all the threads each time that we need to do
> anything.
>
> Perhaps the code would be simpler if we put the data that needs to be
> compressed in a global variable and change this to:
>
> compression_thread:
>
>     while(1) {
>         while(!work_to_do)
>             wait_for_work
>         pick work from global variable
>         wakeup main thread
>         do_work
>     }
>
> main thread:
>
>     put work on global variable
>     while(nobody_picked_the_work) {
>         signal all threads
>         wait for a compression thread to take the work
>     }
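>
> In (completely untested) C, using the qemu-thread wrappers, that would
> look roughly like the sketch below; comp_lock, comp_work_cond,
> comp_ready_cond, comp_work and queue_compress_work are invented names,
> and result handling is omitted:
>
>     static QemuMutex comp_lock;       /* protects the two conds + comp_work */
>     static QemuCond comp_work_cond;   /* workers wait here for work */
>     static QemuCond comp_ready_cond;  /* main thread waits for a worker */
>     static CompressParam *comp_work;  /* the pending work item, NULL if none */
>     static bool comp_quit;
>
>     static void *compression_thread(void *opaque)
>     {
>         while (1) {
>             CompressParam *work;
>
>             qemu_mutex_lock(&comp_lock);
>             while (!comp_work && !comp_quit) {
>                 qemu_cond_wait(&comp_work_cond, &comp_lock);
>             }
>             if (comp_quit) {
>                 qemu_mutex_unlock(&comp_lock);
>                 break;
>             }
>             work = comp_work;                    /* pick the work ... */
>             comp_work = NULL;
>             qemu_cond_signal(&comp_ready_cond);  /* ... and wake the main thread */
>             qemu_mutex_unlock(&comp_lock);
>
>             do_compress_ram_page(work);          /* compress outside the lock */
>         }
>         return NULL;
>     }
>
>     /* main (migration) thread side */
>     static void queue_compress_work(CompressParam *param)
>     {
>         qemu_mutex_lock(&comp_lock);
>         comp_work = param;
>         while (comp_work) {                      /* until a worker takes it */
>             qemu_cond_broadcast(&comp_work_cond);
>             qemu_cond_wait(&comp_ready_cond, &comp_lock);
>         }
>         qemu_mutex_unlock(&comp_lock);
>     }
>
> Termination would just set comp_quit under comp_lock and broadcast
> comp_work_cond; every access to the shared state goes through the one
> lock, so there is no unlocked peeking anywhere.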
> Why? Because then we only have a global mutex and two condition
> variables, with clear semantics:
>
> - the lock protects the two conditions and the global variable holding
>   the work
> - one condition where the threads wait for work
> - one condition where the main thread waits for a worker to be ready
>
> As we would need to take the lock every single time to put the work in
> the global variable, to wait, or to pick up the work, we can drop all
> the:
>
>     if (!foo) {
>         mutex_lock
>         if (!foo) /* this time with lock */
>             ....
>     }
>
> Sorry for the very long mail. If it makes you feel better, this is the
> second time that I wrote it, because in the 1st version my locking
> proposal didn't work correctly.
>
> What do you think?
It sounds good; I will try it according to your suggestion. Thanks for
your detailed explanation :)

Liang