Jules Wang <junqing.w...@cs2c.com.cn> wrote: > The receiver does migration loop until the migration connection is > lost. Then, it is started as a backup. > > The receiver does not load vm state once a migration begins, > instead, it perfetches one whole migration data into a buffer, > then loads vm state from that buffer afterwards. > > Signed-off-by: Jules Wang <junqing.w...@cs2c.com.cn> > --- > include/migration/qemu-file.h | 1 + > include/sysemu/sysemu.h | 1 + > migration.c | 22 ++++-- > savevm.c | 154 > ++++++++++++++++++++++++++++++++++++++++-- > 4 files changed, 168 insertions(+), 10 deletions(-) > > diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h > index 0f757fb..f01ff10 100644 > --- a/include/migration/qemu-file.h > +++ b/include/migration/qemu-file.h > @@ -92,6 +92,7 @@ typedef struct QEMUFileOps { > QEMURamHookFunc *after_ram_iterate; > QEMURamHookFunc *hook_ram_load; > QEMURamSaveFunc *save_page; > + QEMUFileGetBufferFunc *get_prefetch_buffer; > } QEMUFileOps; > > QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops); > diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h > index b1aa059..44f23d0 100644 > --- a/include/sysemu/sysemu.h > +++ b/include/sysemu/sysemu.h > @@ -81,6 +81,7 @@ void qemu_savevm_state_complete(QEMUFile *f); > void qemu_savevm_state_cancel(void); > uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size); > int qemu_loadvm_state(QEMUFile *f); > +int qemu_loadvm_state_ft(QEMUFile *f); > > /* SLIRP */ > void do_info_slirp(Monitor *mon); > diff --git a/migration.c b/migration.c > index d8a9b2d..9be22a4 100644 > --- a/migration.c > +++ b/migration.c > @@ -19,6 +19,7 @@ > #include "monitor/monitor.h" > #include "migration/qemu-file.h" > #include "sysemu/sysemu.h" > +#include "sysemu/cpus.h" > #include "block/block.h" > #include "qemu/sockets.h" > #include "migration/block.h" > @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque) > { > QEMUFile *f = opaque; 
> int ret; > + int count = 0; > > - ret = qemu_loadvm_state(f); > - qemu_fclose(f); > - if (ret < 0) { > - fprintf(stderr, "load of migration failed\n"); > - exit(EXIT_FAILURE); > + if (ft_enabled()) { > + while (qemu_loadvm_state_ft(f) >= 0) { > + count++; > + DPRINTF("incoming count %d\r", count); > + } > + qemu_fclose(f); > + fprintf(stderr, "ft connection lost, launching self..\n");
Obviously, we need something more than an fprintf here, right? We are also not checking whether this is an error. > + } else { > + ret = qemu_loadvm_state(f); > + qemu_fclose(f); > + if (ret < 0) { > + fprintf(stderr, "load of migration failed\n"); > + exit(EXIT_FAILURE); > + } > } > + cpu_synchronize_all_post_init(); > qemu_announce_self(); > DPRINTF("successfully loaded vm state\n"); > > diff --git a/savevm.c b/savevm.c > index 6daf690..d5bf153 100644 > --- a/savevm.c > +++ b/savevm.c > @@ -52,6 +52,8 @@ > #define ARP_PTYPE_IP 0x0800 > #define ARP_OP_REQUEST_REV 0x3 > > +#define PFB_SIZE 0x010000 > + > static int announce_self_create(uint8_t *buf, > uint8_t *mac_addr) > { > @@ -135,6 +137,10 @@ struct QEMUFile { > unsigned int iovcnt; > > int last_error; > + > + uint8_t *pfb; /* pfb -> PerFetch Buffer */ s/PerFetch/Prefetch/ Would prefetch_buffer be a better name? It is not used in that many places; would it make things clearer or more convoluted? Other comments? > +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf, > + int64_t pos, int size) > +{ > + QEMUFile *f = opaque; > + > + if (f->pfb_size - pos <= 0) { > + return 0; > + } > + > + if (f->pfb_size - pos < size) { > + size = f->pfb_size - pos; > + } > + > + memcpy(buf, f->pfb+pos, size); > + > + return size; > +} > + > + > static int socket_close(void *opaque) > { > QEMUFileSocket *s = opaque; > @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode) > static const QEMUFileOps socket_read_ops = { > .get_fd = socket_get_fd, > .get_buffer = socket_get_buffer, > + .get_prefetch_buffer = socket_get_prefetch_buffer, > .close = socket_close > }; > > if (f->last_error) { > ret = f->last_error; > } > + > + if (f->pfb) { > + g_free(f->pfb); Just call g_free(f->pfb); unconditionally. It already checks for NULL, so the surrounding if is not needed. 
> + } > + > g_free(f); > return ret; > } > @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v) > > static void qemu_file_skip(QEMUFile *f, int size) > { > + if (f->pfb_index + size <= f->pfb_size) { > + f->pfb_index += size; > + return; > + } else { > + size -= f->pfb_size - f->pfb_index; > + f->pfb_index = f->pfb_size; > + } > + > if (f->buf_index + size <= f->buf_size) { > f->buf_index += size; > } > @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, > int size, size_t offset) > { > int pending; > int index; > + int done; > + > + if (f->ops->get_prefetch_buffer) { > + if (f->pfb_index + offset < f->pfb_size) { > + done = f->ops->get_prefetch_buffer(f, buf, f->pfb_index + offset, > + size); > + if (done == size) { > + return size; > + } > + size -= done; > + buf += done; > + } else { > + offset -= f->pfb_size - f->pfb_index; > + } > + } > > assert(!qemu_file_is_writable(f)); > > @@ -875,7 +929,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size) > > static int qemu_peek_byte(QEMUFile *f, int offset) > { > - int index = f->buf_index + offset; > + int index; > + > + if (f->pfb_index + offset < f->pfb_size) { > + return f->pfb[f->pfb_index + offset]; > + } else { > + offset -= f->pfb_size - f->pfb_index; > + } > + > + index = f->buf_index + offset; > > assert(!qemu_file_is_writable(f)); > > @@ -1851,7 +1913,7 @@ void qemu_savevm_state_begin(QEMUFile *f, > } > se->ops->set_params(params, se->opaque); > } > - > + > qemu_put_be32(f, QEMU_VM_FILE_MAGIC); > qemu_put_be32(f, QEMU_VM_FILE_VERSION); > > @@ -2294,8 +2356,6 @@ int qemu_loadvm_state(QEMUFile *f) > } > } > > - cpu_synchronize_all_post_init(); > - > ret = 0; > > out: > @@ -2311,6 +2371,89 @@ out: > return ret; > } > > +int qemu_loadvm_state_ft(QEMUFile *f) > +{ > + int ret = 0; > + int i = 0; > + int j = 0; > + int done = 0; > + uint64_t size = 0; > + uint64_t count = 0; > + uint8_t *pfb = NULL; > + uint8_t *buf = NULL; > + > + uint64_t max_mem = last_ram_offset() * 1.5; 
> + > + if (!f->ops->get_prefetch_buffer) { > + fprintf(stderr, "Fault tolerant is not supported by this > protocol.\n"); > + return EINVAL; > + } > + > + size = PFB_SIZE; > + pfb = g_malloc(size); > + > + while (true) { > + if (count + TARGET_PAGE_SIZE >= size) { > + if (size*2 > max_mem) { > + fprintf(stderr, "qemu_loadvm_state_ft: warning:" \ > + "Prefetch buffer becomes too large.\n" \ > + "Fault tolerant is unstable when you see this,\n" \ > + "please increase the bandwidth or increase " \ > + "the max down time.\n"); > + break; > + } > + size = size * 2; > + buf = g_try_realloc(pfb, size); > + if (!buf) { > + error_report("qemu_loadvm_state_ft: out of memory.\n"); > + g_free(pfb); > + return ENOMEM; You are not handling this error in the caller. Notice that qemu normally returns negative errno values (-ENOMEM here); the caller loops while the return value is >= 0, so this positive ENOMEM would be treated as success. > + } > + > + pfb = buf; > + } > + > + done = qemu_get_buffer(f, pfb + count, TARGET_PAGE_SIZE); > + > + ret = qemu_file_get_error(f); > + if (ret != 0) { > + g_free(pfb); > + return ret; > + } > + > + buf = pfb + count; > + count += done; > + for (i = 0; i < done; i++) { > + if (buf[i] != 0xfe) { > + continue; > + } > + if (buf[i-1] != 0xCa) { > + continue; > + } > + if (buf[i-2] != 0xed) { > + continue; > + } > + if (buf[i-3] == 0xFe) { > + goto out; > + } Could we use consistent capitalization for the hex constants here (0xfe / 0xCa / 0xed / 0xFe)? Is there a better way to look for the signature? And what happens if the data just happens to contain that magic constant? > + } > + } > + out: > + if (f->pfb) { > + free(f->pfb); > + } > + f->pfb_size = count; > + f->pfb_index = 0; > + f->pfb = pfb; > + > + ret = qemu_loadvm_state(f); > + > + /* Skip magic number */ > + qemu_get_be32(f); > + > + return ret; > +} > + > static BlockDriverState *find_vmstate_bs(void) > { > BlockDriverState *bs = NULL; > @@ -2419,6 +2562,7 @@ void do_savevm(Monitor *mon, const QDict *qdict) > goto the_end; > } > ret = qemu_savevm_state(f); > + cpu_synchronize_all_post_init(); > vm_state_size = qemu_ftell(f); > qemu_fclose(f); > if (ret < 0) {