i) Dynamically decide appropriate source and destination ip pairs for the corresponding multi-FD channel to be connected.
ii) Removed the support for setting the number of multi-fd channels from qmp commands. As now all multiFD parameters will be passed via qmp: migrate command or incoming flag itself. Suggested-by: Manish Mishra <manish.mis...@nutanix.com> Signed-off-by: Het Gala <het.g...@nutanix.com> --- migration/migration.c | 15 --------------- migration/migration.h | 1 - migration/multifd.c | 42 +++++++++++++++++++++--------------------- migration/socket.c | 42 +++++++++++++++++++++++++++++++++--------- migration/socket.h | 4 +++- monitor/hmp-cmds.c | 4 ---- qapi/migration.json | 6 ------ 7 files changed, 57 insertions(+), 57 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index 9b0ad732e7..57dd4494b4 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params, if (params->has_block_incremental) { dest->block_incremental = params->block_incremental; } - if (params->has_multifd_channels) { - dest->multifd_channels = params->multifd_channels; - } if (params->has_multifd_compression) { dest->multifd_compression = params->multifd_compression; } @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) if (params->has_block_incremental) { s->parameters.block_incremental = params->block_incremental; } - if (params->has_multifd_channels) { - s->parameters.multifd_channels = params->multifd_channels; - } if (params->has_multifd_compression) { s->parameters.multifd_compression = params->multifd_compression; } @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void) MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER]; } -int migrate_multifd_channels(void) -{ - MigrationState *s; - - s = migrate_get_current(); - - return s->parameters.multifd_channels; -} - MultiFDCompression migrate_multifd_compression(void) { MigrationState *s; diff --git a/migration/migration.h b/migration/migration.h index fa8717ec9e..9464de8ef7 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void); bool migrate_auto_converge(void); bool migrate_use_multifd(void); bool migrate_pause_before_switchover(void); -int migrate_multifd_channels(void); MultiFDCompression migrate_multifd_compression(void); int migrate_multifd_zlib_level(void); int migrate_multifd_zstd_level(void); diff --git a/migration/multifd.c b/migration/multifd.c index 9282ab6aa4..ce017436fb 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return -1; } - if (msg.id > migrate_multifd_channels()) { + if (msg.id > total_multifd_channels()) { error_setg(errp, "multifd: received channel version %u " "expected %u", msg.version, MULTIFD_VERSION); return -1; @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f) * using more channels, so ensure it doesn't overflow if the * limit is lower now. */ - next_channel %= migrate_multifd_channels(); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + next_channel %= total_multifd_channels(); + for (i = next_channel;; i = (i + 1) % total_multifd_channels()) { p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f) } if (!p->pending_job) { p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); + next_channel = (i + 1) % total_multifd_channels(); break; } qemu_mutex_unlock(&p->mutex); @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err) return; } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -521,14 +521,14 @@ void multifd_save_cleanup(void) return; } multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; if (p->running) { qemu_thread_join(&p->thread); } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; Error *local_err = NULL; @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) flush_zero_copy = migrate_use_zero_copy_send(); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_signal(p->id); @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f) } } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_wait(p->id); @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp) int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); uint8_t i; - + int idx; if (!migrate_use_multifd()) { return 0; } @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp) return -1; } - thread_count = migrate_multifd_channels(); + thread_count = total_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp) } else { p->write_flags = 0; } - - socket_send_channel_create(multifd_new_send_channel_async, p); + idx = multifd_index(i); + socket_send_channel_create(multifd_new_send_channel_async, p, idx); } for (i = 0; i < thread_count; i++) { @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err) } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp) return 0; } multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; if (p->running) { @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp) qemu_thread_join(&p->thread); } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; migration_ioc_unregister_yank(p->c); @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void) if (!migrate_use_multifd()) { return; } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; trace_multifd_recv_sync_main_wait(p->id); qemu_sem_wait(&multifd_recv_state->sem_sync); } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; WITH_QEMU_LOCK_GUARD(&p->mutex) { @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp) error_setg(errp, "multifd is not supported by current protocol"); return -1; } - thread_count = migrate_multifd_channels(); + thread_count = total_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); qatomic_set(&multifd_recv_state->count, 0); @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp) bool multifd_recv_all_channels_created(void) { - int thread_count = migrate_multifd_channels(); + int thread_count = total_multifd_channels(); if (!migrate_use_multifd()) { return true; @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) QEMU_THREAD_JOINABLE); qatomic_inc(&multifd_recv_state->count); return qatomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); + total_multifd_channels(); } diff --git a/migration/socket.c b/migration/socket.c index d0cb7cc6a6..c0ac6dbbe2 100644 --- a/migration/socket.c +++ b/migration/socket.c @@ -28,9 +28,6 @@ #include "trace.h" -struct SocketOutgoingArgs { - SocketAddress *saddr; -} outgoing_args; struct SocketArgs { struct SrcDestAddr data; @@ -43,20 +40,47 @@ struct OutgoingMigrateParams { uint64_t total_multifd_channel; } outgoing_migrate_params; -void socket_send_channel_create(QIOTaskFunc f, void *data) + +int total_multifd_channels(void) +{ + return outgoing_migrate_params.total_multifd_channel; +} + +int multifd_index(int i) +{ + int length = outgoing_migrate_params.length; + int j = 0; + int runn_sum = 0; + while (j < length) { + runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels; + if (i >= runn_sum) { + j++; + } else { + break; + } + } + return j; +} + +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx) { QIOChannelSocket *sioc = qio_channel_socket_new(); - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, - f, data, NULL, NULL, NULL); + qio_channel_socket_connect_async(sioc, + outgoing_migrate_params.socket_args[idx].data.dst_addr, + f, data, NULL, NULL, + outgoing_migrate_params.socket_args[idx].data.src_addr); } int socket_send_channel_destroy(QIOChannel *send) { /* Remove channel */ object_unref(OBJECT(send)); - if (outgoing_args.saddr) { - qapi_free_SocketAddress(outgoing_args.saddr); - outgoing_args.saddr = NULL; + if (outgoing_migrate_params.socket_args != NULL) { + g_free(outgoing_migrate_params.socket_args); + outgoing_migrate_params.socket_args = NULL; + } + if (outgoing_migrate_params.length) { + outgoing_migrate_params.length = 0; } if (outgoing_migrate_params.socket_args != NULL) { diff --git a/migration/socket.h b/migration/socket.h index b9e3699167..c8b9252384 100644 --- a/migration/socket.h +++ b/migration/socket.h @@ -27,7 +27,9 @@ struct SrcDestAddr { SocketAddress *src_addr; }; -void socket_send_channel_create(QIOTaskFunc f, void *data); +int total_multifd_channels(void); +int multifd_index(int i); +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx); int socket_send_channel_destroy(QIOChannel *send); void socket_start_incoming_migration(const char *str, uint8_t number, diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c index 32a6b67d5f..9a3d76d6ba 100644 --- a/monitor/hmp-cmds.c +++ b/monitor/hmp-cmds.c @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) p->has_block_incremental = true; visit_type_bool(v, param, &p->block_incremental, &err); break; - case MIGRATION_PARAMETER_MULTIFD_CHANNELS: - p->has_multifd_channels = true; - visit_type_uint8(v, param, &p->multifd_channels, &err); - break; case MIGRATION_PARAMETER_MULTIFD_COMPRESSION: p->has_multifd_compression = true; visit_type_MultiFDCompression(v, param, &p->multifd_compression, diff --git a/qapi/migration.json b/qapi/migration.json index 62a7b22d19..1b1c6d01d3 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -877,11 +877,6 @@ # migrated and the destination must already have access to the # same backing chain as was used on the source. (since 2.10) # -# @multifd-channels: Number of channels used to migrate data in -# parallel. This is the same number that the -# number of sockets used for migration. The -# default value is 2 (since 4.0) -# # @xbzrle-cache-size: cache size to be used by XBZRLE migration. It # needs to be a multiple of the target page size # and a power of 2 @@ -965,7 +960,6 @@ '*x-checkpoint-delay': { 'type': 'uint32', 'features': [ 'unstable' ] }, '*block-incremental': 'bool', - '*multifd-channels': 'uint8', '*xbzrle-cache-size': 'size', '*max-postcopy-bandwidth': 'size', '*max-cpu-throttle': 'uint8', -- 2.22.3