Liang Li <liang.z...@intel.com> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <liang.z...@intel.com>
> Signed-off-by: Yang Zhang <yang.z.zh...@intel.com>


> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
>  static QemuCond *comp_done_cond;
>  /* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
> +
> +/* one_byte_count is used to count the bytes that is added to
> + * bytes_transferred but not actually transferred, at the proper
> + * time, we should sub one_byte_count from bytes_transferred to
> + * make bytes_transferred accurate.
> + */
> +static int one_byte_count;

With the changes proposed previously to ram_save_compressed_page() this
shouldn't be needed.  It can return 0 now.

> +static int do_compress_ram_page(CompressParam *param);
> +
>  static void *do_data_compress(void *opaque)
>  {
> -    while (!quit_comp_thread) {
> -
> -    /* To be done */
> +    CompressParam *param = opaque;
>  
> +    while (!quit_comp_thread) {
> +        qemu_mutex_lock(&param->mutex);
> +        /* Re-check the quit_comp_thread in case of
> +         * terminate_compression_threads is called just before
> +         * qemu_mutex_lock(&param->mutex) and after
> +         * while(!quit_comp_thread), re-check it here can make
> +         * sure the compression thread terminate as expected.
> +         */
> +        while (!param->busy && !quit_comp_thread) {
> +            qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        qemu_mutex_unlock(&param->mutex);
> +        if (!quit_comp_thread) {
> +            do_compress_ram_page(param);
> +        }
> +        qemu_mutex_lock(comp_done_lock);
> +        param->busy = false;
> +        qemu_cond_signal(comp_done_cond);
> +        qemu_mutex_unlock(comp_done_lock);
>      }
>  
>      return NULL;
> @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
>  
>  static inline void terminate_compression_threads(void)
>  {
> -    quit_comp_thread = true;
> +    int idx, thread_count;
>  
> -    /* To be done */
> +    thread_count = migrate_compress_threads();
> +    quit_comp_thread = true;
> +    for (idx = 0; idx < thread_count; idx++) {
> +        qemu_mutex_lock(&comp_param[idx].mutex);
> +        qemu_cond_signal(&comp_param[idx].cond);
> +        qemu_mutex_unlock(&comp_param[idx].mutex);
> +    }
>  }
>  
>  void migrate_compress_threads_join(MigrationState *s)
> @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, 
> ram_addr_t offset,
>      return bytes_sent;
>  }
>  
> +static int do_compress_ram_page(CompressParam *param)
> +{
> +    int bytes_sent, cont;
> +    int blen;
> +    uint8_t *p;
> +    RAMBlock *block = param->block;
> +    ram_addr_t offset = param->offset;
> +
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    p = memory_region_get_ram_ptr(block->mr) + offset;
> +
> +    bytes_sent = save_block_hdr(param->file, block, offset, cont,
> +                                RAM_SAVE_FLAG_COMPRESS_PAGE);
> +    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> +                                     migrate_compress_level());
> +    bytes_sent += blen;
> +    atomic_inc(&acct_info.norm_pages);
> +
> +    return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> +    qemu_mutex_lock(&param->mutex);
> +    param->busy = true;
> +    qemu_cond_signal(&param->cond);
> +    qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> +    int idx, len, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return;
> +    }
> +    thread_count = migrate_compress_threads();
> +    for (idx = 0; idx < thread_count; idx++) {
> +        if (comp_param[idx].busy) {
> +            qemu_mutex_lock(comp_done_lock);
> +            while (comp_param[idx].busy && !quit_comp_thread) {
> +                qemu_cond_wait(comp_done_cond, comp_done_lock);
> +            }
> +            qemu_mutex_unlock(comp_done_lock);
> +        }

If we arrive here because quit_comp_thread == true, shouldn't we skip
the qemu_put_qemu_file()?

> +        len = qemu_put_qemu_file(f, comp_param[idx].file);
> +        bytes_transferred += len;
> +    }

[remove one_byte stuff here]

> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> +                                       ram_addr_t offset)
> +{
> +    param->block = block;
> +    param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> +                                           ram_addr_t offset)
> +{
> +    int idx, thread_count, bytes_sent = 0;
> +
> +    thread_count = migrate_compress_threads();
> +    qemu_mutex_lock(comp_done_lock);
> +    while (true) {
> +        for (idx = 0; idx < thread_count; idx++) {
> +            if (!comp_param[idx].busy) {
> +                bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> +                set_compress_params(&comp_param[idx], block, offset);
> +                start_compression(&comp_param[idx]);

[remove stuff here]

> +                break;
> +            }
> +        }
> +        if (bytes_sent > 0) {

Change this to:
          if (bytes_sent >= 0) {

> +            break;
> +        } else {
> +            qemu_cond_wait(comp_done_cond, comp_done_lock);
> +        }
> +    }
> +    qemu_mutex_unlock(comp_done_lock);
> +
> +    return bytes_sent;
> +}
> +
>  static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
>                                      ram_addr_t offset, bool last_stage)
>  {
>      int bytes_sent = -1;
> +    MemoryRegion *mr = block->mr;
> +    uint8_t *p;
> +    int ret;
> +    int cont;
>  
> -    /* To be done*/
> +    p = memory_region_get_ram_ptr(mr) + offset;
> +    cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> +    ret = ram_control_save_page(f, block->offset,
> +                                offset, TARGET_PAGE_SIZE, &bytes_sent);
> +    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> +        if (ret != RAM_SAVE_CONTROL_DELAYED) {
> +            if (bytes_sent > 0) {
> +                acct_info.norm_pages++;
> +            } else if (bytes_sent == 0) {
> +                acct_info.dup_pages++;
> +            }
> +        }
> +    } else {
> +        /* When starting the process of a new block, the first page of
> +         * the block should be sent out before other pages in the same
> +         * block, and all the pages in last block should have been sent
> +         * out, keeping this order is important, because the 'cont' flag
> +         * is used to avoid resending the block name.
> +         */
> +        if (block != last_sent_block) {
> +            flush_compressed_data(f);
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                set_compress_params(&comp_param[0], block, offset);
> +                /* Use the qemu thread to compress the data to make sure the
> +                 * first page is sent out before other pages
> +                 */
> +                bytes_sent = do_compress_ram_page(&comp_param[0]);
> +                if (bytes_sent > 0) {

This test is not needed.  Use

assert(bytes_sent > 0)

instead; or how can it be zero or negative here?  So we always have to
call qemu_put_qemu_file(), no?

> +                    qemu_put_qemu_file(f, comp_param[0].file);
> +                }
> +            }
> +        } else {
> +            bytes_sent = save_zero_page(f, block, offset, p, cont);
> +            if (bytes_sent == -1) {
> +                bytes_sent = compress_page_with_multi_thread(f, block, 
> offset);
> +            }
> +        }
> +    }
>  
>      return bytes_sent;
>  }
> @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
> last_stage)
>      return bytes_sent;
>  }
>  
> -static uint64_t bytes_transferred;
> -
>  void acct_update_position(QEMUFile *f, size_t size, bool zero)
>  {
>      uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
>          i++;
>      }
>  
> +    flush_compressed_data(f);
>      qemu_mutex_unlock_ramlist();
>  
>      /*
> @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>          bytes_transferred += bytes_sent;
>      }
>  
> +    flush_compressed_data(f);
>      ram_control_after_iterate(f, RAM_CONTROL_FINISH);
>      migration_end();


I think this would make the code work, but not the locking.  You are
using here:

quit_comp_thread:  global, and not completely clear what protects it
comp_done_lock: global
comp_done_cond: global

param[i].busy: I would suggest renaming to pending work
param[i].mutex:
param[i].cond:
       thread is waiting for work


Issues:

param->busy is protected in do_data_compress() and start_compression()
with param->mutex, but in flush_compressed_data() and
compress_page_with_multi_thread() it is protected by
comp_done_lock.

At this point, I would suggest just dropping param[i].mutex and using
comp_done_lock everywhere.  We can make the locking more granular later if
needed, but first get it correct?

Code basically does (forget termination and locking)

each compression_thread()

  while(1) {
     while(!work_to_do)
        wait_for_work
     do_work
  }

And the main thread does:


while(1) {
     foreach compression_thread {
          if thread free {
             put it to work
             break;
          }
          wait_for_thread_to_finish
     }
}

Notice how we are walking all the threads each time that we need to do anything.

Perhaps the code would be simpler if we put the work that needs to be
done in a global variable and change this to:

compression_thread

  while(1) {
     while(!work_to_do)
        wait_for_work
     pick work from global variable
     wakeup main thread
     do_work
  }

main thread:

put work on global variable
while(nobody_pick_thework) {
     signal all threads
     wait for a compression thread to take the work
}

Why?  Because then we only have a global mutex and two condition
variables, with clear semantics:
- lock protects two conditions and global variable with work
- one condition where threads wait for work
- one condition where main thread wait for a worker to be ready

As we would need to take the lock every single time to put the work in the
global variable, to wait or to pick up the work, we can stop doing all the:

if (!foo) {
    mutex_lock
    if(!foo) /* this time with lock */
        ....
}


Sorry for the very long mail.  If it makes you feel better, this is the
second time that I have written it, because in the first version my locking
proposal didn't work correctly.

What do you think?

Later, Juan.

Reply via email to