* Peter Xu (pet...@redhat.com) wrote:
> Introduce pss_channel for PageSearchStatus, define it as "the migration
> channel to be used to transfer this host page".
> 
> We used to have rs->f, which is a mirror to MigrationState.to_dst_file.
> 
> After postcopy preempt initial version, rs->f can be dynamically changed
> depending on which channel we want to use.
> 
> But that later work still doesn't grant full concurrency of sending pages
> in e.g. different threads, because rs->f can either be the PRECOPY channel
> or POSTCOPY channel.  This needs to be per-thread too.
> 
> PageSearchStatus is actually a good piece of struct which we can leverage
> if we want to have multiple threads sending pages.  Sending a single guest
> page may not make sense, so we make the granule to be "host page", and in
> the PSS structure we allow specify a QEMUFile* to migrate a specific host
> page.  Then we open the possibility to specify different channels in
> different threads with different PSS structures.
> 
> The PSS prefix can be slightly misleading here because e.g. for the
> upcoming usage of postcopy channel/thread it's not "searching" (or,
> scanning) at all but sending the explicit page that was requested.  However
> since PSS existed for some years keep it as-is until someone complains.
> 
> This patch mostly (simply) replace rs->f with pss->pss_channel only. No
> functional change intended for this patch yet.  But it does prepare to
> finally drop rs->f, and make ram_save_guest_page() thread safe.
> 
> Signed-off-by: Peter Xu <pet...@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilb...@redhat.com>

