* Peter Xu (pet...@redhat.com) wrote: > Introduce pss_channel for PageSearchStatus, define it as "the migration > channel to be used to transfer this host page". > > We used to have rs->f, which is a mirror to MigrationState.to_dst_file. > > After postcopy preempt initial version, rs->f can be dynamically changed > depending on which channel we want to use. > > But that later work still doesn't grant full concurrency of sending pages > in e.g. different threads, because rs->f can either be the PRECOPY channel > or POSTCOPY channel. This needs to be per-thread too. > > PageSearchStatus is actually a good piece of struct which we can leverage > if we want to have multiple threads sending pages. Sending a single guest > page may not make sense, so we make the granule to be "host page", and in > the PSS structure we allow specify a QEMUFile* to migrate a specific host > page. Then we open the possibility to specify different channels in > different threads with different PSS structures. > > The PSS prefix can be slightly misleading here because e.g. for the > upcoming usage of postcopy channel/thread it's not "searching" (or, > scanning) at all but sending the explicit page that was requested. However > since PSS existed for some years keep it as-is until someone complains. > > This patch mostly (simply) replace rs->f with pss->pss_channel only. No > functional change intended for this patch yet. But it does prepare to > finally drop rs->f, and make ram_save_guest_page() thread safe. > > Signed-off-by: Peter Xu <pet...@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilb...@redhat.com> > --- > migration/ram.c | 70 +++++++++++++++++++++++++++---------------------- > 1 file changed, 38 insertions(+), 32 deletions(-) > > diff --git a/migration/ram.c b/migration/ram.c > index 3f720b6de2..40ff5dc49f 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -446,6 +446,8 @@ void dirty_sync_missed_zero_copy(void) > > /* used by the search for pages to send */ > struct PageSearchStatus { > + /* The migration channel used for a specific host page */ > + QEMUFile *pss_channel; > /* Current block being searched */ > RAMBlock *block; > /* Current page to search from */ > @@ -768,9 +770,9 @@ static void xbzrle_cache_zero_page(RAMState *rs, > ram_addr_t current_addr) > * @block: block that contains the page we want to send > * @offset: offset inside the block for the page > */ > -static int save_xbzrle_page(RAMState *rs, uint8_t **current_data, > - ram_addr_t current_addr, RAMBlock *block, > - ram_addr_t offset) > +static int save_xbzrle_page(RAMState *rs, QEMUFile *file, > + uint8_t **current_data, ram_addr_t current_addr, > + RAMBlock *block, ram_addr_t offset) > { > int encoded_len = 0, bytes_xbzrle; > uint8_t *prev_cached_page; > @@ -838,11 +840,11 @@ static int save_xbzrle_page(RAMState *rs, uint8_t > **current_data, > } > > /* Send XBZRLE based compressed page */ > - bytes_xbzrle = save_page_header(rs, rs->f, block, > + bytes_xbzrle = save_page_header(rs, file, block, > offset | RAM_SAVE_FLAG_XBZRLE); > - qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE); > - qemu_put_be16(rs->f, encoded_len); > - qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len); > + qemu_put_byte(file, ENCODING_FLAG_XBZRLE); > + qemu_put_be16(file, encoded_len); > + qemu_put_buffer(file, XBZRLE.encoded_buf, encoded_len); > bytes_xbzrle += encoded_len + 1 + 2; > /* > * Like compressed_size (please see update_compress_thread_counts), > @@ -1298,9 +1300,10 @@ static int save_zero_page_to_file(RAMState *rs, > QEMUFile *file, > * @block: block that contains the page we want to send > * @offset: offset inside the block for the page > */ > -static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) > +static int save_zero_page(RAMState *rs, QEMUFile *file, RAMBlock *block, > + ram_addr_t offset) > { > - int len = save_zero_page_to_file(rs, rs->f, block, offset); > + int len = save_zero_page_to_file(rs, file, block, offset); > > if (len) { > qatomic_inc(&ram_counters.duplicate); > @@ -1317,15 +1320,15 @@ static int save_zero_page(RAMState *rs, RAMBlock > *block, ram_addr_t offset) > * > * Return true if the pages has been saved, otherwise false is returned. > */ > -static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t > offset, > - int *pages) > +static bool control_save_page(PageSearchStatus *pss, RAMBlock *block, > + ram_addr_t offset, int *pages) > { > uint64_t bytes_xmit = 0; > int ret; > > *pages = -1; > - ret = ram_control_save_page(rs->f, block->offset, offset, > TARGET_PAGE_SIZE, > - &bytes_xmit); > + ret = ram_control_save_page(pss->pss_channel, block->offset, offset, > + TARGET_PAGE_SIZE, &bytes_xmit); > if (ret == RAM_SAVE_CONTROL_NOT_SUPP) { > return false; > } > @@ -1359,17 +1362,17 @@ static bool control_save_page(RAMState *rs, RAMBlock > *block, ram_addr_t offset, > * @buf: the page to be sent > * @async: send to page asyncly > */ > -static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset, > - uint8_t *buf, bool async) > +static int save_normal_page(RAMState *rs, QEMUFile *file, RAMBlock *block, > + ram_addr_t offset, uint8_t *buf, bool async) > { > - ram_transferred_add(save_page_header(rs, rs->f, block, > + ram_transferred_add(save_page_header(rs, file, block, > offset | RAM_SAVE_FLAG_PAGE)); > if (async) { > - qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE, > + qemu_put_buffer_async(file, buf, TARGET_PAGE_SIZE, > migrate_release_ram() && > migration_in_postcopy()); > } else { > - qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE); > + qemu_put_buffer(file, buf, TARGET_PAGE_SIZE); > } > ram_transferred_add(TARGET_PAGE_SIZE); > qatomic_inc(&ram_counters.normal); > @@ -1402,8 +1405,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus > *pss) > > XBZRLE_cache_lock(); > if (rs->xbzrle_enabled && !migration_in_postcopy()) { > - pages = save_xbzrle_page(rs, &p, current_addr, block, > - offset); > + pages = save_xbzrle_page(rs, pss->pss_channel, &p, current_addr, > + block, offset); > if (!rs->last_stage) { > /* Can't send this cached data async, since the cache page > * might get updated before it gets to the wire > @@ -1414,7 +1417,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus > *pss) > > /* XBZRLE overflow or normal page */ > if (pages == -1) { > - pages = save_normal_page(rs, block, offset, p, send_async); > + pages = save_normal_page(rs, pss->pss_channel, block, offset, > + p, send_async); > } > > XBZRLE_cache_unlock(); > @@ -1422,10 +1426,10 @@ static int ram_save_page(RAMState *rs, > PageSearchStatus *pss) > return pages; > } > > -static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, > +static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block, > ram_addr_t offset) > { > - if (multifd_queue_page(rs->f, block, offset) < 0) { > + if (multifd_queue_page(file, block, offset) < 0) { > return -1; > } > ram_counters.normal++; > @@ -1720,7 +1724,7 @@ static int ram_save_release_protection(RAMState *rs, > PageSearchStatus *pss, > uint64_t run_length = (pss->page - start_page) << TARGET_PAGE_BITS; > > /* Flush async buffers before un-protect. */ > - qemu_fflush(rs->f); > + qemu_fflush(pss->pss_channel); > /* Un-protect memory range. */ > res = uffd_change_protection(rs->uffdio_fd, page_address, run_length, > false, false); > @@ -2307,7 +2311,7 @@ static int ram_save_target_page(RAMState *rs, > PageSearchStatus *pss) > ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS; > int res; > > - if (control_save_page(rs, block, offset, &res)) { > + if (control_save_page(pss, block, offset, &res)) { > return res; > } > > @@ -2315,7 +2319,7 @@ static int ram_save_target_page(RAMState *rs, > PageSearchStatus *pss) > return 1; > } > > - res = save_zero_page(rs, block, offset); > + res = save_zero_page(rs, pss->pss_channel, block, offset); > if (res > 0) { > /* Must let xbzrle know, otherwise a previous (now 0'd) cached > * page would be stale > @@ -2336,7 +2340,7 @@ static int ram_save_target_page(RAMState *rs, > PageSearchStatus *pss) > */ > if (!save_page_use_compression(rs) && migrate_use_multifd() > && !migration_in_postcopy()) { > - return ram_save_multifd_page(rs, block, offset); > + return ram_save_multifd_page(pss->pss_channel, block, offset); > } > > return ram_save_page(rs, pss); > @@ -2533,10 +2537,6 @@ static int ram_save_host_page(RAMState *rs, > PageSearchStatus *pss) > return 0; > } > > - if (postcopy_preempt_active()) { > - postcopy_preempt_choose_channel(rs, pss); > - } > - > /* Update host page boundary information */ > pss_host_page_prepare(pss); > > @@ -2597,7 +2597,7 @@ static int ram_save_host_page(RAMState *rs, > PageSearchStatus *pss) > * explicit flush or it won't flush until the buffer is full. > */ > if (migrate_postcopy_preempt() && pss->postcopy_requested) { > - qemu_fflush(rs->f); > + qemu_fflush(pss->pss_channel); > } > > res = ram_save_release_protection(rs, pss, start_page); > @@ -2663,6 +2663,12 @@ static int ram_find_and_save_block(RAMState *rs) > } > > if (found) { > + /* Update rs->f with correct channel */ > + if (postcopy_preempt_active()) { > + postcopy_preempt_choose_channel(rs, &pss); > + } > + /* Cache rs->f in pss_channel (TODO: remove rs->f) */ > + pss.pss_channel = rs->f; > pages = ram_save_host_page(rs, &pss); > } > } while (!pages && again); > -- > 2.32.0 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK