> -----Original Message-----
> From: Fabiano Rosas <faro...@suse.de>
> Sent: Thursday, June 6, 2024 6:26 AM
> To: Liu, Yuan1 <yuan1....@intel.com>; pet...@redhat.com;
> pbonz...@redhat.com; marcandre.lur...@redhat.com; berra...@redhat.com;
> th...@redhat.com; phi...@linaro.org
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1....@intel.com>; Zou, Nanhai
> <nanhai....@intel.com>; shameerali.kolothum.th...@huawei.com
> Subject: Re: [PATCH v7 6/7] migration/multifd: implement qpl compression
> and decompression
> 
> Yuan Liu <yuan1....@intel.com> writes:
> 
> > QPL compression and decompression will use IAA hardware first.
> > If IAA hardware is not available, it will automatically fall
> > back to QPL software path, if the software job also fails,
> > the uncompressed page is sent directly.
> >
> > Signed-off-by: Yuan Liu <yuan1....@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai....@intel.com>
> > ---
> >  migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 408 insertions(+), 4 deletions(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 6791a204d5..18b3384bd5 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -13,9 +13,14 @@
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> >  #include "qapi/error.h"
> > +#include "qapi/qapi-types-migration.h"
> > +#include "exec/ramblock.h"
> >  #include "multifd.h"
> >  #include "qpl/qpl.h"
> >
> > +/* Maximum number of retries to resubmit a job if IAA work queues are
> full */
> > +#define MAX_SUBMIT_RETRY_NUM (3)
> > +
> >  typedef struct {
> >      /* the QPL hardware path job */
> >      qpl_job *job;
> > @@ -260,6 +265,219 @@ static void
> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >      p->iov = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_prepare_job: prepare the job
> > + *
> > + * Set the QPL job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @is_compression: indicates compression and decompression
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> > +                                    uint8_t *input, uint32_t input_len,
> > +                                    uint8_t *output, uint32_t
> output_len)
> > +{
> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> > +    job->next_in_ptr = input;
> > +    job->next_out_ptr = output;
> > +    job->available_in = input_len;
> > +    job->available_out = output_len;
> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> > +    /* only supports compression level 1 */
> > +    job->level = 1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_prepare_job: prepare the compression job
> 
> function name is wrong

Thanks, I will fix this in the next version.
 
> > + *
> > + * Set the compression job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
> > +                                         uint32_t input_len, uint8_t
> *output,
> > +                                         uint32_t output_len)
> > +{
> > +    multifd_qpl_prepare_job(job, true, input, input_len, output,
> output_len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_prepare_job: prepare the decompression job

Thanks, I will fix this in the next version.
 
> > + *
> > + * Set the decompression job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t
> *input,
> > +                                           uint32_t input_len, uint8_t
> *output,
> > +                                           uint32_t output_len)
> > +{
> > +    multifd_qpl_prepare_job(job, false, input, input_len, output,
> output_len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_fill_iov: fill in the IOV
> > + *
> > + * Fill in the QPL packet IOV
> > + *
> > + * @p: Params for the channel being used
> > + * @data: pointer to the IOV data
> > + * @len: The length of the IOV data
> > + */
> > +static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
> > +                                 uint32_t len)
> > +{
> > +    p->iov[p->iovs_num].iov_base = data;
> > +    p->iov[p->iovs_num].iov_len = len;
> > +    p->iovs_num++;
> > +    p->next_packet_size += len;
> > +}
> > +
> > +/**
> > + * multifd_qpl_fill_packet: fill the compressed page into the QPL
> packet
> > + *
> > + * Fill the compressed page length and IOV into the QPL packet
> > + *
> > + * @idx: The index of the compressed length array
> > + * @p: Params for the channel being used
> > + * @data: pointer to the compressed page buffer
> > + * @len: The length of the compressed page
> > + */
> > +static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
> > +                                    uint8_t *data, uint32_t len)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +
> > +    qpl->zlen[idx] = cpu_to_be32(len);
> > +    multifd_qpl_fill_iov(p, data, len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_submit_job: submit a job to the hardware
> > + *
> > + * Submit a QPL hardware job to the IAA device
> > + *
> > + * Returns true if the job is submitted successfully, otherwise false.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + */
> > +static bool multifd_qpl_submit_job(qpl_job *job)
> > +{
> > +    qpl_status status;
> > +    uint32_t num = 0;
> > +
> > +retry:
> > +    status = qpl_submit_job(job);
> > +    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> > +        if (num < MAX_SUBMIT_RETRY_NUM) {
> > +            num++;
> > +            goto retry;
> > +        }
> > +    }
> > +    return (status == QPL_STS_OK);
> 
> How often do we expect this to fail? Will the queues be busy frequently
> or is this an unlikely event? I'm thinking whether we really need to
> allow a fallback for the hw path. Sorry if this has been discussed
> already, I don't remember.

In some scenarios this may happen frequently, for example when 4 channels are
configured but only one IAA device is available. When IAA hardware resources
are insufficient, retrying and then falling back to the software path improves
performance. Here is a comparison test:

1. Retry + SW fallback:
   total time: 14649 ms
   downtime: 25 ms
   throughput: 17666.57 mbps
   pages-per-second: 1509647

2. No fallback, always wait for work queues to become available
   total time: 18381 ms
   downtime: 25 ms
   throughput: 13698.65 mbps
   pages-per-second: 859607

> > +}
> > +
> > +/**
> > + * multifd_qpl_compress_pages_slow_path: compress pages using slow path
> > + *
> > + * Compress the pages using software. If compression fails, the page
> will
> > + * be sent directly.
> > + *
> > + * @p: Params for the channel being used
> > + */
> > +static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    qpl_job *job = qpl->sw_job;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *buf;
> > +
> > +    for (int i = 0; i < p->pages->normal_num; i++) {
> > +        buf = p->pages->block->host + p->pages->offset[i];
> > +        /* Set output length to less than the page to reduce
> decompression */
> > +        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
> > +        if (qpl_execute_job(job) == QPL_STS_OK) {
> > +            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
> > +        } else {
> > +            /* send the page directly */
> 
> s/directly/uncompressed/
> 
> a bit clearer.

Sure, I will fix it in the next version.

> > +            multifd_qpl_fill_packet(i, p, buf, size);
> > +        }
> > +        zbuf += size;
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_compress_pages: compress pages
> > + *
> > + * Submit the pages to the IAA hardware for compression. If hardware
> > + * compression fails, it falls back to software compression. If
> software
> > + * compression also fails, the page is sent directly
> > + *
> > + * @p: Params for the channel being used
> > + */
> > +static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    MultiFDPages_t *pages = p->pages;
> > +    uint32_t size = p->page_size;
> > +    QplHwJob *hw_job;
> > +    uint8_t *buf;
> > +    uint8_t *zbuf;
> > +
> 
> Let's document the output size choice more explicitly:
> 
>     /*
>      * Set output length to less than the page size to force the job to
>      * fail in case it compresses to a larger size. We'll send that page
>      * without compression and skip the decompression operation on the
>      * destination.
>      */
>      out_size = size - 1;
> 
> you can then omit the other comments.

Thanks for the comments, I will refine this in the next version.
 
> > +    for (int i = 0; i < pages->normal_num; i++) {
> > +        buf = pages->block->host + pages->offset[i];
> > +        zbuf = qpl->zbuf + (size * i);
> > +        hw_job = &qpl->hw_jobs[i];
> > +        /* Set output length to less than the page to reduce
> decompression */
> > +        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size
> - 1);
> > +        if (multifd_qpl_submit_job(hw_job->job)) {
> > +            hw_job->fallback_sw_path = false;
> > +        } else {
> > +            hw_job->fallback_sw_path = true;
> > +            /* Set output length less than page size to reduce
> decompression */
> > +            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
> > +                                         size - 1);
> > +            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
> > +                hw_job->sw_output = zbuf;
> > +                hw_job->sw_output_len = qpl->sw_job->total_out;
> > +            } else {
> > +                hw_job->sw_output = buf;
> > +                hw_job->sw_output_len = size;
> > +            }
> 
> Hmm, these look a bit cumbersome, would it work if we moved the fallback
> qpl_execute_job() down into the other loop? We could then avoid the
> extra fields. Something like:
> 
> static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> {
>     QplData *qpl = p->compress_data;
>     MultiFDPages_t *pages = p->pages;
>     uint32_t out_size, size = p->page_size;
>     uint8_t *buf, *zbuf;
> 
>     /*
>      * Set output length to less than the page size to force the job to
>      * fail in case it compresses to a larger size. We'll send that page
>      * without compression to skip the decompression operation on the
>      * destination.
>      */
>     out_size = size - 1;
> 
>     for (int i = 0; i < pages->normal_num; i++) {
>         QplHwJob *hw_job = &qpl->hw_jobs[i];
> 
>         hw_job->fallback_sw_path = false;
>         buf = pages->block->host + pages->offset[i];
>         zbuf = qpl->zbuf + (size * i);
> 
>         multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf,
> out_size);
> 
>         if (!multifd_qpl_submit_job(hw_job->job)) {
>             hw_job->fallback_sw_path = true;
>         }
>     }
> 
>     for (int i = 0; i < pages->normal_num; i++) {
>         QplHwJob *hw_job = &qpl->hw_jobs[i];
>         qpl_job *job;
> 
>         buf = pages->block->host + pages->offset[i];
>         zbuf = qpl->zbuf + (size * i);
> 
>         if (hw_job->fallback_sw_path) {
>             job = qpl->sw_job;
>             multifd_qpl_prepare_comp_job(job, buf, size, zbuf, out_size);
>             ret = qpl_execute_job(job);
>         } else {
>             job = hw_job->job;
>             ret = qpl_wait_job(job);
>         }
> 
>         if (ret == QPL_STS_OK) {
>             multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
>         } else {
>             multifd_qpl_fill_packet(i, p, buf, size);
>         }
>     }
> }

Thank you very much for the reference code. I have tested it, and the
performance is not good.
When the work queue is full and a hardware job fails to be submitted, the
subsequent job submissions will most likely fail as well. So my idea is to
execute a software job immediately instead, while all subsequent jobs still
give priority to the hardware path.

There is almost no overhead in job submission because Intel uses the new
"enqcmd" instruction, which allows a user program to submit jobs directly to
the hardware.

With the reference code, once a job fails to be submitted, there is a high
probability that ALL subsequent jobs will also fail to be submitted. The
remaining submissions are issued back-to-back, which leaves the work queues no
time to drain. Those jobs then fall back to software compression, so the IAA
hardware is not fully utilized.

For the test case with 4 channels and 1 IAA device, the reference code reduces
IAA throughput from 3.4GBps to 2.2GBps, which hurts live migration performance
(total time increases from 14s to 18s).

> > +        }
> > +    }
> > +
> > +    for (int i = 0; i < pages->normal_num; i++) {
> > +        buf = pages->block->host + pages->offset[i];
> > +        zbuf = qpl->zbuf + (size * i);
> > +        hw_job = &qpl->hw_jobs[i];
> > +        if (hw_job->fallback_sw_path) {
> > +            multifd_qpl_fill_packet(i, p, hw_job->sw_output,
> > +                                    hw_job->sw_output_len);
> > +            continue;
> > +        }
> > +        if (qpl_wait_job(hw_job->job) == QPL_STS_OK) {
> > +            multifd_qpl_fill_packet(i, p, zbuf, hw_job->job-
> >total_out);
> > +        } else {
> > +            /* send the page directly */
> > +            multifd_qpl_fill_packet(i, p, buf, size);
> > +        }
> > +    }
> > +}
> > +
> >  /**
> >   * multifd_qpl_send_prepare: prepare data to be able to send
> >   *
> > @@ -273,8 +491,26 @@ static void
> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >   */
> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t len = 0;
> > +
> > +    if (!multifd_send_prepare_common(p)) {
> > +        goto out;
> > +    }
> > +
> > +    /* The first IOV is used to store the compressed page lengths */
> > +    len = p->pages->normal_num * sizeof(uint32_t);
> > +    multifd_qpl_fill_iov(p, (uint8_t *) qpl->zlen, len);
> > +    if (qpl->hw_avail) {
> > +        multifd_qpl_compress_pages(p);
> > +    } else {
> > +        multifd_qpl_compress_pages_slow_path(p);
> > +    }
> > +
> > +out:
> > +    p->flags |= MULTIFD_FLAG_QPL;
> > +    multifd_send_fill_packet(p);
> > +    return 0;
> >  }
> >
> >  /**
> > @@ -312,6 +548,134 @@ static void
> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >      p->compress_data = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_process_and_check_job: process and check a QPL job
> > + *
> > + * Process the job and check whether the job output length is the
> > + * same as the specified length
> > + *
> > + * Returns true if the job execution succeeded and the output length
> > + * is equal to the specified length, otherwise false.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @is_hardware: indicates whether the job is a hardware job
> > + * @len: Specified output length
> > + * @errp: pointer to an error
> > + */
> > +static bool multifd_qpl_process_and_check_job(qpl_job *job, bool
> is_hardware,
> > +                                              uint32_t len, Error
> **errp)
> > +{
> > +    qpl_status status;
> > +
> > +    status = (is_hardware ? qpl_wait_job(job) : qpl_execute_job(job));
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "qpl_execute_job failed with error %d",
> status);
> 
> The error message should also cover qpl_wait_job(), right? Maybe just
> use "qpl job failed".

