Yichen Wang <yichen.w...@bytedance.com> writes:

> On Tue, Dec 17, 2024 at 9:56 AM Fabiano Rosas <faro...@suse.de> wrote:
>>
>> Yichen Wang <yichen.w...@bytedance.com> writes:
>>
>> > From: Hao Xiang <hao.xi...@linux.dev>
>> >
>> > Multifd sender path gets an array of pages queued by the migration
>> > thread. It performs zero page checking on every page in the array.
>> > The pages are classfied as either a zero page or a normal page. This
>> > change uses Intel DSA to offload the zero page checking from CPU to
>> > the DSA accelerator. The sender thread submits a batch of pages to DSA
>> > hardware and waits for the DSA completion thread to signal for work
>> > completion.
>> >
>> > Signed-off-by: Hao Xiang <hao.xi...@linux.dev>
>> > Signed-off-by: Yichen Wang <yichen.w...@bytedance.com>
>> > ---
>> >  migration/multifd-zero-page.c | 149 ++++++++++++++++++++++++++++++----
>> >  migration/multifd.c           |  15 +++-
>> >  migration/multifd.h           |   6 ++
>> >  migration/options.c           |  13 +++
>> >  migration/options.h           |   1 +
>> >  5 files changed, 168 insertions(+), 16 deletions(-)
>> >
>> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
>> > index f1e988a959..08e7fc3d92 100644
>> > --- a/migration/multifd-zero-page.c
>> > +++ b/migration/multifd-zero-page.c
>> > @@ -21,7 +21,9 @@
>> >
>> >  static bool multifd_zero_page_enabled(void)
>> >  {
>> > -    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
>> > +    ZeroPageDetection curMethod = migrate_zero_page_detection();
>> > +    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
>> > +            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
>> >  }
>> >
>> >  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>> > @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, 
>> > int a, int b)
>> >      pages_offset[b] = temp;
>> >  }
>> >
>> > +#ifdef CONFIG_DSA_OPT
>> > +
>> > +static void swap_result(bool *results, int a, int b)
>> > +{
>> > +    bool temp;
>> > +
>> > +    if (a == b) {
>> > +        return;
>> > +    }
>> > +
>> > +    temp = results[a];
>> > +    results[a] = results[b];
>> > +    results[b] = temp;
>> > +}
>> > +
>> >  /**
>> > - * multifd_send_zero_page_detect: Perform zero page detection on all 
>> > pages.
>> > + * zero_page_detect_dsa: Perform zero page detection using
>> > + * Intel Data Streaming Accelerator (DSA).
>> >   *
>> > - * Sorts normal pages before zero pages in p->pages->offset and updates
>> > - * p->pages->normal_num.
>> > + * Sorts normal pages before zero pages in pages->offset and updates
>> > + * pages->normal_num.
>> >   *
>> >   * @param p A pointer to the send params.
>> >   */
>> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
>> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
>> >  {
>> >      MultiFDPages_t *pages = &p->data->u.ram;
>> >      RAMBlock *rb = pages->block;
>> > -    int i = 0;
>> > -    int j = pages->num - 1;
>> > +    bool *results = p->dsa_batch_task->results;
>> >
>> > -    if (!multifd_zero_page_enabled()) {
>> > -        pages->normal_num = pages->num;
>> > -        goto out;
>> > +    for (int i = 0; i < pages->num; i++) {
>> > +        p->dsa_batch_task->addr[i] =
>> > +            (ram_addr_t)(rb->host + pages->offset[i]);
>> >      }
>> >
>> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
>> > +                                  (const void **)p->dsa_batch_task->addr,
>> > +                                  pages->num,
>> > +                                  multifd_ram_page_size());
>> > +
>> > +    int i = 0;
>> > +    int j = pages->num - 1;
>> > +
>> >      /*
>> >       * Sort the page offset array by moving all normal pages to
>> >       * the left and all zero pages to the right of the array.
>> > @@ -64,23 +89,59 @@ void multifd_send_zero_page_detect(MultiFDSendParams 
>> > *p)
>> >      while (i <= j) {
>> >          uint64_t offset = pages->offset[i];
>> >
>> > -        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
>> > +        if (!results[i]) {
>> >              i++;
>> >              continue;
>> >          }
>> >
>> > +        swap_result(results, i, j);
>> >          swap_page_offset(pages->offset, i, j);
>> >          ram_release_page(rb->idstr, offset);
>> >          j--;
>> >      }
>> >
>> >      pages->normal_num = i;
>> > +}
>> >
>> > -out:
>> > -    stat64_add(&mig_stats.normal_pages, pages->normal_num);
>> > -    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
>> > +int multifd_dsa_setup(MigrationState *s, Error *local_err)
>> > +{
>> > +    g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
>> > +    migrate_dsa_accel_path(dsa_parameter);
>> > +    if (qemu_dsa_init(dsa_parameter, &local_err)) {
>> > +        migrate_set_error(s, local_err);
>> > +        return -1;
>> > +    } else {
>> > +        qemu_dsa_start();
>> > +    }
>> > +
>> > +    return 0;
>> > +}
>> > +
>> > +void multifd_dsa_cleanup(void)
>> > +{
>> > +    qemu_dsa_cleanup();
>> > +}
>> > +
>> > +#else
>> > +
>> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
>> > +{
>> > +    g_assert_not_reached();
>> >  }
>> >
>> > +int multifd_dsa_setup(MigrationState *s, Error *local_err)
>> > +{
>> > +    g_assert_not_reached();
>> > +    return -1;
>> > +}
>> > +
>> > +void multifd_dsa_cleanup(void)
>> > +{
>> > +    return ;
>> > +}
>> > +
>> > +#endif
>> > +
>> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>> >  {
>> >      for (int i = 0; i < p->zero_num; i++) {
>> > @@ -92,3 +153,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams 
>> > *p)
>> >          }
>> >      }
>> >  }
>> > +
>> > +/**
>> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
>> > + *
>> > + * Sorts normal pages before zero pages in p->pages->offset and updates
>> > + * p->pages->normal_num.
>> > + *
>> > + * @param p A pointer to the send params.
>> > + */
>> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
>> > +{
>> > +    MultiFDPages_t *pages = &p->data->u.ram;
>> > +    RAMBlock *rb = pages->block;
>> > +    int i = 0;
>> > +    int j = pages->num - 1;
>> > +
>> > +    /*
>> > +     * Sort the page offset array by moving all normal pages to
>> > +     * the left and all zero pages to the right of the array.
>> > +     */
>> > +    while (i <= j) {
>> > +        uint64_t offset = pages->offset[i];
>> > +
>> > +        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
>> > +            i++;
>> > +            continue;
>> > +        }
>> > +
>> > +        swap_page_offset(pages->offset, i, j);
>> > +        ram_release_page(rb->idstr, offset);
>> > +        j--;
>> > +    }
>> > +
>> > +    pages->normal_num = i;
>> > +}
>> > +
>> > +/**
>> > + * multifd_send_zero_page_detect: Perform zero page detection on all 
>> > pages.
>> > + *
>> > + * @param p A pointer to the send params.
>> > + */
>> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
>> > +{
>> > +    MultiFDPages_t *pages = &p->data->u.ram;
>> > +
>> > +    if (!multifd_zero_page_enabled()) {
>> > +        pages->normal_num = pages->num;
>> > +        goto out;
>> > +    }
>> > +
>> > +    if (qemu_dsa_is_running()) {
>> > +        zero_page_detect_dsa(p);
>> > +    } else {
>> > +        zero_page_detect_cpu(p);
>> > +    }
>> > +
>> > +out:
>> > +    stat64_add(&mig_stats.normal_pages, pages->normal_num);
>> > +    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
>> > +}
>> > diff --git a/migration/multifd.c b/migration/multifd.c
>> > index 498e71fd10..946144fc2f 100644
>> > --- a/migration/multifd.c
>> > +++ b/migration/multifd.c
>> > @@ -13,6 +13,7 @@
>> >  #include "qemu/osdep.h"
>> >  #include "qemu/cutils.h"
>> >  #include "qemu/rcu.h"
>> > +#include "qemu/dsa.h"
>> >  #include "exec/target_page.h"
>> >  #include "sysemu/sysemu.h"
>> >  #include "exec/ramblock.h"
>> > @@ -462,6 +463,8 @@ static bool 
>> > multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>> >      p->name = NULL;
>> >      g_free(p->data);
>> >      p->data = NULL;
>> > +    buffer_zero_batch_task_destroy(p->dsa_batch_task);
>> > +    p->dsa_batch_task = NULL;
>> >      p->packet_len = 0;
>> >      g_free(p->packet);
>> >      p->packet = NULL;
>> > @@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
>> >
>> >      multifd_send_terminate_threads();
>> >
>> > +    multifd_dsa_cleanup();
>> > +
>> >      for (i = 0; i < migrate_multifd_channels(); i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> >          Error *local_err = NULL;
>> > @@ -814,11 +819,17 @@ bool multifd_send_setup(void)
>> >      uint32_t page_count = multifd_ram_page_count();
>> >      bool use_packets = multifd_use_packets();
>> >      uint8_t i;
>> > +    Error *local_err = NULL;
>> >
>> >      if (!migrate_multifd()) {
>> >          return true;
>> >      }
>> >
>> > +    if (s &&
>> > +        s->parameters.zero_page_detection == 
>> > ZERO_PAGE_DETECTION_DSA_ACCEL) {
>> > +        ret = multifd_dsa_setup(s, local_err);
>>
>> This leaves the local_err set and will cause an assert if the code below
>> tries to set it again. We should at the very least report the error here
>> and free it. But that's not ideal because it allows the code to
>> continue.
>>
>> Can you move this whole block after the channels loop as suggested
>> previously but this time take the p->dsa_batch_task along and put it the
>> ->send_setup() loop?
>>
>
> I am trying to make this change. If I put the p->dsa_batch_task into
> the multifd_dsa_setup() in multifd-zero-page.c, things are a little
> tricky. I will need to pass thread_count and multifd_send_state into
> multifd_dsa_setup(), but multifd_send_state is a multifd.c only data
> structure/variable. I have to make the whole struct into multifd.h,
> and not sure if that is a good idea?

I meant something simpler:

+    if (s &&
+        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
+        ret = multifd_dsa_setup(s, local_err);
+    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

+        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }


>
>> > +    }
>> > +
>> >      thread_count = migrate_multifd_channels();
>> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>> > @@ -829,12 +840,12 @@ bool multifd_send_setup(void)
>> >
>> >      for (i = 0; i < thread_count; i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> > -        Error *local_err = NULL;
>> >
>> >          qemu_sem_init(&p->sem, 0);
>> >          qemu_sem_init(&p->sem_sync, 0);
>> >          p->id = i;
>> >          p->data = multifd_send_data_alloc();
>> > +        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
>> >
>> >          if (use_packets) {
>> >              p->packet_len = sizeof(MultiFDPacket_t)
>> > @@ -865,7 +876,6 @@ bool multifd_send_setup(void)
>> >
>> >      for (i = 0; i < thread_count; i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> > -        Error *local_err = NULL;
>> >
>> >          ret = multifd_send_state->ops->send_setup(p, &local_err);
>> >          if (ret) {
>> > @@ -1047,6 +1057,7 @@ void multifd_recv_cleanup(void)
>> >              qemu_thread_join(&p->thread);
>> >          }
>> >      }
>> > +    multifd_dsa_cleanup();
>> >      for (i = 0; i < migrate_multifd_channels(); i++) {
>> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>> >      }
>> > diff --git a/migration/multifd.h b/migration/multifd.h
>> > index 50d58c0c9c..da53b0bdfd 100644
>> > --- a/migration/multifd.h
>> > +++ b/migration/multifd.h
>> > @@ -15,6 +15,7 @@
>> >
>> >  #include "exec/target_page.h"
>> >  #include "ram.h"
>> > +#include "qemu/dsa.h"
>> >
>> >  typedef struct MultiFDRecvData MultiFDRecvData;
>> >  typedef struct MultiFDSendData MultiFDSendData;
>> > @@ -155,6 +156,9 @@ typedef struct {
>> >      bool pending_sync;
>> >      MultiFDSendData *data;
>> >
>> > +    /* Zero page checking batch task */
>> > +    QemuDsaBatchTask *dsa_batch_task;
>> > +
>> >      /* thread local variables. No locking required */
>> >
>> >      /* pointer to the packet */
>> > @@ -313,6 +317,8 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
>> >  bool multifd_send_prepare_common(MultiFDSendParams *p);
>> >  void multifd_send_zero_page_detect(MultiFDSendParams *p);
>> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p);
>> > +int multifd_dsa_setup(MigrationState *s, Error *local_err);
>> > +void multifd_dsa_cleanup(void);
>> >
>> >  static inline void multifd_send_prepare_header(MultiFDSendParams *p)
>> >  {
>> > diff --git a/migration/options.c b/migration/options.c
>> > index ca89fdc4f4..cc40d3dfea 100644
>> > --- a/migration/options.c
>> > +++ b/migration/options.c
>> > @@ -817,6 +817,19 @@ const strList *migrate_accel_path(void)
>> >      return s->parameters.accel_path;
>> >  }
>> >
>> > +void migrate_dsa_accel_path(strList *dsa_accel_path)
>> > +{
>> > +    MigrationState *s = migrate_get_current();
>> > +    strList *accel_path = s->parameters.accel_path;
>> > +    strList **tail = &dsa_accel_path;
>> > +    while (accel_path) {
>> > +        if (strncmp(accel_path->value, "dsa:", 4) == 0) {
>> > +            QAPI_LIST_APPEND(tail, &accel_path->value[4]);
>> > +        }
>> > +        accel_path = accel_path->next;
>> > +    }
>> > +}
>> > +
>> >  const char *migrate_tls_hostname(void)
>> >  {
>> >      MigrationState *s = migrate_get_current();
>> > diff --git a/migration/options.h b/migration/options.h
>> > index 3d1e91dc52..5e34b7c997 100644
>> > --- a/migration/options.h
>> > +++ b/migration/options.h
>> > @@ -85,6 +85,7 @@ const char *migrate_tls_hostname(void);
>> >  uint64_t migrate_xbzrle_cache_size(void);
>> >  ZeroPageDetection migrate_zero_page_detection(void);
>> >  const strList *migrate_accel_path(void);
>> > +void migrate_dsa_accel_path(strList *dsa_accel_path);
>> >
>> >  /* parameters helpers */

Reply via email to