> -----Original Message-----
> From: Juan Quintela [mailto:quint...@redhat.com]
> Sent: Wednesday, February 11, 2015 7:45 PM
> To: Li, Liang Z
> Cc: qemu-devel@nongnu.org; ebl...@redhat.com; amit.s...@redhat.com;
> lcapitul...@redhat.com; arm...@redhat.com; dgilb...@redhat.com; Zhang,
> Yang Z
> Subject: Re: [v5 08/12] migration: Add the core code of multi-thread
> compression
> 
> Liang Li <liang.z...@intel.com> wrote:
> > Implement the core logic of the multiple thread compression. At this
> > point, multiple thread compression can't co-work with xbzrle yet.
> >
> > Signed-off-by: Liang Li <liang.z...@intel.com>
> > Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>
> 
> 
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
> >  static QemuCond *comp_done_cond;
> >  /* The empty QEMUFileOps will be used by file in CompressParam */
> >  static const QEMUFileOps empty_ops = { };
> > +
> > +/* one_byte_count is used to count the bytes that is added to
> > + * bytes_transferred but not actually transferred, at the proper
> > + * time, we should sub one_byte_count from bytes_transferred to
> > + * make bytes_transferred accurate.
> > + */
> > +static int one_byte_count;
> 
> With the changes proposed previously to ram_save_compressed_page() this
> shouldn't be needed.  It can return 0 now.
> 
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> >  static void *do_data_compress(void *opaque)
> >  {
> > -    while (!quit_comp_thread) {
> > -
> > -    /* To be done */
> > +    CompressParam *param = opaque;
> >
> > +    while (!quit_comp_thread) {
> > +        qemu_mutex_lock(&param->mutex);
> > +        /* Re-check the quit_comp_thread in case of
> > +         * terminate_compression_threads is called just before
> > +         * qemu_mutex_lock(&param->mutex) and after
> > +         * while(!quit_comp_thread), re-check it here can make
> > +         * sure the compression thread terminate as expected.
> > +         */
> > +        while (!param->busy && !quit_comp_thread) {
> > +            qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        qemu_mutex_unlock(&param->mutex);
> > +        if (!quit_comp_thread) {
> > +            do_compress_ram_page(param);
> > +        }
> > +        qemu_mutex_lock(comp_done_lock);
> > +        param->busy = false;
> > +        qemu_cond_signal(comp_done_cond);
> > +        qemu_mutex_unlock(comp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
> >
> >  static inline void terminate_compression_threads(void)
> >  {
> > -    quit_comp_thread = true;
> > +    int idx, thread_count;
> >
> > -    /* To be done */
> > +    thread_count = migrate_compress_threads();
> > +    quit_comp_thread = true;
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        qemu_mutex_lock(&comp_param[idx].mutex);
> > +        qemu_cond_signal(&comp_param[idx].cond);
> > +        qemu_mutex_unlock(&comp_param[idx].mutex);
> > +    }
> >  }
> >
> >  void migrate_compress_threads_join(MigrationState *s)
> > @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
> >      return bytes_sent;
> >  }
> >
> > +static int do_compress_ram_page(CompressParam *param)
> > +{
> > +    int bytes_sent, cont;
> > +    int blen;
> > +    uint8_t *p;
> > +    RAMBlock *block = param->block;
> > +    ram_addr_t offset = param->offset;
> > +
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    p = memory_region_get_ram_ptr(block->mr) + offset;
> > +
> > +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> > +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> > +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> > +                                     migrate_compress_level());
> > +    bytes_sent += blen;
> > +    atomic_inc(&acct_info.norm_pages);
> > +
> > +    return bytes_sent;
> > +}
> > +
> > +static inline void start_compression(CompressParam *param)
> > +{
> > +    qemu_mutex_lock(&param->mutex);
> > +    param->busy = true;
> > +    qemu_cond_signal(&param->cond);
> > +    qemu_mutex_unlock(&param->mutex);
> > +}
> > +
> > +
> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f)
> > +{
> > +    int idx, len, thread_count;
> > +
> > +    if (!migrate_use_compression()) {
> > +        return;
> > +    }
> > +    thread_count = migrate_compress_threads();
> > +    for (idx = 0; idx < thread_count; idx++) {
> > +        if (comp_param[idx].busy) {
> > +            qemu_mutex_lock(comp_done_lock);
> > +            while (comp_param[idx].busy && !quit_comp_thread) {
> > +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +            }
> > +            qemu_mutex_unlock(comp_done_lock);
> > +        }
> 
> If we arrive here because quit_comp_thread == true, shouldn't we skip the
> qemu_put_qemu_file()?
> 
> > +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> > +        bytes_transferred += len;
> > +    }
> 
> [remove one_byte stuff here]
> 
> > +}
> > +
> > +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> > +                                       ram_addr_t offset)
> > +{
> > +    param->block = block;
> > +    param->offset = offset;
> > +}
> > +
> > +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> > +                                           ram_addr_t offset)
> > +{
> > +    int idx, thread_count, bytes_sent = 0;
> > +
> > +    thread_count = migrate_compress_threads();
> > +    qemu_mutex_lock(comp_done_lock);
> > +    while (true) {
> > +        for (idx = 0; idx < thread_count; idx++) {
> > +            if (!comp_param[idx].busy) {
> > +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> > +                set_compress_params(&comp_param[idx], block, offset);
> > +                start_compression(&comp_param[idx]);
> 
> [remove stuff here]
> 
> > +                break;
> > +            }
> > +        }
> > +        if (bytes_sent > 0) {
> 
> Change this to:
>           if (bytes_sent >= 0) {
> 
> > +            break;
> > +        } else {
> > +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> > +        }
> > +    }
> > +    qemu_mutex_unlock(comp_done_lock);
> > +
> > +    return bytes_sent;
> > +}
> > +
> >  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> >                                      ram_addr_t offset, bool last_stage)
> >  {
> >      int bytes_sent = -1;
> > +    MemoryRegion *mr = block->mr;
> > +    uint8_t *p;
> > +    int ret;
> > +    int cont;
> >
> > -    /* To be done*/
> > +    p = memory_region_get_ram_ptr(mr) + offset;
> > +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > +    ret = ram_control_save_page(f, block->offset,
> > +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> > +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > +            if (bytes_sent > 0) {
> > +                acct_info.norm_pages++;
> > +            } else if (bytes_sent == 0) {
> > +                acct_info.dup_pages++;
> > +            }
> > +        }
> > +    } else {
> > +        /* When starting the process of a new block, the first page of
> > +         * the block should be sent out before other pages in the same
> > +         * block, and all the pages in last block should have been sent
> > +         * out, keeping this order is important, because the 'cont' flag
> > +         * is used to avoid resending the block name.
> > +         */
> > +        if (block != last_sent_block) {
> > +            flush_compressed_data(f);
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                set_compress_params(&comp_param[0], block, offset);
> > +                /* Use the qemu thread to compress the data to make sure the
> > +                 * first page is sent out before other pages
> > +                 */
> > +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> > +                if (bytes_sent > 0) {
> 
> This test is not needed
> 
> assert(bytes_sent>0)
> 
> or how can it be zero or negative here?  So, we have to always call
> qemu_put_qemu_file() no?
> 
> > +                    qemu_put_qemu_file(f, comp_param[0].file);
> > +                }
> > +            }
> > +        } else {
> > +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> > +            if (bytes_sent == -1) {
> > +                bytes_sent = compress_page_with_multi_thread(f, block, offset);
> > +            }
> > +        }
> > +    }
> >
> >      return bytes_sent;
> >  }
> > @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
> >      return bytes_sent;
> >  }
> >
> > -static uint64_t bytes_transferred;
> > -
> >  void acct_update_position(QEMUFile *f, size_t size, bool zero)
> >  {
> >      uint64_t pages = size / TARGET_PAGE_SIZE;
> > @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> >          i++;
> >      }
> >
> > +    flush_compressed_data(f);
> >      qemu_mutex_unlock_ramlist();
> >
> >      /*
> > @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> >          bytes_transferred += bytes_sent;
> >      }
> >
> > +    flush_compressed_data(f);
> >      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> >      migration_end();
> 
> 
> I think this would make the code work, but not the locking.  You are using
> here:
> 
> quit_comp_thread:  global, and not completely clear what protects it
> comp_done_lock: global
> comp_done_cond: global
> 
> param[i].busy: I would suggest renaming to pending work
> param[i].mutex:
> param[i].cond:
>        thread is waiting for work
> 
> 
> Issues:
> 
> param->busy is protected in do_data_compress() and start_compression()
> by param->mutex, but in flush_compressed_data() and
> compress_page_with_multi_thread() it is protected by comp_done_lock.
> 
> At this point, I would suggest just dropping param[i].mutex and using
> comp_done_lock everywhere.  We can make the locking more granular later if
> needed, but first get it correct?
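
A minimal sketch of the "one lock everywhere" suggestion, not taken from the patch. It assumes plain pthreads in place of QemuMutex/QemuCond, and every name in it (pool_lock, pool_done, PoolWorker, pool_dispatch, pool_run) is hypothetical. An addition stands in for the real compression, and the per-worker pending_work flag is only ever read or written while pool_lock is held.

```c
#include <pthread.h>
#include <stdbool.h>

#define POOL_SIZE 4                /* hypothetical pool size */

typedef struct PoolWorker {
    pthread_t thread;
    pthread_cond_t cond;           /* worker sleeps here until it has work */
    bool pending_work;             /* guarded by pool_lock */
    int input;                     /* stand-in for (block, offset) */
    long output;                   /* stand-in for the compressed buffer */
} PoolWorker;

static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pool_done = PTHREAD_COND_INITIALIZER;
static bool pool_quit;             /* guarded by pool_lock */
static long pool_total;            /* guarded by pool_lock */
static PoolWorker pool[POOL_SIZE];

static void *pool_thread(void *opaque)
{
    PoolWorker *w = opaque;

    pthread_mutex_lock(&pool_lock);
    for (;;) {
        while (!w->pending_work && !pool_quit) {
            pthread_cond_wait(&w->cond, &pool_lock);
        }
        if (!w->pending_work) {
            break;                 /* quit requested, nothing left to do */
        }
        int input = w->input;
        pthread_mutex_unlock(&pool_lock);
        long result = input + 1L;  /* the "compression", done unlocked */
        pthread_mutex_lock(&pool_lock);
        w->output = result;
        w->pending_work = false;
        pthread_cond_signal(&pool_done);
    }
    pthread_mutex_unlock(&pool_lock);
    return NULL;
}

/* Collect a finished result. Caller must hold pool_lock. */
static void pool_collect(PoolWorker *w)
{
    pool_total += w->output;
    w->output = 0;
}

/* Hand one item to the first idle worker, waiting if all are busy. */
static void pool_dispatch(int input)
{
    pthread_mutex_lock(&pool_lock);
    for (;;) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (!pool[i].pending_work) {
                pool_collect(&pool[i]);
                pool[i].input = input;
                pool[i].pending_work = true;
                pthread_cond_signal(&pool[i].cond);
                pthread_mutex_unlock(&pool_lock);
                return;
            }
        }
        pthread_cond_wait(&pool_done, &pool_lock);
    }
}

/* Start the pool, process 1..nitems, flush, stop; returns the total. */
static long pool_run(int nitems)
{
    pool_quit = false;
    pool_total = 0;
    for (int i = 0; i < POOL_SIZE; i++) {
        pool[i].pending_work = false;
        pool[i].output = 0;
        pthread_cond_init(&pool[i].cond, NULL);
        pthread_create(&pool[i].thread, NULL, pool_thread, &pool[i]);
    }
    for (int n = 1; n <= nitems; n++) {
        pool_dispatch(n);
    }
    pthread_mutex_lock(&pool_lock);
    for (int i = 0; i < POOL_SIZE; i++) {      /* flush all workers */
        while (pool[i].pending_work) {
            pthread_cond_wait(&pool_done, &pool_lock);
        }
        pool_collect(&pool[i]);
    }
    pool_quit = true;
    for (int i = 0; i < POOL_SIZE; i++) {
        pthread_cond_signal(&pool[i].cond);
    }
    pthread_mutex_unlock(&pool_lock);
    for (int i = 0; i < POOL_SIZE; i++) {
        pthread_join(pool[i].thread, NULL);
        pthread_cond_destroy(&pool[i].cond);
    }
    return pool_total;
}
```

Calling pool_run(n) pushes the items 1..n through the pool and returns the sum of the per-item results. Because the flag, the quit request and the results share one mutex, no re-check after taking a second lock is needed.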
> 
> The code basically does (forget termination and locking):
> 
> each compression_thread()
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      do_work
>   }
> 
> And the main thread does:
> 
> 
> while(1) {
>      foreach compression_thread {
>           if thread free {
>              put it to work
>              break;
>           }
>           wait_for_thread_to_finish
>      }
> }
> 
> Notice how we are walking all threads each time that we need to do anything.
> 
> Perhaps the code would be simpler if we put the work that needs to be
> done in a global variable and changed this to:
> 
> compression_thread
> 
>   while(1) {
>      while(!work_to_do)
>         wait_for_work
>      pick work from global variable
>      wakeup main thread
>      do_work
>   }
> 
> main thread:
> 
> put work on global variable
> while(nobody_picked_the_work) {
>      signal all threads
>      wait for a compression thread to take the work
> }
> 
> Why?  Because then we only have a global mutex and two condition variables,
> with clear semantics:
> - the lock protects the two conditions and the global variable with the work
> - one condition where threads wait for work
> - one condition where the main thread waits for a worker to be ready
> 
> As we would need to lock every single time to put the work in the global
> variable, to wait, or to pick up the work, we can drop all the:
> 
> if (!foo) {
>     mutex_lock
>     if(!foo) /* this time with lock */
>         ....
> }
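
The single-mutex, two-condition design described above can be sketched as follows. This is only an illustration under stated assumptions: it uses pthreads rather than QEMU's wrappers, all names (slot_lock, slot_work_ready, slot_work_taken, slot_submit, slot_run) are hypothetical, and a multiplication stands in for compressing a page. It also assumes a single submitting thread, as in the migration code.

```c
#include <pthread.h>
#include <stdbool.h>

#define SLOT_THREADS 4             /* hypothetical number of workers */

/* One lock protects the work slot, the quit flag and the result. */
static pthread_mutex_t slot_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t slot_work_ready = PTHREAD_COND_INITIALIZER;
static pthread_cond_t slot_work_taken = PTHREAD_COND_INITIALIZER;
static bool slot_has_work;
static bool slot_quit;
static int slot_item;              /* the "global variable" holding work */
static long slot_sum;              /* accumulated results */

static void *slot_thread(void *opaque)
{
    (void)opaque;
    pthread_mutex_lock(&slot_lock);
    for (;;) {
        while (!slot_has_work && !slot_quit) {
            pthread_cond_wait(&slot_work_ready, &slot_lock);
        }
        if (!slot_has_work) {
            break;                 /* quit requested and slot is empty */
        }
        int item = slot_item;      /* pick work from the global slot */
        slot_has_work = false;
        pthread_cond_signal(&slot_work_taken);   /* wake the main thread */
        pthread_mutex_unlock(&slot_lock);
        long result = 2L * item;   /* do the work without the lock */
        pthread_mutex_lock(&slot_lock);
        slot_sum += result;
    }
    pthread_mutex_unlock(&slot_lock);
    return NULL;
}

/* Main thread: publish one item and wait until a worker has taken it. */
static void slot_submit(int item)
{
    pthread_mutex_lock(&slot_lock);
    slot_item = item;
    slot_has_work = true;
    pthread_cond_broadcast(&slot_work_ready);
    while (slot_has_work) {
        pthread_cond_wait(&slot_work_taken, &slot_lock);
    }
    pthread_mutex_unlock(&slot_lock);
}

/* Start the workers, submit 1..nitems, stop them; returns the sum. */
static long slot_run(int nitems)
{
    pthread_t threads[SLOT_THREADS];

    slot_has_work = false;
    slot_quit = false;
    slot_sum = 0;
    for (int i = 0; i < SLOT_THREADS; i++) {
        pthread_create(&threads[i], NULL, slot_thread, NULL);
    }
    for (int n = 1; n <= nitems; n++) {
        slot_submit(n);
    }
    pthread_mutex_lock(&slot_lock);
    slot_quit = true;
    pthread_cond_broadcast(&slot_work_ready);
    pthread_mutex_unlock(&slot_lock);
    for (int i = 0; i < SLOT_THREADS; i++) {
        pthread_join(threads[i], NULL);   /* in-flight work finishes first */
    }
    return slot_sum;
}
```

Every access to the slot happens with slot_lock held, so the check-lock-recheck pattern is not needed. A worker that is busy when the broadcast happens simply finds slot_has_work set the next time it loops, so no wakeup is lost.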
> 
> 
> Sorry for the very long mail. If it makes you feel better, this is the second
> time that I wrote it, because in the 1st version my locking proposal didn't
> work correctly.
> 
> What do you think?

It sounds good. I will try it according to your suggestion.  Thanks for your
detailed explanation :)

Liang
