Peter Xu <pet...@redhat.com> wrote: > On Thu, Oct 19, 2023 at 08:41:26PM +0200, Juan Quintela wrote: >> We can changing pending_job to a bool if you preffer. I think that we >> have nailed all the off_by_one errors by now (famous last words). > > Would it work to make pending_job a bool, even with SYNC? It seems to me > multifd_send_sync_main() now can boost pending_job even if pending_job==1.
Then a int is ok, I think. > That's also the place where I really think confusing too; where it looks > like the migration thread can modify a pending job's flag as long as it is > fast enough before the send thread put that onto the wire. It never does. for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { qemu_mutex_lock(&p->mutex); ... if (!p->pending_job) { p->pending_job++; next_channel = (i + 1) % migrate_multifd_channels(); break; } qemu_mutex_unlock(&p->mutex); } If pending_job == 0 -> owner of the channel is migration_thread and it can use it. If pending_job > 0 -> owner of the channel is the channel thread and migration_thread can't use it. I think that this is easy to understand. You are right that it is not _explained_. And clearly, if you have to ask, it is not obvious O:-) Yes, it was obvious to me, that is the reason why I wrote it on the 1st place. Notice also that it is a common idiom in multithreaded apps. That allows it to do stuff without having to have a mutex locked, so other threads can "look" into the state. > Then it's > unpredictable whether the SYNC flag will be sent with current packet (where > due to pending_jobs==1 already, can contain valid pages), or will be only > set for the next one (where there will have 0 real page). I have to think about this one. Decrease pending_jobs there if we are doing multiple jobs? But we still have the issue of the semaphore. > IMHO it'll be good to separate the sync task, then we can change > pending_jobs to bool. Something like: > > bool pending_send_page; > bool pending_send_sync; current code: qemu_mutex_lock(&p->mutex); qemu_mutex_lock(&p->mutex); if (p->pending_job) { uint64_t packet_num = p->packet_num; uint32_t flags; p->normal_num = 0; if (use_zero_copy_send) { p->iovs_num = 0; } else { p->iovs_num = 1; } for (int i = 0; i < p->pages->num; i++) { p->normal[p->normal_num] = p->pages->offset[i]; p->normal_num++; } if (p->normal_num) { ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { qemu_mutex_unlock(&p->mutex); break; } } multifd_send_fill_packet(p); flags = p->flags; p->flags = 0; p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); trace_multifd_send(p->id, packet_num, p->normal_num, flags, p->next_packet_size); if (use_zero_copy_send) { /* Send header first, without zerocopy */ ret = qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, &local_err); if (ret != 0) { break; } } else { /* Send header using the same writev call */ p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; } ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 0, p->write_flags, &local_err); if (ret != 0) { break; } stat64_add(&mig_stats.multifd_bytes, p->next_packet_size + p->packet_len); stat64_add(&mig_stats.transferred, p->next_packet_size + p->packet_len); p->next_packet_size = 0; qemu_mutex_lock(&p->mutex); p->pending_job--; qemu_mutex_unlock(&p->mutex); if (flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&p->sem_sync); } } else { qemu_mutex_unlock(&p->mutex); /* sometimes there are spurious wakeups */ } Your suggested change: qemu_mutex_lock(&p->mutex); if (p->pending_job_page) { uint64_t packet_num = p->packet_num; uint32_t flags; p->normal_num = 0; if (use_zero_copy_send) { p->iovs_num = 0; } else { p->iovs_num = 1; } for (int i = 0; i < p->pages->num; i++) { p->normal[p->normal_num] = p->pages->offset[i]; p->normal_num++; } if (p->normal_num) { ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { qemu_mutex_unlock(&p->mutex); break; } } multifd_send_fill_packet(p); flags = p->flags; p->flags = 0; p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); trace_multifd_send(p->id, packet_num, p->normal_num, flags, p->next_packet_size); if (use_zero_copy_send) { /* Send header first, without zerocopy */ ret = qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, &local_err); if (ret != 0) { break; } } else { /* Send header using the same writev call */ p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; } ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 0, p->write_flags, &local_err); if (ret != 0) { break; } stat64_add(&mig_stats.multifd_bytes, p->next_packet_size + p->packet_len); stat64_add(&mig_stats.transferred, p->next_packet_size + p->packet_len); p->next_packet_size = 0; qemu_mutex_lock(&p->mutex); p->pending_job_page = false; qemu_mutex_unlock(&p->mutex); else if (p->pending_job_sync) uint64_t packet_num = p->packet_num; uint32_t flags; p->normal_num = 0; if (use_zero_copy_send) { p->iovs_num = 0; } else { p->iovs_num = 1; } multifd_send_fill_packet(p); flags = p->flags; p->flags = 0; p->num_packets++; p->total_normal_pages += p->normal_num; p->pages->num = 0; p->pages->block = NULL; qemu_mutex_unlock(&p->mutex); trace_multifd_send(p->id, packet_num, p->normal_num, flags, p->next_packet_size); if (use_zero_copy_send) { /* Send header first, without zerocopy */ ret = qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, &local_err); if (ret != 0) { break; } } else { /* Send header using the same writev call */ p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; } ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 0, p->write_flags, &local_err); if (ret != 0) { break; } stat64_add(&mig_stats.multifd_bytes, p->next_packet_size + p->packet_len); stat64_add(&mig_stats.transferred, p->next_packet_size + p->packet_len); p->next_packet_size = 0; qemu_mutex_lock(&p->mutex); p->pending_job_sync = false; qemu_mutex_unlock(&p->mutex); if (flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&p->sem_sync); } } else { qemu_mutex_unlock(&p->mutex); /* sometimes there are spurious wakeups */ } I.e. we duplicate much more code than the one that we remove. I am not convinced. > Then multifd_send_thread() handles them separately, only attaching > p->flags=SYNC when pending_send_sync is requested. It guarantees a SYNC > message will always be a separate packet, which will be crystal clear then. This is not a requirement. Code should handle the reception of SYNC with a page. We just don't sent them because it is more complex.