You are right, I will fix this in the next version.

> > +        return false;
> > +    }
> > +    if (job->total_out != len) {
> > +        error_setg(errp, "qpl decompressed len %u, expected len %u",
> > +                   job->total_out, len);
> > +        return false;
> > +    }
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_decompress_pages_slow_path: decompress pages using slow
> path
> > + *
> > + * Decompress the pages using software
> > + *
> > + * Returns 0 on success or -1 on error
> > + *
> > + * @p: Params for the channel being used
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_decompress_pages_slow_path(MultiFDRecvParams *p,
> > +                                                  Error **errp)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    qpl_job *job = qpl->sw_job;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *addr;
> > +    uint32_t len;
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        len = qpl->zlen[i];
> > +        addr = p->host + p->normal[i];
> > +        /* the page is uncompressed, load it */
> > +        if (len == size) {
> > +            memcpy(addr, zbuf, size);
> > +            zbuf += size;
> > +            continue;
> > +        }
> > +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +        if (!multifd_qpl_process_and_check_job(job, false, size, errp))
> {
> > +            return -1;
> > +        }
> > +        zbuf += len;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_decompress_pages: decompress pages
> > + *
> > + * Decompress the pages using the IAA hardware. If hardware
> > + * decompression fails, it falls back to software decompression.
> > + *
> > + * Returns 0 on success or -1 on error
> > + *
> > + * @p: Params for the channel being used
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error
> **errp)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *addr;
> > +    uint32_t len;
> > +    qpl_job *job;
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        addr = p->host + p->normal[i];
> > +        len = qpl->zlen[i];
> > +        /* the page is uncompressed if received length equals the page
> size */
> > +        if (len == size) {
> > +            memcpy(addr, zbuf, size);
> > +            zbuf += size;
> > +            continue;
> > +        }
> > +
> > +        job = qpl->hw_jobs[i].job;
> > +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +        if (multifd_qpl_submit_job(job)) {
> > +            qpl->hw_jobs[i].fallback_sw_path = false;
> > +        } else {
> > +            qpl->hw_jobs[i].fallback_sw_path = true;
> > +            job = qpl->sw_job;
> > +            multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +            if (!multifd_qpl_process_and_check_job(job, false, size,
> errp)) {
> > +                return -1;
> > +            }
> 
> Here the same suggestion applies. You created
> multifd_qpl_process_and_check_job() but is now calling it twice, which
> seems to lose the purpose. If the fallback moves to the loop below, then
> you do it all in one place:
> 
>     for (int i = 0; i < p->normal_num; i++) {
>         bool is_sw = !qpl->hw_jobs[i].fallback_sw_path;
> 
>         if (is_sw) {
>             job = qpl->sw_job;
>             multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
>         } else {
>             job = qpl->hw_jobs[i].job;
>         }
> 
>         if (!multifd_qpl_process_and_check_job(job, !is_sw, size, errp)) {
>             return -1;
>         }
>     }

I think this is the same issue as discussed above. After a hardware job fails
to be submitted, a software job is executed immediately, and subsequent jobs
still prefer the hardware path. That is why multifd_qpl_process_and_check_job()
is called in both places.
 
> > +        }
> > +        zbuf += len;
> > +    }
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        /* ignore pages that have already been processed */
> > +        if (qpl->zlen[i] == size || qpl->hw_jobs[i].fallback_sw_path) {
> > +            continue;
> > +        }
> > +
> > +        job = qpl->hw_jobs[i].job;
> > +        if (!multifd_qpl_process_and_check_job(job, true, size, errp))
> {
> > +            return -1;
> > +        }
> > +    }
> > +    return 0;
> > +}
> >  /**
> >   * multifd_qpl_recv: read the data from the channel into actual pages
> >   *
> > @@ -325,8 +689,48 @@ static void
> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >   */
> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t in_size = p->next_packet_size;
> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> > +    uint32_t len = 0;
> > +    uint32_t zbuf_len = 0;
> > +    int ret;
> > +
> > +    if (flags != MULTIFD_FLAG_QPL) {
> > +        error_setg(errp, "multifd %u: flags received %x flags
> expected %x",
> > +                   p->id, flags, MULTIFD_FLAG_QPL);
> > +        return -1;
> > +    }
> > +    multifd_recv_zero_page_process(p);
> > +    if (!p->normal_num) {
> > +        assert(in_size == 0);
> > +        return 0;
> > +    }
> > +
> > +    /* read compressed page lengths */
> > +    len = p->normal_num * sizeof(uint32_t);
> > +    assert(len < in_size);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zlen, len, errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        qpl->zlen[i] = be32_to_cpu(qpl->zlen[i]);
> > +        assert(qpl->zlen[i] <= p->page_size);
> > +        zbuf_len += qpl->zlen[i];
> > +    }
> > +
> > +    /* read compressed pages */
> > +    assert(in_size == len + zbuf_len);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, zbuf_len,
> errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +
> > +    if (qpl->hw_avail) {
> > +        return multifd_qpl_decompress_pages(p, errp);
> > +    }
> > +    return multifd_qpl_decompress_pages_slow_path(p, errp);
> >  }
> >
> >  static MultiFDMethods multifd_qpl_ops = {

Reply via email to