Yichen Wang <yichen.w...@bytedance.com> writes: > On Tue, Dec 17, 2024 at 9:56 AM Fabiano Rosas <faro...@suse.de> wrote: >> >> Yichen Wang <yichen.w...@bytedance.com> writes: >> >> > From: Hao Xiang <hao.xi...@linux.dev> >> > >> > Multifd sender path gets an array of pages queued by the migration >> > thread. It performs zero page checking on every page in the array. >> > The pages are classfied as either a zero page or a normal page. This >> > change uses Intel DSA to offload the zero page checking from CPU to >> > the DSA accelerator. The sender thread submits a batch of pages to DSA >> > hardware and waits for the DSA completion thread to signal for work >> > completion. >> > >> > Signed-off-by: Hao Xiang <hao.xi...@linux.dev> >> > Signed-off-by: Yichen Wang <yichen.w...@bytedance.com> >> > --- >> > migration/multifd-zero-page.c | 149 ++++++++++++++++++++++++++++++---- >> > migration/multifd.c | 15 +++- >> > migration/multifd.h | 6 ++ >> > migration/options.c | 13 +++ >> > migration/options.h | 1 + >> > 5 files changed, 168 insertions(+), 16 deletions(-) >> > >> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c >> > index f1e988a959..08e7fc3d92 100644 >> > --- a/migration/multifd-zero-page.c >> > +++ b/migration/multifd-zero-page.c >> > @@ -21,7 +21,9 @@ >> > >> > static bool multifd_zero_page_enabled(void) >> > { >> > - return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD; >> > + ZeroPageDetection curMethod = migrate_zero_page_detection(); >> > + return (curMethod == ZERO_PAGE_DETECTION_MULTIFD || >> > + curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL); >> > } >> > >> > static void swap_page_offset(ram_addr_t *pages_offset, int a, int b) >> > @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, >> > int a, int b) >> > pages_offset[b] = temp; >> > } >> > >> > +#ifdef CONFIG_DSA_OPT >> > + >> > +static void swap_result(bool *results, int a, int b) >> > +{ >> > + bool temp; >> > + >> > + if (a == b) { >> > + return; >> > + } >> > + >> > + temp = results[a]; >> > + results[a] = results[b]; >> > + results[b] = temp; >> > +} >> > + >> > /** >> > - * multifd_send_zero_page_detect: Perform zero page detection on all >> > pages. >> > + * zero_page_detect_dsa: Perform zero page detection using >> > + * Intel Data Streaming Accelerator (DSA). >> > * >> > - * Sorts normal pages before zero pages in p->pages->offset and updates >> > - * p->pages->normal_num. >> > + * Sorts normal pages before zero pages in pages->offset and updates >> > + * pages->normal_num. >> > * >> > * @param p A pointer to the send params. >> > */ >> > -void multifd_send_zero_page_detect(MultiFDSendParams *p) >> > +static void zero_page_detect_dsa(MultiFDSendParams *p) >> > { >> > MultiFDPages_t *pages = &p->data->u.ram; >> > RAMBlock *rb = pages->block; >> > - int i = 0; >> > - int j = pages->num - 1; >> > + bool *results = p->dsa_batch_task->results; >> > >> > - if (!multifd_zero_page_enabled()) { >> > - pages->normal_num = pages->num; >> > - goto out; >> > + for (int i = 0; i < pages->num; i++) { >> > + p->dsa_batch_task->addr[i] = >> > + (ram_addr_t)(rb->host + pages->offset[i]); >> > } >> > >> > + buffer_is_zero_dsa_batch_sync(p->dsa_batch_task, >> > + (const void **)p->dsa_batch_task->addr, >> > + pages->num, >> > + multifd_ram_page_size()); >> > + >> > + int i = 0; >> > + int j = pages->num - 1; >> > + >> > /* >> > * Sort the page offset array by moving all normal pages to >> > * the left and all zero pages to the right of the array. >> > @@ -64,23 +89,59 @@ void multifd_send_zero_page_detect(MultiFDSendParams >> > *p) >> > while (i <= j) { >> > uint64_t offset = pages->offset[i]; >> > >> > - if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) { >> > + if (!results[i]) { >> > i++; >> > continue; >> > } >> > >> > + swap_result(results, i, j); >> > swap_page_offset(pages->offset, i, j); >> > ram_release_page(rb->idstr, offset); >> > j--; >> > } >> > >> > pages->normal_num = i; >> > +} >> > >> > -out: >> > - stat64_add(&mig_stats.normal_pages, pages->normal_num); >> > - stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); >> > +int multifd_dsa_setup(MigrationState *s, Error *local_err) >> > +{ >> > + g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList)); >> > + migrate_dsa_accel_path(dsa_parameter); >> > + if (qemu_dsa_init(dsa_parameter, &local_err)) { >> > + migrate_set_error(s, local_err); >> > + return -1; >> > + } else { >> > + qemu_dsa_start(); >> > + } >> > + >> > + return 0; >> > +} >> > + >> > +void multifd_dsa_cleanup(void) >> > +{ >> > + qemu_dsa_cleanup(); >> > +} >> > + >> > +#else >> > + >> > +static void zero_page_detect_dsa(MultiFDSendParams *p) >> > +{ >> > + g_assert_not_reached(); >> > } >> > >> > +int multifd_dsa_setup(MigrationState *s, Error *local_err) >> > +{ >> > + g_assert_not_reached(); >> > + return -1; >> > +} >> > + >> > +void multifd_dsa_cleanup(void) >> > +{ >> > + return ; >> > +} >> > + >> > +#endif >> > + >> > void multifd_recv_zero_page_process(MultiFDRecvParams *p) >> > { >> > for (int i = 0; i < p->zero_num; i++) { >> > @@ -92,3 +153,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams >> > *p) >> > } >> > } >> > } >> > + >> > +/** >> > + * zero_page_detect_cpu: Perform zero page detection using CPU. >> > + * >> > + * Sorts normal pages before zero pages in p->pages->offset and updates >> > + * p->pages->normal_num. >> > + * >> > + * @param p A pointer to the send params. >> > + */ >> > +static void zero_page_detect_cpu(MultiFDSendParams *p) >> > +{ >> > + MultiFDPages_t *pages = &p->data->u.ram; >> > + RAMBlock *rb = pages->block; >> > + int i = 0; >> > + int j = pages->num - 1; >> > + >> > + /* >> > + * Sort the page offset array by moving all normal pages to >> > + * the left and all zero pages to the right of the array. >> > + */ >> > + while (i <= j) { >> > + uint64_t offset = pages->offset[i]; >> > + >> > + if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) { >> > + i++; >> > + continue; >> > + } >> > + >> > + swap_page_offset(pages->offset, i, j); >> > + ram_release_page(rb->idstr, offset); >> > + j--; >> > + } >> > + >> > + pages->normal_num = i; >> > +} >> > + >> > +/** >> > + * multifd_send_zero_page_detect: Perform zero page detection on all >> > pages. >> > + * >> > + * @param p A pointer to the send params. >> > + */ >> > +void multifd_send_zero_page_detect(MultiFDSendParams *p) >> > +{ >> > + MultiFDPages_t *pages = &p->data->u.ram; >> > + >> > + if (!multifd_zero_page_enabled()) { >> > + pages->normal_num = pages->num; >> > + goto out; >> > + } >> > + >> > + if (qemu_dsa_is_running()) { >> > + zero_page_detect_dsa(p); >> > + } else { >> > + zero_page_detect_cpu(p); >> > + } >> > + >> > +out: >> > + stat64_add(&mig_stats.normal_pages, pages->normal_num); >> > + stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); >> > +} >> > diff --git a/migration/multifd.c b/migration/multifd.c >> > index 498e71fd10..946144fc2f 100644 >> > --- a/migration/multifd.c >> > +++ b/migration/multifd.c >> > @@ -13,6 +13,7 @@ >> > #include "qemu/osdep.h" >> > #include "qemu/cutils.h" >> > #include "qemu/rcu.h" >> > +#include "qemu/dsa.h" >> > #include "exec/target_page.h" >> > #include "sysemu/sysemu.h" >> > #include "exec/ramblock.h" >> > @@ -462,6 +463,8 @@ static bool >> > multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) >> > p->name = NULL; >> > g_free(p->data); >> > p->data = NULL; >> > + buffer_zero_batch_task_destroy(p->dsa_batch_task); >> > + p->dsa_batch_task = NULL; >> > p->packet_len = 0; >> > g_free(p->packet); >> > p->packet = NULL; >> > @@ -493,6 +496,8 @@ void multifd_send_shutdown(void) >> > >> > multifd_send_terminate_threads(); >> > >> > + multifd_dsa_cleanup(); >> > + >> > for (i = 0; i < migrate_multifd_channels(); i++) { >> > MultiFDSendParams *p = &multifd_send_state->params[i]; >> > Error *local_err = NULL; >> > @@ -814,11 +819,17 @@ bool multifd_send_setup(void) >> > uint32_t page_count = multifd_ram_page_count(); >> > bool use_packets = multifd_use_packets(); >> > uint8_t i; >> > + Error *local_err = NULL; >> > >> > if (!migrate_multifd()) { >> > return true; >> > } >> > >> > + if (s && >> > + s->parameters.zero_page_detection == >> > ZERO_PAGE_DETECTION_DSA_ACCEL) { >> > + ret = multifd_dsa_setup(s, local_err); >> >> This leaves the local_err set and will cause an assert if the code below >> tries to set it again. We should at the very least report the error here >> and free it. But that's not ideal because it allows the code to >> continue. >> >> Can you move this whole block after the channels loop as suggested >> previously but this time take the p->dsa_batch_task along and put it the >> ->send_setup() loop? >> > > I am trying to make this change. If I put the p->dsa_batch_task into > the multifd_dsa_setup() in multifd-zero-page.c, things are a little > tricky. I will need to pass thread_count and multifd_send_state into > multifd_dsa_setup(), but multifd_send_state is a multifd.c only data > structure/variable. I have to make the whole struct into multifd.h, > and not sure if that is a good idea?
I meant something simpler: + if (s && + s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) { + ret = multifd_dsa_setup(s, local_err); + } if (ret) { goto err; } for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; Error *local_err = NULL; + p->dsa_batch_task = buffer_zero_batch_task_init(page_count); ret = multifd_send_state->ops->send_setup(p, &local_err); if (ret) { migrate_set_error(s, local_err); goto err; } assert(p->iov); } > >> > + } >> > + >> > thread_count = migrate_multifd_channels(); >> > multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); >> > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); >> > @@ -829,12 +840,12 @@ bool multifd_send_setup(void) >> > >> > for (i = 0; i < thread_count; i++) { >> > MultiFDSendParams *p = &multifd_send_state->params[i]; >> > - Error *local_err = NULL; >> > >> > qemu_sem_init(&p->sem, 0); >> > qemu_sem_init(&p->sem_sync, 0); >> > p->id = i; >> > p->data = multifd_send_data_alloc(); >> > + p->dsa_batch_task = buffer_zero_batch_task_init(page_count); >> > >> > if (use_packets) { >> > p->packet_len = sizeof(MultiFDPacket_t) >> > @@ -865,7 +876,6 @@ bool multifd_send_setup(void) >> > >> > for (i = 0; i < thread_count; i++) { >> > MultiFDSendParams *p = &multifd_send_state->params[i]; >> > - Error *local_err = NULL; >> > >> > ret = multifd_send_state->ops->send_setup(p, &local_err); >> > if (ret) { >> > @@ -1047,6 +1057,7 @@ void multifd_recv_cleanup(void) >> > qemu_thread_join(&p->thread); >> > } >> > } >> > + multifd_dsa_cleanup(); >> > for (i = 0; i < migrate_multifd_channels(); i++) { >> > multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); >> > } >> > diff --git a/migration/multifd.h b/migration/multifd.h >> > index 50d58c0c9c..da53b0bdfd 100644 >> > --- a/migration/multifd.h >> > +++ b/migration/multifd.h >> > @@ -15,6 +15,7 @@ >> > >> > #include "exec/target_page.h" >> > #include "ram.h" >> > +#include "qemu/dsa.h" >> > >> > typedef struct MultiFDRecvData MultiFDRecvData; >> > typedef struct MultiFDSendData MultiFDSendData; >> > @@ -155,6 +156,9 @@ typedef struct { >> > bool pending_sync; >> > MultiFDSendData *data; >> > >> > + /* Zero page checking batch task */ >> > + QemuDsaBatchTask *dsa_batch_task; >> > + >> > /* thread local variables. No locking required */ >> > >> > /* pointer to the packet */ >> > @@ -313,6 +317,8 @@ void multifd_send_fill_packet(MultiFDSendParams *p); >> > bool multifd_send_prepare_common(MultiFDSendParams *p); >> > void multifd_send_zero_page_detect(MultiFDSendParams *p); >> > void multifd_recv_zero_page_process(MultiFDRecvParams *p); >> > +int multifd_dsa_setup(MigrationState *s, Error *local_err); >> > +void multifd_dsa_cleanup(void); >> > >> > static inline void multifd_send_prepare_header(MultiFDSendParams *p) >> > { >> > diff --git a/migration/options.c b/migration/options.c >> > index ca89fdc4f4..cc40d3dfea 100644 >> > --- a/migration/options.c >> > +++ b/migration/options.c >> > @@ -817,6 +817,19 @@ const strList *migrate_accel_path(void) >> > return s->parameters.accel_path; >> > } >> > >> > +void migrate_dsa_accel_path(strList *dsa_accel_path) >> > +{ >> > + MigrationState *s = migrate_get_current(); >> > + strList *accel_path = s->parameters.accel_path; >> > + strList **tail = &dsa_accel_path; >> > + while (accel_path) { >> > + if (strncmp(accel_path->value, "dsa:", 4) == 0) { >> > + QAPI_LIST_APPEND(tail, &accel_path->value[4]); >> > + } >> > + accel_path = accel_path->next; >> > + } >> > +} >> > + >> > const char *migrate_tls_hostname(void) >> > { >> > MigrationState *s = migrate_get_current(); >> > diff --git a/migration/options.h b/migration/options.h >> > index 3d1e91dc52..5e34b7c997 100644 >> > --- a/migration/options.h >> > +++ b/migration/options.h >> > @@ -85,6 +85,7 @@ const char *migrate_tls_hostname(void); >> > uint64_t migrate_xbzrle_cache_size(void); >> > ZeroPageDetection migrate_zero_page_detection(void); >> > const strList *migrate_accel_path(void); >> > +void migrate_dsa_accel_path(strList *dsa_accel_path); >> > >> > /* parameters helpers */