We need communications protocol of user-defined to control the checkpoint process.
The new checkpoint request is started by Primary VM, and the interactive process like below: Checkpoint synchronizing points, Primary Secondary NEW @ Suspend SUSPENDED @ Suspend&Save state SEND @ Send state Receive state RECEIVED @ Flush network Load state LOADED @ Resume Resume Start Comparing NOTE: 1) '@' who sends the message 2) Every sync-point is synchronized by two sides with only one handshake(single direction) for low-latency. If more strict synchronization is required, a opposite direction sync-point should be added. 3) Since sync-points are single direction, the remote side may go forward a lot when this side just receives the sync-point. Signed-off-by: Yang Hongyang <yan...@cn.fujitsu.com> Signed-off-by: zhanghailiang <zhang.zhanghaili...@huawei.com> Signed-off-by: Li Zhijian <lizhij...@cn.fujitsu.com> Signed-off-by: Gonglei <arei.gong...@huawei.com> --- migration/colo.c | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- trace-events | 3 +- 2 files changed, 248 insertions(+), 3 deletions(-) diff --git a/migration/colo.c b/migration/colo.c index 364e0dd..4ba6f65 100644 --- a/migration/colo.c +++ b/migration/colo.c @@ -14,6 +14,55 @@ #include "migration/colo.h" #include "trace.h" #include "qemu/error-report.h" +#include "qemu/sockets.h" + +/* Fix me: Convert to use QAPI */ +typedef enum COLOCommand { + COLO_CHECPOINT_READY = 0x46, + + /* + * Checkpoint synchronizing points. + * + * Primary Secondary + * NEW @ + * Suspend + * SUSPENDED @ + * Suspend&Save state + * SEND @ + * Send state Receive state + * RECEIVED @ + * Flush network Load state + * LOADED @ + * Resume Resume + * + * Start Comparing + * NOTE: + * 1) '@' who sends the message + * 2) Every sync-point is synchronized by two sides with only + * one handshake(single direction) for low-latency. + * If more strict synchronization is required, a opposite direction + * sync-point should be added. + * 3) Since sync-points are single direction, the remote side may + * go forward a lot when this side just receives the sync-point. + */ + COLO_CHECKPOINT_NEW, + COLO_CHECKPOINT_SUSPENDED, + COLO_CHECKPOINT_SEND, + COLO_CHECKPOINT_RECEIVED, + COLO_CHECKPOINT_LOADED, + + COLO_CHECKPOINT_MAX +} COLOCommand; + +const char * const COLOCommand_lookup[] = { + [COLO_CHECPOINT_READY] = "checkpoint-ready", + [COLO_CHECKPOINT_NEW] = "checkpoint-new", + [COLO_CHECKPOINT_SUSPENDED] = "checkpoint-suspend", + [COLO_CHECKPOINT_SEND] = "checheckpoint-send", + [COLO_CHECKPOINT_RECEIVED] = "checkpoint-received", + [COLO_CHECKPOINT_LOADED] = "checkpoint-loaded", + [COLO_CHECKPOINT_MAX] = NULL, +}; static QEMUBH *colo_bh; @@ -36,20 +85,137 @@ bool migration_incoming_in_colo_state(void) return (mis && (mis->state == MIGRATION_STATUS_COLO)); } +/* colo checkpoint control helper */ +static int colo_ctl_put(QEMUFile *f, uint64_t request) +{ + int ret = 0; + + qemu_put_be64(f, request); + qemu_fflush(f); + + ret = qemu_file_get_error(f); + if (request < COLO_CHECKPOINT_MAX) { + trace_colo_ctl_put(COLOCommand_lookup[request]); + } + return ret; +} + +static int colo_ctl_get_value(QEMUFile *f, uint64_t *value) +{ + int ret = 0; + uint64_t temp; + + temp = qemu_get_be64(f); + + ret = qemu_file_get_error(f); + if (ret < 0) { + return -1; + } + + *value = temp; + return 0; +} + +static int colo_ctl_get(QEMUFile *f, uint64_t require) +{ + int ret; + uint64_t value; + + ret = colo_ctl_get_value(f, &value); + if (ret < 0) { + return ret; + } + + if (value != require) { + error_report("unexpected state! expected: %"PRIu64 + ", received: %"PRIu64, require, value); + exit(1); + } + + trace_colo_ctl_get(COLOCommand_lookup[require]); + return ret; +} + +static int colo_do_checkpoint_transaction(MigrationState *s, QEMUFile *control) +{ + int ret; + + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW); + if (ret < 0) { + goto out; + } + + ret = colo_ctl_get(control, COLO_CHECKPOINT_SUSPENDED); + if (ret < 0) { + goto out; + } + + /* TODO: suspend and save vm state to colo buffer */ + + ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND); + if (ret < 0) { + goto out; + } + + /* TODO: send vmstate to Secondary */ + + ret = colo_ctl_get(control, COLO_CHECKPOINT_RECEIVED); + if (ret < 0) { + goto out; + } + + ret = colo_ctl_get(control, COLO_CHECKPOINT_LOADED); + if (ret < 0) { + goto out; + } + + /* TODO: resume Primary */ + +out: + return ret; +} + static void *colo_thread(void *opaque) { MigrationState *s = opaque; + QEMUFile *colo_control = NULL; + int ret; + + colo_control = qemu_fopen_socket(qemu_get_fd(s->file), "rb"); + if (!colo_control) { + error_report("Open colo_control failed!"); + goto out; + } + + /* + * Wait for Secondary finish loading vm states and enter COLO + * restore. + */ + ret = colo_ctl_get(colo_control, COLO_CHECPOINT_READY); + if (ret < 0) { + goto out; + } qemu_mutex_lock_iothread(); vm_start(); qemu_mutex_unlock_iothread(); trace_colo_vm_state_change("stop", "run"); - /*TODO: COLO checkpoint savevm loop*/ + while (s->state == MIGRATION_STATUS_COLO) { + /* start a colo checkpoint */ + if (colo_do_checkpoint_transaction(s, colo_control)) { + goto out; + } + } +out: migrate_set_state(&s->state, MIGRATION_STATUS_COLO, MIGRATION_STATUS_COMPLETED); + if (colo_control) { + qemu_fclose(colo_control); + } + qemu_mutex_lock_iothread(); qemu_bh_schedule(s->cleanup_bh); qemu_mutex_unlock_iothread(); @@ -83,15 +249,93 @@ void colo_init_checkpointer(MigrationState *s) qemu_bh_schedule(colo_bh); } +/* + * return: + * 0: start a checkpoint + * -1: some error happened, exit colo restore + */ +static int colo_wait_handle_cmd(QEMUFile *f, int *checkpoint_request) +{ + int ret; + uint64_t cmd; + + ret = colo_ctl_get_value(f, &cmd); + if (ret < 0) { + return -1; + } + + switch (cmd) { + case COLO_CHECKPOINT_NEW: + *checkpoint_request = 1; + return 0; + default: + return -1; + } +} + void *colo_process_incoming_checkpoints(void *opaque) { MigrationIncomingState *mis = opaque; + QEMUFile *f = mis->file; + int fd = qemu_get_fd(f); + QEMUFile *ctl = NULL; + int ret; migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_COLO); - /* TODO: COLO checkpoint restore loop */ + ctl = qemu_fopen_socket(fd, "wb"); + if (!ctl) { + error_report("Can't open incoming channel!"); + goto out; + } + ret = colo_ctl_put(ctl, COLO_CHECPOINT_READY); + if (ret < 0) { + goto out; + } + /* TODO: in COLO mode, Secondary is runing, so start the vm */ + while (mis->state == MIGRATION_STATUS_COLO) { + int request = 0; + int ret = colo_wait_handle_cmd(f, &request); + + if (ret < 0) { + break; + } else { + if (!request) { + continue; + } + } + /* TODO: suspend guest */ + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_SUSPENDED); + if (ret < 0) { + goto out; + } + + ret = colo_ctl_get(f, COLO_CHECKPOINT_SEND); + if (ret < 0) { + goto out; + } + + /* TODO: read migration data into colo buffer */ + + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_RECEIVED); + if (ret < 0) { + goto out; + } + + /* TODO: load vm state */ + + ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED); + if (ret < 0) { + goto out; + } +} + +out: + if (ctl) { + qemu_fclose(ctl); + } migration_incoming_exit_colo(); return NULL; diff --git a/trace-events b/trace-events index 025d71c..4487633 100644 --- a/trace-events +++ b/trace-events @@ -1473,7 +1473,8 @@ rdma_start_outgoing_migration_after_rdma_source_init(void) "" # migration/colo.c colo_vm_state_change(const char *old, const char *new) "Change '%s' => '%s'" -colo_receive_message(const char *msg) "Receive '%s'" +colo_ctl_put(const char *msg) "Send '%s'" +colo_ctl_get(const char *msg) "Receive '%s'" # kvm-all.c kvm_ioctl(int type, void *arg) "type 0x%x, arg %p" -- 1.8.3.1