On Mon, Jul 17, 2017 at 03:42:34PM +0200, Juan Quintela wrote: [...]
> struct MultiFDRecvParams { > + /* not changed */ > uint8_t id; > QemuThread thread; > QIOChannel *c; > + QemuSemaphore ready; > QemuSemaphore sem; > QemuMutex mutex; > + /* proteced by param mutex */ > bool quit; > + multifd_pages_t pages; > + bool done; (Again, I am wondering whether we can get rid of this "done" field, just like in the comment I left on the sending part, but I'll wait to see how that discussion goes in case I missed anything, so I will skip it here for now...) > }; > typedef struct MultiFDRecvParams MultiFDRecvParams; > > @@ -629,12 +637,20 @@ static void *multifd_recv_thread(void *opaque) > { > MultiFDRecvParams *p = opaque; > > + qemu_sem_post(&p->ready); > while (true) { > qemu_mutex_lock(&p->mutex); > if (p->quit) { > qemu_mutex_unlock(&p->mutex); > break; > } > + if (p->pages.num) { > + p->pages.num = 0; > + p->done = true; > + qemu_mutex_unlock(&p->mutex); > + qemu_sem_post(&p->ready); > + continue; > + } > qemu_mutex_unlock(&p->mutex); > qemu_sem_wait(&p->sem); > } > @@ -679,8 +695,11 @@ gboolean multifd_new_channel(QIOChannel *ioc) > } > qemu_mutex_init(&p->mutex); > qemu_sem_init(&p->sem, 0); > + qemu_sem_init(&p->ready, 0); > p->quit = false; > p->id = id; > + p->done = false; > + multifd_init_group(&p->pages); > p->c = ioc; > atomic_set(&multifd_recv_state->params[id], p); > qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p, > @@ -709,6 +728,42 @@ int multifd_load_setup(void) > return 0; > } > > +static void multifd_recv_page(uint8_t *address, uint16_t fd_num) > +{ > + int thread_count; > + MultiFDRecvParams *p; > + static multifd_pages_t pages; > + static bool once; > + > + if (!once) { > + multifd_init_group(&pages); > + once = true; > + } > + > + pages.iov[pages.num].iov_base = address; > + pages.iov[pages.num].iov_len = TARGET_PAGE_SIZE; > + pages.num++; > + > + if (fd_num == UINT16_MAX) { (so this check is slightly mysterious as well if we don't define something... 
O:-) > + return; > + } > + > + thread_count = migrate_multifd_threads(); > + assert(fd_num < thread_count); > + p = multifd_recv_state->params[fd_num]; > + > + qemu_sem_wait(&p->ready); Shall we check for p->pages.num == 0 before waiting? What if the corresponding thread has already finished its old work and is ready? > + > + qemu_mutex_lock(&p->mutex); > + p->done = false; > + iov_copy(p->pages.iov, pages.num, pages.iov, pages.num, 0, > + iov_size(pages.iov, pages.num)); Question: is there any reason why we don't use the same for loop as in the multifd-send code, and just copy the IOVs in that loop? (offset is always zero, and we are copying the whole thing after all) > + p->pages.num = pages.num; > + pages.num = 0; > + qemu_mutex_unlock(&p->mutex); > + qemu_sem_post(&p->sem); > +} > + > /** > * save_page_header: write page header to wire > * > @@ -1155,7 +1210,7 @@ static int ram_multifd_page(RAMState *rs, > PageSearchStatus *pss, > ram_counters.transferred += > save_page_header(rs, rs->f, block, > offset | RAM_SAVE_FLAG_MULTIFD_PAGE); > - fd_num = multifd_send_page(p); > + fd_num = multifd_send_page(p, rs->migration_dirty_pages == 1); > qemu_put_be16(rs->f, fd_num); > ram_counters.transferred += 2; /* size of fd_num */ > qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); > @@ -3020,10 +3075,7 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > > case RAM_SAVE_FLAG_MULTIFD_PAGE: > fd_num = qemu_get_be16(f); > - if (fd_num != 0) { > - /* this is yet an unused variable, changed later */ > - fd_num = fd_num; > - } > + multifd_recv_page(host, fd_num); > qemu_get_buffer(f, host, TARGET_PAGE_SIZE); > break; > > -- > 2.9.4 > Thanks, -- Peter Xu