2010/6/2 Anthony Liguori <aligu...@linux.vnet.ibm.com>: > On 06/01/2010 10:40 AM, Yoshiaki Tamura wrote: >> >> Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD >> is enabled. Spawned thread writes it's return status to th_fds[1] >> before exit, and main thread joins and reads it. In >> tcp_start_incoming_migration(), allocate FdMigrationState and return >> MigrationState to let migration to print incoming migration info. >> > > In the absence of any locking, I can't see how this is safe.
Right. If we use threading here, we need to prevent commands from monitor that affects incoming thread. Thanks, Yoshi > > Regards, > > Anthony Liguori > >> Signed-off-by: Yoshiaki Tamura<tamura.yoshi...@lab.ntt.co.jp> >> --- >> migration-tcp.c | 86 >> ++++++++++++++++++++++++++++++++++++++++++++++--------- >> migration.h | 2 +- >> 2 files changed, 73 insertions(+), 15 deletions(-) >> >> diff --git a/migration-tcp.c b/migration-tcp.c >> index 95ce722..f20e5fe 100644 >> --- a/migration-tcp.c >> +++ b/migration-tcp.c >> @@ -18,6 +18,7 @@ >> #include "sysemu.h" >> #include "buffered_file.h" >> #include "block.h" >> +#include "qemu-thread.h" >> >> //#define DEBUG_MIGRATION_TCP >> >> @@ -29,6 +30,11 @@ >> do { } while (0) >> #endif >> >> +#ifdef CONFIG_IOTHREAD >> +static QemuThread migration_thread; >> +static int th_fds[2]; >> +#endif >> + >> static int socket_errno(FdMigrationState *s) >> { >> return socket_error(); >> @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void >> *opaque) >> out_fopen: >> qemu_fclose(f); >> out: >> +#ifndef CONFIG_IOTHREAD >> qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); >> +#endif >> close(s); >> close(c); >> +#ifdef CONFIG_IOTHREAD >> + write(th_fds[1],&ret, sizeof(ret)); >> + qemu_thread_exit(NULL); >> +#endif >> +} >> + >> +#ifdef CONFIG_IOTHREAD >> +static void tcp_incoming_migration_complete(void *opaque) >> +{ >> + int ret, state = 0; >> + FdMigrationState *s = opaque; >> + >> + qemu_thread_join(&migration_thread, NULL); >> + >> + ret = read(th_fds[0],&state, sizeof(state)); >> + if (ret == -1) { >> + fprintf(stderr, "failed to read from pipe\n"); >> + goto err; >> + } >> + >> + s->state = state< 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; >> + >> +err: >> + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); >> + close(th_fds[0]); >> + close(th_fds[1]); >> } >> +#endif >> >> -int tcp_start_incoming_migration(const char *host_port) >> +MigrationState *tcp_start_incoming_migration(const char *host_port) >> { >> struct sockaddr_in addr; >> + FdMigrationState *s; >> int val; >> - int s; >> >> if (parse_host_port(&addr, host_port)< 0) { >> fprintf(stderr, "invalid host/port combination: %s\n", >> host_port); >> - return -EINVAL; >> + return NULL; >> } >> >> - s = qemu_socket(PF_INET, SOCK_STREAM, 0); >> - if (s == -1) >> - return -socket_error(); >> + s = qemu_mallocz(sizeof(*s)); >> + >> + s->get_error = socket_errno; >> + s->close = tcp_close; >> + s->mig_state.cancel = migrate_fd_cancel; >> + s->mig_state.get_status = migrate_fd_get_status; >> + s->state = MIG_STATE_ACTIVE; >> + >> + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); >> + if (s->fd == -1) { >> + qemu_free(s); >> + return NULL; >> + } >> >> val = 1; >> - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, >> sizeof(val)); >> + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, >> + (const char *)&val, sizeof(val)); >> >> - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) >> + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) >> goto err; >> >> - if (listen(s, 1) == -1) >> + if (listen(s->fd, 1) == -1) >> goto err; >> >> - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, >> - (void *)(unsigned long)s); >> +#ifdef CONFIG_IOTHREAD >> + if (qemu_pipe(th_fds) == -1) { >> + fprintf(stderr, "failed to create pipe\n"); >> + goto err; >> + } >> >> - return 0; >> + qemu_thread_create(&migration_thread, (void >> *)tcp_accept_incoming_migration, >> + (void *)(unsigned long)s->fd); >> + qemu_set_fd_handler2(th_fds[0], NULL, >> tcp_incoming_migration_complete, NULL, >> + (void *)s); >> +#else >> + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, >> NULL, >> + (void *)(unsigned long)s->fd); >> +#endif >> + >> + return&s->mig_state; >> >> err: >> - close(s); >> - return -socket_error(); >> + close(s->fd); >> + return NULL; >> } >> diff --git a/migration.h b/migration.h >> index 385423f..c11e6db 100644 >> --- a/migration.h >> +++ b/migration.h >> @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor >> *mon, >> int blk, >> int inc); >> >> -int tcp_start_incoming_migration(const char *host_port); >> +MigrationState *tcp_start_incoming_migration(const char *host_port); >> >> MigrationState *tcp_start_outgoing_migration(Monitor *mon, >> const char *host_port, >> > > >