Implement the functions of IAA for data compression and decompression. The implementation uses non-blocking job submission and polling to check the job completion status to reduce IAA's overhead in the live migration process.
Signed-off-by: Yuan Liu <yuan1....@intel.com> Reviewed-by: Nanhai Zou <nanhai....@intel.com> --- migration/iaa-ram-compress.c | 167 +++++++++++++++++++++++++++++++++++ migration/iaa-ram-compress.h | 7 ++ migration/ram-compress.c | 10 ++- migration/ram.c | 56 ++++++++++-- 4 files changed, 232 insertions(+), 8 deletions(-) diff --git a/migration/iaa-ram-compress.c b/migration/iaa-ram-compress.c index da45952594..243aeb6d55 100644 --- a/migration/iaa-ram-compress.c +++ b/migration/iaa-ram-compress.c @@ -12,6 +12,7 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" + #include "qemu/error-report.h" #include "migration.h" #include "options.h" @@ -62,6 +63,31 @@ static IaaJobPool iaa_job_pool; static QSIMPLEQ_HEAD(, IaaJob) polling_queue = QSIMPLEQ_HEAD_INITIALIZER(polling_queue); +static IaaJob *get_job(send_iaa_data send_page) +{ + IaaJob *job; + +retry: + /* Wait for a job to complete when there is no available job */ + if (iaa_job_pool.cnt == IAA_JOB_NUM) { + flush_iaa_jobs(false, send_page); + goto retry; + } + job = iaa_job_pool.jobs[iaa_job_pool.pos]; + iaa_job_pool.pos++; + iaa_job_pool.cnt++; + if (iaa_job_pool.pos == IAA_JOB_NUM) { + iaa_job_pool.pos = 0; + } + return job; +} + +static void put_job(IaaJob *job) +{ + assert(iaa_job_pool.cnt > 0); + iaa_job_pool.cnt--; +} + void iaa_compress_deinit(void) { for (int i = 0; i < IAA_JOB_NUM; i++) { @@ -150,3 +176,144 @@ init_err: iaa_compress_deinit(); return -1; } + +static void process_completed_job(IaaJob *job, send_iaa_data send_page) +{ + if (job->is_compression) { + send_page(job->param.comp.block, job->param.comp.offset, + job->out_buf, job->out_len, job->param.comp.result); + } else { + assert(job->out_len == qemu_target_page_size()); + memcpy(job->param.decomp.host, job->out_buf, job->out_len); + } + put_job(job); +} + +static qpl_status check_job_status(IaaJob *job, bool block) +{ + qpl_status status; + qpl_job *qpl = job->qpl; + + status = block ? qpl_wait_job(qpl) : qpl_check_job(qpl); + if (status == QPL_STS_OK) { + job->out_len = qpl->total_out; + if (job->is_compression) { + job->param.comp.result = RES_COMPRESS; + /* if no compression benefit, send a normal page for migration */ + if (job->out_len == qemu_target_page_size()) { + iaa_comp_param *param = &(job->param.comp); + memcpy(job->out_buf, (param->block->host + param->offset), + job->out_len); + job->param.comp.result = RES_NONE; + } + } + } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) { + if (job->is_compression) { + /* + * if the compressed data is larger than the original data, send a + * normal page for migration, in this case, IAA has copied the + * original data to job->out_buf automatically. + */ + job->out_len = qemu_target_page_size(); + job->param.comp.result = RES_NONE; + status = QPL_STS_OK; + } + } + return status; +} + +static void check_polling_jobs(send_iaa_data send_page) +{ + IaaJob *job, *job_next; + qpl_status status; + + QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) { + status = check_job_status(job, false); + if (status == QPL_STS_OK) { /* job has done */ + process_completed_job(job, send_page); + QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry); + } else if (status == QPL_STS_BEING_PROCESSED) { /* job is running */ + break; + } else { + abort(); + } + } +} + +static int submit_new_job(IaaJob *job) +{ + qpl_status status; + qpl_job *qpl = job->qpl; + + qpl->op = job->is_compression ? qpl_op_compress : qpl_op_decompress; + qpl->next_in_ptr = job->in_buf; + qpl->next_out_ptr = job->out_buf; + qpl->available_in = job->in_len; + qpl->available_out = qemu_target_page_size(); /* outbuf maximum size */ + qpl->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY; + qpl->level = 1; /* only level 1 compression is supported */ + + do { + status = qpl_submit_job(qpl); + } while (status == QPL_STS_QUEUES_ARE_BUSY_ERR); + + if (status != QPL_STS_OK) { + error_report("Failed to submit iaa job, error %d", status); + return -1; + } + QSIMPLEQ_INSERT_TAIL(&polling_queue, job, entry); + return 0; +} + +int flush_iaa_jobs(bool flush_all_jobs, send_iaa_data send_page) +{ + IaaJob *job, *job_next; + + QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) { + if (check_job_status(job, true) != QPL_STS_OK) { + return -1; + } + process_completed_job(job, send_page); + QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry); + if (!flush_all_jobs) { + break; + } + } + return 0; +} + +int compress_page_with_iaa(RAMBlock *block, ram_addr_t offset, + send_iaa_data send_page) +{ + IaaJob *job; + + if (iaa_job_pool.cnt != 0) { + check_polling_jobs(send_page); + } + if (buffer_is_zero(block->host + offset, qemu_target_page_size())) { + send_page(block, offset, NULL, 0, RES_ZEROPAGE); + return 1; + } + job = get_job(send_page); + job->is_compression = true; + job->in_buf = block->host + offset; + job->in_len = qemu_target_page_size(); + job->param.comp.offset = offset; + job->param.comp.block = block; + return (submit_new_job(job) == 0 ? 1 : 0); +} + +int decompress_data_with_iaa(QEMUFile *f, void *host, int len) +{ + IaaJob *job; + + if (iaa_job_pool.cnt != 0) { + check_polling_jobs(NULL); + } + job = get_job(NULL); + job->is_compression = false; + qemu_get_buffer(f, job->in_buf, len); + job->in_len = len; + job->param.decomp.host = host; + return submit_new_job(job); +} diff --git a/migration/iaa-ram-compress.h b/migration/iaa-ram-compress.h index 27998b255b..5a555b3b8d 100644 --- a/migration/iaa-ram-compress.h +++ b/migration/iaa-ram-compress.h @@ -15,6 +15,13 @@ #include "qemu-file.h" #include "ram-compress.h" +typedef int (*send_iaa_data) (RAMBlock *block, ram_addr_t offset, uint8_t *data, + uint32_t data_len, CompressResult result); + int iaa_compress_init(bool is_decompression); void iaa_compress_deinit(void); +int compress_page_with_iaa(RAMBlock *block, ram_addr_t offset, + send_iaa_data send_page); +int decompress_data_with_iaa(QEMUFile *f, void *host, int len); +int flush_iaa_jobs(bool flush_all_jobs, send_iaa_data send_page); #endif diff --git a/migration/ram-compress.c b/migration/ram-compress.c index acc511ce57..0bddf8b9ea 100644 --- a/migration/ram-compress.c +++ b/migration/ram-compress.c @@ -370,10 +370,11 @@ int wait_for_decompress_done(void) return 0; } +#ifdef CONFIG_QPL if (migrate_compress_with_iaa()) { - /* Implement in next patch */ - return 0; + return flush_iaa_jobs(true, NULL); } +#endif thread_count = migrate_decompress_threads(); qemu_mutex_lock(&decomp_done_lock); @@ -511,9 +512,12 @@ void ram_compress_save_cleanup(void) void ram_decompress_data(QEMUFile *f, void *host, int len) { +#ifdef CONFIG_QPL if (migrate_compress_with_iaa()) { - /* Implement in next patch */ + decompress_data_with_iaa(f, host, len); + return; } +#endif decompress_data_with_multi_threads(f, host, len); } diff --git a/migration/ram.c b/migration/ram.c index 34ee1de332..5ef818112c 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -69,6 +69,9 @@ #include "qemu/userfaultfd.h" #endif /* defined(__linux__) */ +#ifdef CONFIG_QPL +#include "iaa-ram-compress.h" +#endif /***********************************************************/ /* ram save/restore */ @@ -1342,16 +1345,59 @@ static int send_queued_data(CompressParam *param) return len; } +#ifdef CONFIG_QPL +static int send_iaa_compressed_page(RAMBlock *block, ram_addr_t offset, + uint8_t *data, uint32_t data_len, + CompressResult result) +{ + PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY]; + MigrationState *ms = migrate_get_current(); + QEMUFile *file = ms->to_dst_file; + int len = 0; + + assert(block == pss->last_sent_block); + if (result == RES_ZEROPAGE) { + len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO); + qemu_put_byte(file, 0); + len += 1; + ram_release_page(block->idstr, offset); + stat64_add(&mig_stats.zero_pages, 1); + } else if (result == RES_COMPRESS) { + assert(data != NULL); + assert((data_len > 0) && (data_len < qemu_target_page_size())); + len += save_page_header(pss, file, block, + offset | RAM_SAVE_FLAG_COMPRESS_PAGE); + qemu_put_be32(file, data_len); + qemu_put_buffer(file, data, data_len); + len += data_len; + /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */ + compression_counters.compressed_size += len - 8; + compression_counters.pages++; + } else if (result == RES_NONE) { + assert((data != NULL) && (data_len == TARGET_PAGE_SIZE)); + len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_PAGE); + qemu_put_buffer(file, data, data_len); + len += data_len; + stat64_add(&mig_stats.normal_pages, 1); + } else { + abort(); + } + ram_transferred_add(len); + return len; +} +#endif + static void ram_flush_compressed_data(RAMState *rs) { if (!save_page_use_compression(rs)) { return; } +#ifdef CONFIG_QPL if (migrate_compress_with_iaa()) { - /* Implement in next patch */ + flush_iaa_jobs(true, send_iaa_compressed_page); return; } - +#endif flush_compressed_data(send_queued_data); } @@ -2102,11 +2148,11 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss, ram_flush_compressed_data(rs); return false; } - +#ifdef CONFIG_QPL if (migrate_compress_with_iaa()) { - /* Implement in next patch */ - return true; + return compress_page_with_iaa(block, offset, send_iaa_compressed_page); } +#endif if (compress_page_with_multi_thread(block, offset, send_queued_data) > 0) { return true; } -- 2.39.3