> ---
>  migration/ram.c | 70 +++++++++++++++++++++++++++----------------------
>  1 file changed, 38 insertions(+), 32 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 3f720b6de2..40ff5dc49f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -446,6 +446,8 @@ void dirty_sync_missed_zero_copy(void)
>  
>  /* used by the search for pages to send */
>  struct PageSearchStatus {
> +    /* The migration channel used for a specific host page */
> +    QEMUFile    *pss_channel;
>      /* Current block being searched */
>      RAMBlock    *block;
>      /* Current page to search from */
> @@ -768,9 +770,9 @@ static void xbzrle_cache_zero_page(RAMState *rs, 
> ram_addr_t current_addr)
>   * @block: block that contains the page we want to send
>   * @offset: offset inside the block for the page
>   */
> -static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
> -                            ram_addr_t current_addr, RAMBlock *block,
> -                            ram_addr_t offset)
> +static int save_xbzrle_page(RAMState *rs, QEMUFile *file,
> +                            uint8_t **current_data, ram_addr_t current_addr,
> +                            RAMBlock *block, ram_addr_t offset)
>  {
>      int encoded_len = 0, bytes_xbzrle;
>      uint8_t *prev_cached_page;
> @@ -838,11 +840,11 @@ static int save_xbzrle_page(RAMState *rs, uint8_t 
> **current_data,
>      }
>  
>      /* Send XBZRLE based compressed page */
> -    bytes_xbzrle = save_page_header(rs, rs->f, block,
> +    bytes_xbzrle = save_page_header(rs, file, block,
>                                      offset | RAM_SAVE_FLAG_XBZRLE);
> -    qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
> -    qemu_put_be16(rs->f, encoded_len);
> -    qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
> +    qemu_put_byte(file, ENCODING_FLAG_XBZRLE);
> +    qemu_put_be16(file, encoded_len);
> +    qemu_put_buffer(file, XBZRLE.encoded_buf, encoded_len);
>      bytes_xbzrle += encoded_len + 1 + 2;
>      /*
>       * Like compressed_size (please see update_compress_thread_counts),
> @@ -1298,9 +1300,10 @@ static int save_zero_page_to_file(RAMState *rs, 
> QEMUFile *file,
>   * @block: block that contains the page we want to send
>   * @offset: offset inside the block for the page
>   */
> -static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
> +static int save_zero_page(RAMState *rs, QEMUFile *file, RAMBlock *block,
> +                          ram_addr_t offset)
>  {
> -    int len = save_zero_page_to_file(rs, rs->f, block, offset);
> +    int len = save_zero_page_to_file(rs, file, block, offset);
>  
>      if (len) {
>          qatomic_inc(&ram_counters.duplicate);
> @@ -1317,15 +1320,15 @@ static int save_zero_page(RAMState *rs, RAMBlock 
> *block, ram_addr_t offset)
>   *
>   * Return true if the pages has been saved, otherwise false is returned.
>   */
> -static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t 
> offset,
> -                              int *pages)
> +static bool control_save_page(PageSearchStatus *pss, RAMBlock *block,
> +                              ram_addr_t offset, int *pages)
>  {
>      uint64_t bytes_xmit = 0;
>      int ret;
>  
>      *pages = -1;
> -    ret = ram_control_save_page(rs->f, block->offset, offset, 
> TARGET_PAGE_SIZE,
> -                                &bytes_xmit);
> +    ret = ram_control_save_page(pss->pss_channel, block->offset, offset,
> +                                TARGET_PAGE_SIZE, &bytes_xmit);
>      if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
>          return false;
>      }
> @@ -1359,17 +1362,17 @@ static bool control_save_page(RAMState *rs, RAMBlock 
> *block, ram_addr_t offset,
>   * @buf: the page to be sent
>   * @async: send to page asyncly
>   */
> -static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
> -                            uint8_t *buf, bool async)
> +static int save_normal_page(RAMState *rs, QEMUFile *file, RAMBlock *block,
> +                            ram_addr_t offset, uint8_t *buf, bool async)
>  {
> -    ram_transferred_add(save_page_header(rs, rs->f, block,
> +    ram_transferred_add(save_page_header(rs, file, block,
>                                           offset | RAM_SAVE_FLAG_PAGE));
>      if (async) {
> -        qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
> +        qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE,
>                                migrate_release_ram() &&
>                                migration_in_postcopy());
>      } else {
> -        qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
> +        qemu_put_buffer(file, buf, TARGET_PAGE_SIZE);
>      }
>      ram_transferred_add(TARGET_PAGE_SIZE);
>      qatomic_inc(&ram_counters.normal);
> @@ -1402,8 +1405,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus 
> *pss)
>  
>      XBZRLE_cache_lock();
>      if (rs->xbzrle_enabled && !migration_in_postcopy()) {
> -        pages = save_xbzrle_page(rs, &p, current_addr, block,
> -                                 offset);
> +        pages = save_xbzrle_page(rs, pss->pss_channel, &p, current_addr,
> +                                 block, offset);
>          if (!rs->last_stage) {
>              /* Can't send this cached data async, since the cache page
>               * might get updated before it gets to the wire
> @@ -1414,7 +1417,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus 
> *pss)
>  
>      /* XBZRLE overflow or normal page */
>      if (pages == -1) {
> -        pages = save_normal_page(rs, block, offset, p, send_async);
> +        pages = save_normal_page(rs, pss->pss_channel, block, offset,
> +                                 p, send_async);
>      }
>  
>      XBZRLE_cache_unlock();
> @@ -1422,10 +1426,10 @@ static int ram_save_page(RAMState *rs, 
> PageSearchStatus *pss)
>      return pages;
>  }
>  
> -static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
> +static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block,
>                                   ram_addr_t offset)
>  {
> -    if (multifd_queue_page(rs->f, block, offset) < 0) {
> +    if (multifd_queue_page(file, block, offset) < 0) {
>          return -1;
>      }
>      ram_counters.normal++;
> @@ -1720,7 +1724,7 @@ static int ram_save_release_protection(RAMState *rs, 
> PageSearchStatus *pss,
>          uint64_t run_length = (pss->page - start_page) << TARGET_PAGE_BITS;
>  
>          /* Flush async buffers before un-protect. */
> -        qemu_fflush(rs->f);
> +        qemu_fflush(pss->pss_channel);
>          /* Un-protect memory range. */
>          res = uffd_change_protection(rs->uffdio_fd, page_address, run_length,
>                  false, false);
> @@ -2307,7 +2311,7 @@ static int ram_save_target_page(RAMState *rs, 
> PageSearchStatus *pss)
>      ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
>      int res;
>  
> -    if (control_save_page(rs, block, offset, &res)) {
> +    if (control_save_page(pss, block, offset, &res)) {
>          return res;
>      }
>  
> @@ -2315,7 +2319,7 @@ static int ram_save_target_page(RAMState *rs, 
> PageSearchStatus *pss)
>          return 1;
>      }
>  
> -    res = save_zero_page(rs, block, offset);
> +    res = save_zero_page(rs, pss->pss_channel, block, offset);
>      if (res > 0) {
>          /* Must let xbzrle know, otherwise a previous (now 0'd) cached
>           * page would be stale
> @@ -2336,7 +2340,7 @@ static int ram_save_target_page(RAMState *rs, 
> PageSearchStatus *pss)
>       */
>      if (!save_page_use_compression(rs) && migrate_use_multifd()
>          && !migration_in_postcopy()) {
> -        return ram_save_multifd_page(rs, block, offset);
> +        return ram_save_multifd_page(pss->pss_channel, block, offset);
>      }
>  
>      return ram_save_page(rs, pss);
> @@ -2533,10 +2537,6 @@ static int ram_save_host_page(RAMState *rs, 
> PageSearchStatus *pss)
>          return 0;
>      }
>  
> -    if (postcopy_preempt_active()) {
> -        postcopy_preempt_choose_channel(rs, pss);
> -    }
> -
>      /* Update host page boundary information */
>      pss_host_page_prepare(pss);
>  
> @@ -2597,7 +2597,7 @@ static int ram_save_host_page(RAMState *rs, 
> PageSearchStatus *pss)
>       * explicit flush or it won't flush until the buffer is full.
>       */
>      if (migrate_postcopy_preempt() && pss->postcopy_requested) {
> -        qemu_fflush(rs->f);
> +        qemu_fflush(pss->pss_channel);
>      }
>  
>      res = ram_save_release_protection(rs, pss, start_page);
> @@ -2663,6 +2663,12 @@ static int ram_find_and_save_block(RAMState *rs)
>          }
>  
>          if (found) {
> +            /* Update rs->f with correct channel */
> +            if (postcopy_preempt_active()) {
> +                postcopy_preempt_choose_channel(rs, &pss);
> +            }
> +            /* Cache rs->f in pss_channel (TODO: remove rs->f) */
> +            pss.pss_channel = rs->f;
>              pages = ram_save_host_page(rs, &pss);
>          }
>      } while (!pages && again);
> -- 
> 2.32.0
> 
-- 
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK


Reply via email to