On Mon, Jul 17, 2017 at 03:42:38PM +0200, Juan Quintela wrote: > Each time that we sync the bitmap, it is a possibility that we receive > a page that is being processed by a different thread. We fix this > problem just making sure that we wait for all receiving threads to > finish their work before we proceed with the next stage. > > We are low on page flags, so we use a combination that is not valid to > emit that message: MULTIFD_PAGE and COMPRESSED. > > I tried to make a migration command for it, but it doesn't work because > we sync the bitmap sometimes when we have already sent the beginning > of the section, so I just added a new page flag. > > Signed-off-by: Juan Quintela <quint...@redhat.com> > --- > migration/ram.c | 57 > ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 56 insertions(+), 1 deletion(-) > > diff --git a/migration/ram.c b/migration/ram.c > index c78b286..bffe204 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -71,6 +71,12 @@ > #define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 > #define RAM_SAVE_FLAG_MULTIFD_PAGE 0x200 > > +/* We are getting low on pages flags, so we start using combinations > + When we need to flush a page, we sent it as > + RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE > + We don't allow that combination > +*/ > + > static inline bool is_zero_range(uint8_t *p, uint64_t size) > { > return buffer_is_zero(p, size); > @@ -193,6 +199,9 @@ struct RAMState { > uint64_t iterations_prev; > /* Iterations since start */ > uint64_t iterations; > + /* Indicates if we have synced the bitmap and we need to assure that > + target has processeed all previous pages */ > + bool multifd_needs_flush; > /* protects modification of the bitmap */ > uint64_t migration_dirty_pages; > /* number of dirty bits in the bitmap */ > @@ -363,7 +372,6 @@ static void compress_threads_save_setup(void) > > /* Multiple fd's */ > > - > typedef struct { > int num; > int size; > @@ -595,9 +603,11 @@ struct MultiFDRecvParams { > 
QIOChannel *c; > QemuSemaphore ready; > QemuSemaphore sem; > + QemuCond cond_sync; > QemuMutex mutex; > /* proteced by param mutex */ > bool quit; > + bool sync; > multifd_pages_t pages; > bool done; > }; > @@ -637,6 +647,7 @@ void multifd_load_cleanup(void) > qemu_thread_join(&p->thread); > qemu_mutex_destroy(&p->mutex); > qemu_sem_destroy(&p->sem); > + qemu_cond_destroy(&p->cond_sync); > socket_recv_channel_destroy(p->c); > g_free(p); > multifd_recv_state->params[i] = NULL; > @@ -675,6 +686,10 @@ static void *multifd_recv_thread(void *opaque) > return NULL; > } > p->done = true; > + if (p->sync) { > + qemu_cond_signal(&p->cond_sync); > + p->sync = false; > + }
Could we use the same p->ready for this purpose? They look similar: all we want to do is to let the main thread know "worker thread has finished receiving the last piece and becomes idle again", right? > qemu_mutex_unlock(&p->mutex); > qemu_sem_post(&p->ready); > continue; > @@ -724,9 +739,11 @@ gboolean multifd_new_channel(QIOChannel *ioc) > qemu_mutex_init(&p->mutex); > qemu_sem_init(&p->sem, 0); > qemu_sem_init(&p->ready, 0); > + qemu_cond_init(&p->cond_sync); > p->quit = false; > p->id = id; > p->done = false; > + p->sync = false; > multifd_init_group(&p->pages); > p->c = ioc; > atomic_set(&multifd_recv_state->params[id], p); > @@ -792,6 +809,27 @@ static void multifd_recv_page(uint8_t *address, uint16_t > fd_num) > qemu_sem_post(&p->sem); > } > > +static int multifd_flush(void) > +{ > + int i, thread_count; > + > + if (!migrate_use_multifd()) { > + return 0; > + } > + thread_count = migrate_multifd_threads(); > + for (i = 0; i < thread_count; i++) { > + MultiFDRecvParams *p = multifd_recv_state->params[i]; > + > + qemu_mutex_lock(&p->mutex); > + while (!p->done) { > + p->sync = true; > + qemu_cond_wait(&p->cond_sync, &p->mutex); (similar comment as above) > + } > + qemu_mutex_unlock(&p->mutex); > + } > + return 0; > +} > + > /** > * save_page_header: write page header to wire > * > @@ -809,6 +847,12 @@ static size_t save_page_header(RAMState *rs, QEMUFile > *f, RAMBlock *block, > { > size_t size, len; > > + if (rs->multifd_needs_flush && > + (offset & RAM_SAVE_FLAG_MULTIFD_PAGE)) { If multifd_needs_flush is only for multifd, then we may skip this check, but it looks more like an assertion: if (rs->multifd_needs_flush) { assert(offset & RAM_SAVE_FLAG_MULTIFD_PAGE); offset |= RAM_SAVE_FLAG_ZERO; } (Dave mentioned the mismatched flag in the commit message and here: ZERO is used, but COMPRESS is mentioned) > + offset |= RAM_SAVE_FLAG_ZERO; > + rs->multifd_needs_flush = false; > + } > + > if (block == rs->last_sent_block) { > offset |= RAM_SAVE_FLAG_CONTINUE; 
> } > @@ -2496,6 +2540,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque) > > if (!migration_in_postcopy()) { > migration_bitmap_sync(rs); > + if (migrate_use_multifd()) { > + rs->multifd_needs_flush = true; > + } Would it be good to move this block into the entry of migration_bitmap_sync(), instead of setting it up at the callers of migration_bitmap_sync()? > } > > ram_control_before_iterate(f, RAM_CONTROL_FINISH); > @@ -2538,6 +2585,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, > uint64_t max_size, > qemu_mutex_lock_iothread(); > rcu_read_lock(); > migration_bitmap_sync(rs); > + if (migrate_use_multifd()) { > + rs->multifd_needs_flush = true; > + } > rcu_read_unlock(); > qemu_mutex_unlock_iothread(); > remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE; > @@ -3012,6 +3062,11 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > break; > } > > + if ((flags & (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) > + == (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_ZERO)) { > + multifd_flush(); > + flags = flags & ~RAM_SAVE_FLAG_ZERO; > + } > if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE | > RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE | > RAM_SAVE_FLAG_MULTIFD_PAGE)) { > -- > 2.9.4 > Thanks, -- Peter Xu