From: Nikita Lapshin <nikita.laps...@openvz.org>

This is a modified load part of the previous patch.
Implemented a new return-path (rp) listen thread for the snapshot tool.
Also implemented functions for starting postcopy. This mode can be
turned on by specifying the --postcopy flag.

Signed-off-by: Nikita Lapshin <nikita.laps...@openvz.org>
---
 include/qemu-snapshot.h   |  12 ++
 migration/migration.c     | 123 +++++++++++++++++++
 migration/migration.h     |   1 +
 migration/qemu-snapshot.c | 249 +++++++++++++++++++++++++++++++++++++-
 migration/savevm.c        |  25 ++++
 migration/savevm.h        |   4 +
 qemu-snapshot.c           |  10 ++
 7 files changed, 422 insertions(+), 2 deletions(-)

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index 74885c03bb..b0f235747f 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -65,6 +65,15 @@ typedef struct StateLoadCtx {
     QEMUFile *f_fd;
     QEMUFile *f_vmstate;
 
+    /* Postcopy part */
+    bool postcopy;
+    bool in_postcopy;
+
+    /* Return path part */
+    QemuThread rp_listen_thread;
+    QEMUFile *f_rp_fd;
+    bool has_rp_listen_thread;
+
     StateInfo state_parameters;
 } StateLoadCtx;
 
@@ -76,6 +85,9 @@ int coroutine_fn save_state_main(StateSaveCtx *s);
 void save_vmstate(StateSaveCtx *s);
 int coroutine_fn load_state_main(StateLoadCtx *s);
 
+int queue_page_request(const char *idstr, uint64_t offset,
+                       uint32_t size);
+
 QEMUFile *qemu_fopen_bdrv_vmstate(BlockDriverState *bs, int is_writable);
 void qemu_fsplice(QEMUFile *f_dst, QEMUFile *f_src, size_t size);
 size_t qemu_fsplice_tail(QEMUFile *f_dst, QEMUFile *f_src);
diff --git a/migration/migration.c b/migration/migration.c
index 6528b3ad41..6f82e8ea48 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -61,6 +61,7 @@
 #include "sysemu/cpus.h"
 #include "yank_functions.h"
 #include "sysemu/qtest.h"
+#include "qemu-snapshot.h"
 
 #define MAX_THROTTLE (128 << 20) /* Migration transfer speed throttling */
 
@@ -3517,6 +3518,128 @@ static MigThrError postcopy_pause(MigrationState *s)
     }
 }
 
+/*
+ * Return-path message processing thread for qemu-snapshot tool
+ */
+void *qemu_snapshot_rp_listen_thread(void *opaque)
+{
+    QEMUFile *f = (QEMUFile *) opaque;
+    int res = 0;
+    uint64_t pages = 0;
+
+    while (!res) {
+        uint8_t h_buf[512];
+        const int h_max_len = sizeof(h_buf);
+        int h_type;
+        int h_len;
+        size_t count;
+
+        h_type = qemu_get_be16(f);
+        h_len = qemu_get_be16(f);
+
+        /* Make early check for input errors */
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+
+        /* Check message type */
+        if (h_type >= MIG_RP_MSG_MAX || h_type == MIG_RP_MSG_INVALID) {
+            error_report("RP: received invalid message type %d length %d",
+                         h_type, h_len);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Check message length */
+        if (rp_cmd_args[h_type].len != -1 && h_len != rp_cmd_args[h_type].len) {
+            error_report("RP: received %s message len %d expected %ld",
+                         rp_cmd_args[h_type].name,
+                         h_len, rp_cmd_args[h_type].len);
+            res = -EINVAL;
+            break;
+        } else if (h_len > h_max_len) {
+            error_report("RP: received %s message len %d max_len %d",
+                         rp_cmd_args[h_type].name, h_len, h_max_len);
+            res = -EINVAL;
+            break;
+        }
+
+        count = qemu_get_buffer(f, h_buf, h_len);
+        if (count != h_len) {
+            break;
+        }
+
+        switch (h_type) {
+        case MIG_RP_MSG_SHUT:
+        {
+            int shut_error;
+
+            shut_error = be32_to_cpu(*(uint32_t *) h_buf);
+            if (shut_error) {
+                error_report("RP: sibling shutdown, error %d", shut_error);
+            }
+
+            /* Exit processing loop */
+            res = 1;
+            break;
+        }
+
+        case MIG_RP_MSG_REQ_PAGES:
+        case MIG_RP_MSG_REQ_PAGES_ID:
+        {
+            pages++;
+            uint64_t offset;
+            uint32_t size;
+            char *id_str = NULL;
+
+            offset = be64_to_cpu(*(uint64_t *) (h_buf + 0));
+            size = be32_to_cpu(*(uint32_t *) (h_buf + 8));
+
+            if (h_type == MIG_RP_MSG_REQ_PAGES_ID) {
+                int h_parsed_len = rp_cmd_args[MIG_RP_MSG_REQ_PAGES].len;
+
+                if (h_len > h_parsed_len) {
+                    int id_len;
+
+                    /* RAM block id string */
+                    id_len = h_buf[h_parsed_len];
+                    id_str = (char *) &h_buf[h_parsed_len + 1];
+                    id_str[id_len] = 0;
+
+                    h_parsed_len += id_len + 1;
+                }
+
+                if (h_parsed_len != h_len) {
+                    error_report("RP: received %s message len %d expected %d",
+                                 rp_cmd_args[MIG_RP_MSG_REQ_PAGES_ID].name,
+                                 h_len, h_parsed_len);
+                    res = -EINVAL;
+                    break;
+                }
+            }
+
+            res = queue_page_request(id_str, offset, size);
+            break;
+        }
+
+        default:
+            error_report("RP: received unexpected message type %d len %d",
+                         h_type, h_len);
+            res = -EINVAL;
+        }
+    }
+
+    if (res >= 0) {
+        res = qemu_file_get_error(f);
+    }
+    if (res) {
+        error_report("RP: listen thread exit, error %d", res);
+    }
+
+    return NULL;
+}
+
 static MigThrError migration_detect_error(MigrationState *s)
 {
     int ret;
diff --git a/migration/migration.h b/migration/migration.h
index 5c43788a2b..fd6f8d3083 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -433,4 +433,5 @@ void migration_cancel(const Error *error);
 void populate_vfio_info(MigrationInfo *info);
 void postcopy_temp_page_reset(PostcopyTmpPage *tmp_page);
 
+void *qemu_snapshot_rp_listen_thread(void *opaque);
 #endif
diff --git a/migration/qemu-snapshot.c b/migration/qemu-snapshot.c
index 280f5be25c..8090c7032d 100644
--- a/migration/qemu-snapshot.c
+++ b/migration/qemu-snapshot.c
@@ -44,6 +44,16 @@ typedef struct RAMBlock {
     char idstr[256];            /* RAM block id string */
 } RAMBlock;
 
+/* Page request from destination in postcopy */
+typedef struct RAMPageRequest {
+    RAMBlock *block;
+    int64_t offset;
+    unsigned size;
+
+    /* Link into ram_ctx.page_req */
+    QSIMPLEQ_ENTRY(RAMPageRequest) next;
+} RAMPageRequest;
+
 typedef struct RAMPage {
     RAMBlock *block;            /* RAM block containing the page */
     int64_t offset;             /* Page offset in RAM block */
@@ -57,6 +67,11 @@ typedef struct RAMCtx {
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
     RAMPage last_page;
     RAMBlock *last_sent_block;
+
+    /* Page request queue for postcopy */
+    QemuMutex page_req_mutex;
+    QSIMPLEQ_HEAD(, RAMPageRequest) page_req;
+    RAMBlock *last_req_block;
 } RAMCtx;
 
 static RAMCtx ram_ctx;
@@ -627,6 +642,143 @@ static int ram_send_setup(QEMUFile *f, void *opaque)
     return res;
 }
 
+static int send_each_ram_block_discard(QEMUFile *f)
+{
+    RAMBlock *block;
+    int res = 0;
+    uint64_t start;
+    uint64_t len;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        start = block->discard_offset;
+        len = block->length - block->discard_offset;
+        qemu_savevm_send_postcopy_ram_discard(f, block->idstr, 1, &start, &len);
+
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+    }
+
+    return res;
+}
+
+static int prepare_postcopy(StateLoadCtx *s)
+{
+    int res = 0;
+
+    res = qemu_snapshot_postcopy_prepare(s->f_fd, page_size, page_size);
+
+    if (!res) {
+        qemu_thread_create(&s->rp_listen_thread, "rp_thread",
+                           qemu_snapshot_rp_listen_thread, s->f_rp_fd,
+                           QEMU_THREAD_JOINABLE);
+        s->has_rp_listen_thread = true;
+    }
+
+    return res;
+}
+
+static int start_postcopy(StateLoadCtx *s)
+{
+    QIOChannelBuffer *bioc;
+    QEMUFile *fb;
+    int eof_pos;
+    int res = 0;
+
+    /*
+     * Send RAM discards for each block's unsent part. Without discards,
+     * the userfault_fd code on destination will not trigger page requests
+     * as expected. Also, the UFFDIO_COPY ioctl that is used to place an
+     * incoming page in postcopy would give an error if that page has not
+     * faulted with MISSING reason.
+     */
+    res = send_each_ram_block_discard(s->f_fd);
+    if (res) {
+        error_report("Failed to send RAM discards");
+        return res;
+    }
+
+    /*
+     * To perform a switch to postcopy on destination, we need to send
+     * commands and the device state data in the following order:
+     *   * MIG_CMD_POSTCOPY_LISTEN
+     *   * Non-iterable device state sections
+     *   * MIG_CMD_POSTCOPY_RUN
+     *
+     * All this has to be packaged into a single blob using the
+     * MIG_CMD_PACKAGED command. While loading the device state we may
+     * trigger page transfer requests and the fd must be free to process
+     * those, thus the destination must read the whole device state off
+     * the fd before it starts processing it. To wrap it up in a package,
+     * a QEMU buffer channel is used.
+     */
+    bioc = qio_channel_buffer_new(512 * 1024);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "migration-postcopy-buffer");
+    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+    object_unref(OBJECT(bioc));
+
+    /* MIG_CMD_POSTCOPY_LISTEN command */
+    qemu_savevm_send_postcopy_listen(fb);
+
+    /* The rest of non-iterable device state with an optional vmdesc section */
+    send_vmstate(s->f_vmstate, fb);
+    qemu_fflush(fb);
+
+    /*
+     * vmdesc section may optionally be present at the end of the stream
+     * so we'll try to locate it and truncate the trailer.
+     */
+    eof_pos = bioc->usage - 1;
+
+    for (int offset = (bioc->usage - 11); offset >= 0; offset--) {
+        if (bioc->data[offset] == QEMU_VM_SECTION_FOOTER &&
+            bioc->data[offset + 5] == QEMU_VM_EOF &&
+            bioc->data[offset + 6] == QEMU_VM_VMDESCRIPTION) {
+            uint32_t expected_length = bioc->usage - (offset + 11);
+            uint32_t json_length;
+
+            json_length = be32_to_cpu(*(uint32_t *) &bioc->data[offset + 7]);
+            if (json_length != expected_length) {
+                error_report("Corrupted vmdesc trailer: length %" PRIu32
+                             " expected %" PRIu32,
+                             json_length, expected_length);
+                res = -EINVAL;
+                goto fail;
+            }
+
+            eof_pos = offset + 5;
+            break;
+        }
+    }
+
+    /*
+     * When switching to postcopy we need to skip the QEMU_VM_EOF token
+     * which normally is placed after the last non-iterable device state
+     * section (but before the vmdesc section).
+     *
+     * Skipping QEMU_VM_EOF is required to allow the migration process to
+     * continue in postcopy. The vmdesc section also has to be skipped here.
+     */
+    if (eof_pos >= 0 && bioc->data[eof_pos] == QEMU_VM_EOF) {
+        bioc->usage = eof_pos;
+        bioc->offset = eof_pos;
+    }
+
+    /* Finally, the MIG_CMD_POSTCOPY_RUN command */
+    qemu_savevm_send_postcopy_run(fb);
+
+    /* Now send that blob */
+    qemu_savevm_send_packaged(s->f_fd, bioc->data, bioc->usage);
+    qemu_fflush(s->f_fd);
+
+    s->in_postcopy = true;
+
+fail:
+    qemu_fclose(fb);
+    load_state_check_errors(s, &res);
+
+    return res;
+}
+
 static bool find_next_page(RAMPage *page)
 {
     RAMCtx *ram = &ram_ctx;
@@ -709,6 +861,74 @@ void clear_page_range(RAMPage *page, int64_t length)
                      ((length - 1) >> slice_bits) + 1);
 }
 
+int queue_page_request(const char *idstr, uint64_t offset,
+                       uint32_t size)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block;
+    RAMPageRequest *new_entry;
+
+    if (!idstr) {
+        block = ram->last_req_block;
+        if (!block) {
+            error_report("RP-REQ_PAGES: no previous block");
+            return -EINVAL;
+        }
+    } else {
+        block = ram_block_by_idstr(idstr);
+        if (!block) {
+            error_report("RP-REQ_PAGES: cannot find block %s", idstr);
+            return -EINVAL;
+        }
+
+        ram->last_req_block = block;
+    }
+
+    if (!ram_offset_in_block(block, offset)) {
+        error_report("RP-REQ_PAGES: offset 0x%" PRIx64 " out of RAM block %s",
+                     offset, idstr);
+        return -EINVAL;
+    }
+
+    new_entry = g_new0(RAMPageRequest, 1);
+    new_entry->block = block;
+    new_entry->offset = offset;
+    new_entry->size = size;
+
+    qemu_mutex_lock(&ram->page_req_mutex);
+    QSIMPLEQ_INSERT_TAIL(&ram->page_req, new_entry, next);
+    qemu_mutex_unlock(&ram->page_req_mutex);
+
+    return 0;
+}
+
+static bool get_queued_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+
+    if (QSIMPLEQ_EMPTY_ATOMIC(&ram->page_req)) {
+        return false;
+    }
+
+    QEMU_LOCK_GUARD(&ram->page_req_mutex);
+
+    if (!QSIMPLEQ_EMPTY(&ram->page_req)) {
+        RAMPageRequest *entry = QSIMPLEQ_FIRST(&ram->page_req);
+        RAMBlock *block = entry->block;
+        int64_t slice = entry->offset >> slice_bits;
+
+        QSIMPLEQ_REMOVE_HEAD(&ram->page_req, next);
+        g_free(entry);
+
+        if (test_bit(slice, block->bitmap)) {
+            page->block = block;
+            page->offset = slice << slice_bits;
+
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static int coroutine_fn ram_load_pages(StateLoadCtx *s)
 {
     RAMCtx *ram = &ram_ctx;
@@ -718,8 +938,9 @@ static int coroutine_fn ram_load_pages(StateLoadCtx *s)
     int64_t blk_offset, slice_offset, bdrv_offset;
     ssize_t res;
    int64_t flags = RAM_SAVE_FLAG_CONTINUE;
+    bool urgent = get_queued_page(&page);
 
-    if (!find_next_page(&page)) {
+    if (!urgent && !find_next_page(&page)) {
        return 0;
    }
 
@@ -835,6 +1056,20 @@ int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    if (s->postcopy) {
+        res = prepare_postcopy(s);
+        if (res) {
+            error_report("Prepare postcopy failed");
+            goto fail;
+        }
+
+        /* TODO: Add condition to start postcopy during the cycle below */
+        res = start_postcopy(s);
+        if (res) {
+            error_report("Postcopy start failed");
+            goto fail;
+        }
+    }
+
     do {
         res = qemu_savevm_state_iterate(s->f_fd, false);
 
         /* Check for file errors */
@@ -845,8 +1080,11 @@ int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    /* If the tool is in postcopy mode, the vmstate has already been sent */
+    if (!s->in_postcopy) {
+        send_vmstate(s->f_vmstate, s->f_fd);
+    }
-    send_vmstate(s->f_vmstate, s->f_fd);
     qemu_put_byte(s->f_fd, QEMU_VM_EOF);
     qemu_fflush(s->f_fd);
 
 fail:
@@ -865,6 +1103,10 @@ void ram_init_state(void)
 
     /* Initialize RAM block list head */
     QSIMPLEQ_INIT(&ram->ram_block_list);
+
+    /* Initialize postcopy page request queue */
+    qemu_mutex_init(&ram->page_req_mutex);
+    QSIMPLEQ_INIT(&ram->page_req);
 }
 
 /* Destroy snapshot RAM state */
@@ -878,4 +1120,7 @@ void ram_destroy_state(void)
         g_free(block->bitmap);
         g_free(block);
     }
+
+    /* Destroy page request mutex */
+    qemu_mutex_destroy(&ram_ctx.page_req_mutex);
 }
diff --git a/migration/savevm.c b/migration/savevm.c
index b722e51163..b1320bd813 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -3356,3 +3356,28 @@ void qmp_snapshot_delete(const char *job_id,
 
     job_start(&s->common);
 }
+
+/* Do preparation before the qemu-snapshot tool starts postcopy */
+int qemu_snapshot_postcopy_prepare(QEMUFile *f_dest,
+                                   uint64_t sps,
+                                   uint64_t tps)
+{
+    uint64_t tmp[2];
+    int res;
+
+    /* Send POSTCOPY_ADVISE */
+    tmp[0] = cpu_to_be64(sps);
+    tmp[1] = cpu_to_be64(tps);
+    qemu_savevm_command_send(f_dest, MIG_CMD_POSTCOPY_ADVISE, 16, (uint8_t *) tmp);
+
+    /* Open return path on destination */
+    qemu_savevm_command_send(f_dest, MIG_CMD_OPEN_RETURN_PATH, 0, NULL);
+
+    /*
+     * Check for file errors after sending POSTCOPY_ADVISE command
+     * since destination may already have closed input pipe in case
+     * postcopy had not been enabled in advance.
+     */
+    res = qemu_file_get_error(f_dest);
+    return res;
+}
diff --git a/migration/savevm.h b/migration/savevm.h
index 9abfcd88e5..94b6f60496 100644
--- a/migration/savevm.h
+++ b/migration/savevm.h
@@ -72,4 +72,8 @@ int qemu_loadvm_section_part_end(QEMUFile *f, MigrationIncomingState *mis);
 
 int qemu_replace_ram_handler(void *ram_ops, void *opaque);
 
+int qemu_snapshot_postcopy_prepare(QEMUFile *f_dest,
+                                   uint64_t sps,
+                                   uint64_t tps);
+
 #endif
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index 04bda74fb4..893086eb8a 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -239,6 +239,7 @@ static void coroutine_fn snapshot_load_co(void *opaque)
     StateLoadCtx *s = get_load_context();
     int res;
     QIOChannel *ioc_fd;
+    QIOChannel *ioc_rp_fd;
 
     init_load_context();
 
@@ -260,8 +261,17 @@ static void coroutine_fn snapshot_load_co(void *opaque)
     object_unref(OBJECT(ioc_fd));
     qemu_file_set_blocking(s->f_fd, false);
 
+    /* QEMUFile on the return path fd if we are going to use postcopy */
+    if (params.postcopy) {
+        ioc_rp_fd = qio_channel_new_fd(params.rp_fd, NULL);
+        qio_channel_set_name(QIO_CHANNEL(ioc_rp_fd), "migration-channel-rp");
+        s->f_rp_fd = qemu_fopen_channel_input(ioc_rp_fd);
+        object_unref(OBJECT(ioc_rp_fd));
+    }
+
     s->state_parameters = *((StateInfo *) opaque);
+    s->postcopy = params.postcopy;
 
     res = load_state_main(s);
     if (res) {
         error_report("Failed to load snapshot: %s", strerror(-res));
-- 
2.31.1
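
Editor's note, not part of the patch: the listen thread above parses return-path
messages as a big-endian 16-bit type, a 16-bit payload length, then the payload;
for a page request the payload is a 64-bit block offset followed by a 32-bit size.
A minimal, self-contained sketch of that framing from the sender's side is shown
below. RP_MSG_REQ_PAGES, put_be*() and encode_req_pages() are illustrative names
and values assumed here, not QEMU API; the real constants live in
migration/migration.c.

#include <stddef.h>
#include <stdint.h>

/* Assumed value for illustration; the real one is the MIG_RP_MSG_REQ_PAGES enumerator. */
#define RP_MSG_REQ_PAGES 4

static void put_be16(uint8_t *p, uint16_t v)
{
    p[0] = (uint8_t) (v >> 8);
    p[1] = (uint8_t) v;
}

static void put_be32(uint8_t *p, uint32_t v)
{
    put_be16(p, (uint16_t) (v >> 16));
    put_be16(p + 2, (uint16_t) v);
}

static void put_be64(uint8_t *p, uint64_t v)
{
    put_be32(p, (uint32_t) (v >> 32));
    put_be32(p + 4, (uint32_t) v);
}

/*
 * Hypothetical helper: encode one page-request message the way the listen
 * thread reads it (be16 type, be16 len, then be64 offset + be32 size).
 * Returns the number of bytes written to buf (4-byte header + 12-byte payload).
 */
static size_t encode_req_pages(uint8_t *buf, uint64_t offset, uint32_t size)
{
    put_be16(buf + 0, RP_MSG_REQ_PAGES);  /* message type */
    put_be16(buf + 2, 12);                /* payload length */
    put_be64(buf + 4, offset);            /* page offset within the RAM block */
    put_be32(buf + 12, size);             /* requested length in bytes */
    return 16;
}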