Add a command queue for the main thread to pass commands to the sessions thread.
Define and implement a set of IPC messages for creating, closing and managing jsonrpc sessions from the main process to jsonrpc sessions threads. Add a doorbell mechanism for a jsonrpc session to inform other sessions of possible OVSDB changes. Although all supporting functions for running a multithreading OVSDB server have been implemented, multithreading is still not turned on, because 'n_max_threads' is still hard-coded to zero. Future patches will remove this restriction. Signed-off-by: Andy Zhou <az...@ovn.org> --- ovsdb/jsonrpc-server.c | 551 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 516 insertions(+), 35 deletions(-) diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index ed76192..09b89b2 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -34,6 +34,7 @@ #include "reconnect.h" #include "row.h" #include "server.h" +#include "seq.h" #include "simap.h" #include "stream.h" #include "table.h" @@ -58,7 +59,7 @@ static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); /* Session set. */ -static void ovsdb_jsonrpc_sessions_run(struct ovs_list *); +static void ovsdb_jsonrpc_sessions_run(struct ovs_list *, struct seq *); static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *); static void ovsdb_jsonrpc_sessions_close(struct ovs_list *, const void *remote); @@ -115,13 +116,112 @@ static struct json *ovsdb_jsonrpc_monitor_compose_update( struct sessions_thread { pthread_t thread; + struct ovs_list all_sessions; + /* Controls thread exit. */ struct latch exit_latch; + + /* Command queue to receive command from the main thread. */ + struct ovs_mutex cmd_queue_mutex; + struct ovs_list cmd_queue OVS_GUARDED; /* by 'cmd_queue_mutex' */ + struct seq *cmd_queue_seq; + uint64_t last_cmd_seq; + + /* Pointing to ovsdb_jsonrpc_server's doorbell. + * See comment in the definition of ovsdb_jsonrpc_server struct. 
*/ + struct seq *doorbell; }; -static void sessions_thread_init(struct sessions_thread *); +static void sessions_thread_init(struct sessions_thread *, struct seq *); static void sessions_thread_exit(struct sessions_thread *); +static void sessions_thread_command_run(struct sessions_thread *); +static void sessions_thread_command_wait(struct sessions_thread *); +struct ipc_msg; +static void sessions_thread_add_new_session(struct sessions_thread *thread, + struct ipc_msg *); +static void sessions_thread_delete_sessions(struct sessions_thread *thread, + struct ipc_msg *); +static void sessions_thread_reconnect_sessions(struct sessions_thread *thread, + struct ipc_msg *); +static void sessions_thread_stop_and_go(struct ipc_msg *msg); + +/* IPC messages are used to communicate between the main thread and + * jsonrpc sessions thread. + * + * - IPC_ADD_SESSION_COMMAND + * Create a new jsonrpc session for a given 'stream'. + * + * - IPC_DELETE_SESSIONS_COMMAND + * Delete all sessions associated with a given 'remote'. + * + * - IPC_RECONNECT_SESSIONS_COMMAND + * Reconnect all sessions associated with a given 'remote'. + * + * - IPC_STOP_AND_GO_COMMAND + * This is a generic thread synchronization mechanism that uses + * two ovs_barriers. A 'stop' barrier is used to make sure + * all threads have reached it, before the main thread can + * safely operate on per-thread data structures, such as + * accessing the all_sessions list, while all threads spin + * on the 'go' barrier, until the main thread is done + * and releases the 'go' barrier. 
+ */ +#define IPC_MESSAGES \ + IPC_MSG(IPC_ADD_SESSION_COMMAND) \ + IPC_MSG(IPC_DELETE_SESSIONS_COMMAND) \ + IPC_MSG(IPC_RECONNECT_SESSIONS_COMMAND) \ + IPC_MSG(IPC_STOP_AND_GO_COMMAND) + +enum ipc_msg_type { +#define IPC_MSG(msg) msg, + IPC_MESSAGES +#undef IPC_MSG +}; + +struct ipc_msg { + struct ovs_list list; + size_t size; + enum ipc_msg_type msg_type; +}; + +struct ipc_msg_add_session_command { + struct ipc_msg up; + struct stream *stream; + struct ovsdb_jsonrpc_server *server; + void *remote; + uint8_t dscp; +}; + +struct ipc_msg_delete_sessions_command { + struct ipc_msg up; + const void *remote; + struct ovs_barrier *barrier; +}; + +struct ipc_msg_reconnect_sessions_command { + struct ipc_msg up; + const void *remote; + struct ovs_barrier *barrier; +}; +struct ipc_msg_stop_and_go_command { + struct ipc_msg up; + struct ovs_barrier *stop; + struct ovs_barrier *go; +}; + +static struct ipc_msg *ipc_create_add_session_command( + struct stream* stream, struct ovsdb_jsonrpc_server *, void *remote, + uint8_t dscp); +static struct ipc_msg *ipc_create_delete_sessions_command(const void *remote, + struct ovs_barrier *barrier); +static struct ipc_msg *ipc_create_reconnect_sessions_command( + const void *remote, struct ovs_barrier *barrier); +static struct ipc_msg *ipc_create_stop_and_go_command(struct ovs_barrier *stop, + struct ovs_barrier *go); +static void send_ipc_msg(struct sessions_thread *, struct ipc_msg *); +static void broadcast_ipc_msg(const struct ovsdb_jsonrpc_server *svr, + struct ipc_msg *msg); /* JSON-RPC database server. */ @@ -134,6 +234,18 @@ struct ovsdb_jsonrpc_server { created before multithreading, or connections of active remotes.. */ + /* Doorbell. */ + /* Whenever a jsonrpc session make a change to OVSDB, it should ring + * the doorbell, by calling seq_change() to 'doorbell', so that other + * sessions can notice those changes immediately. 
+ * + * Calling seq_change() to 'doorbell' is similar to calling + * poll_immediate_wake(), except 'doorbell' works across threads; + * it can be rang from any thread, and can be heard by all threads + * that calls seq_wait() of the 'doorbell'. */ + struct seq *doorbell; + uint64_t doorbell_seqno; + /* Threads. */ size_t n_max_threads; size_t n_active_threads; @@ -153,6 +265,16 @@ ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) { #define N_SESSIONS_THREASHHOLD (2) static bool ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *); +/* Create a new session. The session can be either handled by the process + * (i.e. the main thread) or one of the sessions thread. */ +static void ovsdb_jsonrpc_server_create_session(struct ovsdb_jsonrpc_server *, + struct stream *stream, + struct ovsdb_jsonrpc_remote *); + +/* Close all sessions associated with the 'remote' in all threads */ +static void ovsdb_jsonrpc_server_close_sessions(struct ovsdb_jsonrpc_server *, + const void *remote); + /* A configured remote. This is either a passive stream listener plus a list * of the currently connected sessions, or a list of exactly one active * session. */ @@ -166,7 +288,8 @@ static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote( struct ovsdb_jsonrpc_server *, const char *name, const struct ovsdb_jsonrpc_options *options ); -static void ovsdb_jsonrpc_server_del_remote(struct shash_node *); +static void ovsdb_jsonrpc_server_del_remote(struct ovsdb_jsonrpc_server *svr, + struct shash_node *); /* Creates and returns a new server to provide JSON-RPC access to an OVSDB. 
* @@ -181,6 +304,8 @@ ovsdb_jsonrpc_server_create(size_t n_max_threads) ovsdb_server_init(&server->up); shash_init(&server->remotes); list_init(&server->all_sessions); + server->doorbell = seq_create(); + server->doorbell_seqno = seq_read(server->doorbell); server->n_max_threads = n_max_threads; server->n_active_threads = 0; server->threads = xmalloc(sizeof *server->threads * server->n_max_threads); @@ -230,7 +355,7 @@ ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr) size_t i; SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) { - ovsdb_jsonrpc_server_del_remote(node); + ovsdb_jsonrpc_server_del_remote(svr, node); } for (i = 0; i < svr->n_active_threads; i++) { @@ -239,6 +364,7 @@ ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr) } free(svr->threads); + seq_destroy(svr->doorbell); shash_destroy(&svr->remotes); ovsdb_server_destroy(&svr->up); free(svr); @@ -273,9 +399,9 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr, if (!options) { VLOG_INFO("%s: remote deconfigured", node->name); - ovsdb_jsonrpc_server_del_remote(node); + ovsdb_jsonrpc_server_del_remote(svr, node); } else if (options->dscp != remote->dscp) { - ovsdb_jsonrpc_server_del_remote(node); + ovsdb_jsonrpc_server_del_remote(svr, node); } } SHASH_FOR_EACH (node, new_remotes) { @@ -323,17 +449,50 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr, } static void -ovsdb_jsonrpc_server_del_remote(struct shash_node *node) +ovsdb_jsonrpc_server_del_remote(struct ovsdb_jsonrpc_server *svr, + struct shash_node *node) { struct ovsdb_jsonrpc_remote *remote = node->data; struct ovsdb_jsonrpc_server *server = remote->server; - ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote); + /* Remove all sessions associated with remote. 
*/ + ovsdb_jsonrpc_server_close_sessions(svr, remote); pstream_close(remote->listener); shash_delete(&server->remotes, node); free(remote); } +static int +ovsdb_jsonrpc_server_sessions_count(const struct ovsdb_jsonrpc_server *svr, + const struct ovsdb_jsonrpc_remote *remote) +{ + int count = ovsdb_jsonrpc_sessions_count(&svr->all_sessions, remote); + + if (svr->n_active_threads) { + struct ipc_msg *msg; + struct ovs_barrier stop, go; + + ovs_barrier_init(&stop, svr->n_active_threads + 1); + ovs_barrier_init(&go, svr->n_active_threads + 1); + + msg = ipc_create_stop_and_go_command(&stop, &go); + broadcast_ipc_msg(svr, msg); + + ovs_barrier_block(&stop); + int i; + for (i = 0; i < svr->n_active_threads; i++) { + struct ovs_list *sessions = &svr->threads[i].all_sessions; + count += ovsdb_jsonrpc_sessions_count(sessions, remote); + } + ovs_barrier_block(&go); + + ovs_barrier_destroy(&stop); + ovs_barrier_destroy(&go); + } + + return count; +} + /* Stores status information for the remote named 'target', which should have * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(), * into '*status'. 
On success returns true, on failure (if 'svr' doesn't have @@ -356,9 +515,7 @@ ovsdb_jsonrpc_server_get_remote_status( } if (remote->listener) { - int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions, - remote); - + int n_connections = ovsdb_jsonrpc_server_sessions_count(svr, remote); status->bound_port = pstream_get_bound_port(remote->listener); status->n_connections = n_connections; status->is_connected = (n_connections != 0); @@ -387,6 +544,17 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr) SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; + if (svr->n_active_threads) { + struct ovs_barrier barrier; + struct ipc_msg *msg; + + ovs_barrier_init(&barrier, svr->n_active_threads + 1); + msg = ipc_create_reconnect_sessions_command(remote, &barrier); + broadcast_ipc_msg(svr, msg); + ovs_barrier_block(&barrier); + ovs_barrier_destroy(&barrier); + } + ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote); } } @@ -396,6 +564,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) { struct shash_node *node; + svr->doorbell_seqno = seq_read(svr->doorbell); SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; @@ -405,12 +574,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) error = pstream_accept(remote->listener, &stream); if (!error) { - if (!ovsdb_jsonrpc_server_use_threads(svr)) { - struct jsonrpc_session *js; - js = jsonrpc_session_open_unreliably(jsonrpc_open(stream), - remote->dscp); - ovsdb_jsonrpc_session_create(remote, js); - } + ovsdb_jsonrpc_server_create_session(svr, stream, remote); } else if (error != EAGAIN) { VLOG_WARN_RL(&rl, "%s: accept failed: %s", pstream_get_name(remote->listener), @@ -418,7 +582,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) } } - ovsdb_jsonrpc_sessions_run(&svr->all_sessions); + ovsdb_jsonrpc_sessions_run(&svr->all_sessions, svr->doorbell); } } @@ -436,6 +600,8 @@ 
ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) ovsdb_jsonrpc_sessions_wait(&svr->all_sessions); } + + seq_wait(svr->doorbell, svr->doorbell_seqno); } /* Adds some memory usage statistics for 'svr' into 'usage', for use with @@ -463,12 +629,61 @@ ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *svr) { size_t n_desired_threads = n_sessions / N_SESSIONS_THREASHHOLD; if (n_desired_threads > svr->n_active_threads) { - sessions_thread_init(&svr->threads[svr->n_active_threads++]); + sessions_thread_init(&svr->threads[svr->n_active_threads++], + svr->doorbell); } } return svr->n_active_threads; } + +static void +ovsdb_jsonrpc_server_create_session(struct ovsdb_jsonrpc_server *svr, + struct stream *stream, + struct ovsdb_jsonrpc_remote *remote) +{ + if (!ovsdb_jsonrpc_server_use_threads(svr)) { + struct jsonrpc_session *js; + js = jsonrpc_session_open_unreliably(jsonrpc_open(stream), + remote->dscp); + ovsdb_jsonrpc_session_create(remote, js); + } else { + static size_t next_thread = 0; + struct sessions_thread *thread; + struct ipc_msg *msg; + + /* Using round-robin to pick the next thread for the session. + * This is certainly not the most optimal scheduling algorithm + * for assigning session to threads, but it may be the simplest + * to implement. */ + next_thread = (next_thread + 1) % svr->n_active_threads; + thread = &svr->threads[next_thread]; + + msg = ipc_create_add_session_command(stream, svr, remote, + remote->dscp); + send_ipc_msg(thread, msg); + } +} + +static void +ovsdb_jsonrpc_server_close_sessions(struct ovsdb_jsonrpc_server *svr, + const void *remote) +{ + if (svr->n_active_threads) { + struct ipc_msg *msg; + struct ovs_barrier barrier; + + /* Close sessions in all threads. */ + ovs_barrier_init(&barrier, svr->n_active_threads + 1); + msg = ipc_create_delete_sessions_command(remote, &barrier); + broadcast_ipc_msg(svr, msg); + ovs_barrier_block(&barrier); + ovs_barrier_destroy(&barrier); + } + + /* Close local sessions. 
*/ + ovsdb_jsonrpc_sessions_close(&svr->all_sessions, remote); +} /* JSON-RPC database server session. */ @@ -491,8 +706,8 @@ struct ovsdb_jsonrpc_session { }; static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *); -static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *) - OVS_EXCLUDED(mutex); +static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *, + struct seq *seq) OVS_EXCLUDED(mutex); static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_get_memory_usage( const struct ovsdb_jsonrpc_session *, struct simap *usage); @@ -502,26 +717,52 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *, struct jsonrpc_msg *); static struct ovsdb_jsonrpc_session * -ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, - struct jsonrpc_session *js) +ovsdb_jsonrpc_session_create__(void *remote, + struct ovsdb_jsonrpc_server *server, + struct jsonrpc_session *js) { struct ovsdb_jsonrpc_session *s; - struct ovsdb_jsonrpc_server *server = remote->server; s = xzalloc(sizeof *s); ovsdb_session_init(&s->up, &server->up); s->remote = remote; - list_push_back(&server->all_sessions, &s->node); hmap_init(&s->triggers); hmap_init(&s->monitors); s->js = js; s->js_seqno = jsonrpc_session_get_seqno(js); - atomic_count_inc(&server->n_sessions); return s; } +static struct ovsdb_jsonrpc_session * +ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, + struct jsonrpc_session *js) +{ + struct ovsdb_jsonrpc_session *s; + struct ovsdb_jsonrpc_server *server = remote->server; + + s = ovsdb_jsonrpc_session_create__(remote, server, js); + list_push_back(&server->all_sessions, &s->node); + + return s; +} + +static struct ovsdb_jsonrpc_session * +ovsdb_jsonrpc_thread_session_create(struct sessions_thread *thread, + void *remote, + struct ovsdb_jsonrpc_server *server, + struct jsonrpc_session *js) +{ + struct ovsdb_jsonrpc_session *s; + + s = 
ovsdb_jsonrpc_session_create__(remote, server, js); + list_push_back(&thread->all_sessions, &s->node); + + return s; +} + + static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) { @@ -547,7 +788,7 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) } static int -ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) +ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s, struct seq *doorbell) OVS_EXCLUDED(mutex) { jsonrpc_session_run(s->js); @@ -570,6 +811,7 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s) if (msg) { if (msg->type == JSONRPC_REQUEST) { ovsdb_jsonrpc_session_got_request(s, msg); + seq_change(doorbell); } else if (msg->type == JSONRPC_NOTIFY) { ovsdb_jsonrpc_session_got_notify(s, msg); } else { @@ -626,6 +868,29 @@ ovsdb_jsonrpc_session_get_memory_usage_all( LIST_FOR_EACH (s, node, &svr->all_sessions) { ovsdb_jsonrpc_session_get_memory_usage(s, usage); } + + if (svr->n_active_threads) { + struct ipc_msg *msg; + struct ovs_barrier stop, go; + size_t i; + + ovs_barrier_init(&stop, svr->n_active_threads + 1); + ovs_barrier_init(&go, svr->n_active_threads + 1); + + msg = ipc_create_stop_and_go_command(&stop, &go); + broadcast_ipc_msg(svr, msg); + + ovs_barrier_block(&stop); + for (i = 0; i < svr->n_active_threads; i++) { + LIST_FOR_EACH (s, node, &svr->threads[i].all_sessions) { + ovsdb_jsonrpc_session_get_memory_usage(s, usage); + } + } + ovs_barrier_block(&go); + + ovs_barrier_destroy(&stop); + ovs_barrier_destroy(&go); + } } /* Sets the 'status' of for the 'remote' with an outgoing connection. 
*/ @@ -1436,12 +1701,12 @@ ovsdb_jsonrpc_disable_monitor2(void) } static void -ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions) +ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions, struct seq *seq) { struct ovsdb_jsonrpc_session *s, *next; LIST_FOR_EACH_SAFE (s, next, node, sessions) { - int error = ovsdb_jsonrpc_session_run(s); + int error = ovsdb_jsonrpc_session_run(s, seq); if (error) { ovsdb_jsonrpc_session_close(s); } @@ -1526,24 +1791,41 @@ ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions, static void * sessions_thread_main(void * f_) { - struct sessions_thread *sessions_thread = f_; + struct sessions_thread *thread = f_; + + while (!latch_is_set(&thread->exit_latch)) { + uint64_t seqno = seq_read(thread->doorbell); + + sessions_thread_command_run(thread); + ovsdb_jsonrpc_sessions_run(&thread->all_sessions, thread->doorbell); + + sessions_thread_command_wait(thread); + ovsdb_jsonrpc_sessions_wait(&thread->all_sessions); - VLOG_DBG("Created"); - while (!latch_is_set(&sessions_thread->exit_latch)) { - latch_wait(&sessions_thread->exit_latch); + latch_wait(&thread->exit_latch); + seq_wait(thread->doorbell, seqno); poll_block(); } - VLOG_DBG("Finished"); return NULL; } static void -sessions_thread_init(struct sessions_thread *thread) +sessions_thread_init(struct sessions_thread *thread, struct seq *seq) { thread->thread = ovs_thread_create("sessions_thread", sessions_thread_main, thread); + list_init(&thread->all_sessions); latch_init(&thread->exit_latch); + + ovs_mutex_init(&thread->cmd_queue_mutex); + + ovs_mutex_lock(&thread->cmd_queue_mutex); + list_init(&thread->cmd_queue); + thread->cmd_queue_seq = seq_create(); + thread->last_cmd_seq = seq_read(thread->cmd_queue_seq); + thread->doorbell = seq; + ovs_mutex_unlock(&thread->cmd_queue_mutex); } static void @@ -1551,5 +1833,204 @@ sessions_thread_exit(struct sessions_thread *thread) { latch_set(&thread->exit_latch); xpthread_join(thread->thread, NULL); + 
ovs_mutex_destroy(&thread->cmd_queue_mutex); + seq_destroy(thread->cmd_queue_seq); latch_destroy(&thread->exit_latch); } + +static void +sessions_thread_command_run(struct sessions_thread *thread) +{ + uint64_t new_cmd_seq; + + new_cmd_seq = seq_read(thread->cmd_queue_seq); + if (new_cmd_seq != thread->last_cmd_seq) { + ovs_mutex_lock(&thread->cmd_queue_mutex); + while (!list_is_empty(&thread->cmd_queue)) { + struct ipc_msg *msg; + struct ovs_list *node; + + node = list_pop_front(&thread->cmd_queue); + msg = CONTAINER_OF(node, struct ipc_msg, list); + VLOG_ERR("thread receive command msg"); + switch (msg->msg_type) { + case IPC_ADD_SESSION_COMMAND: + sessions_thread_add_new_session(thread, msg); + break; + case IPC_DELETE_SESSIONS_COMMAND: + sessions_thread_delete_sessions(thread, msg); + break; + case IPC_RECONNECT_SESSIONS_COMMAND: + sessions_thread_reconnect_sessions(thread, msg); + break; + case IPC_STOP_AND_GO_COMMAND: + sessions_thread_stop_and_go(msg); + break; + default: + VLOG_ERR("thread receive unkown msg"); + } + free(msg); + } + ovs_mutex_unlock(&thread->cmd_queue_mutex); + thread->last_cmd_seq = new_cmd_seq; + } +} + +static void +sessions_thread_add_new_session(struct sessions_thread *thread, + struct ipc_msg *msg) +{ + struct jsonrpc_session *js; + struct ipc_msg_add_session_command *m = + (struct ipc_msg_add_session_command *)msg; + + js = jsonrpc_session_open_unreliably(jsonrpc_open(m->stream), m->dscp); + ovsdb_jsonrpc_thread_session_create(thread, m->remote, m->server, js); +} + +static void +sessions_thread_delete_sessions(struct sessions_thread *thread, + struct ipc_msg *msg) +{ + struct ipc_msg_delete_sessions_command *m = + (struct ipc_msg_delete_sessions_command *)msg; + + ovsdb_jsonrpc_sessions_close(&thread->all_sessions, m->remote); + ovs_barrier_block(m->barrier); +} + +static void +sessions_thread_reconnect_sessions(struct sessions_thread *thread, + struct ipc_msg *msg) +{ + struct ipc_msg_reconnect_sessions_command *m = + (struct 
ipc_msg_reconnect_sessions_command *)msg; + + ovsdb_jsonrpc_sessions_reconnect(&thread->all_sessions, m->remote); +} + +static void +sessions_thread_stop_and_go(struct ipc_msg *msg) +{ + struct ipc_msg_stop_and_go_command *m = + (struct ipc_msg_stop_and_go_command *)msg; + + ovs_barrier_block(m->stop); + ovs_barrier_block(m->go); +} + +static void +sessions_thread_command_wait(struct sessions_thread *thread) +{ + seq_wait(thread->cmd_queue_seq, thread->last_cmd_seq); +} + +static void +ipc_msg_init(struct ipc_msg *msg, size_t size, enum ipc_msg_type msg_type) +{ + list_init(&msg->list); + msg->size = size; + msg->msg_type = msg_type; +} + +static struct ipc_msg * +ipc_msg_clone(struct ipc_msg *msg) +{ + struct ipc_msg *clone = xmalloc(msg->size); + memcpy(clone, msg, msg->size); + list_init(&clone->list); + + return clone; +} + +static struct ipc_msg * +ipc_create_add_session_command(struct stream* stream, + struct ovsdb_jsonrpc_server *server, + void *remote, uint8_t dscp) +{ + struct ipc_msg_add_session_command *msg; + size_t size = sizeof *msg; + + msg = xmalloc(sizeof *msg); + ipc_msg_init(&msg->up, size, IPC_ADD_SESSION_COMMAND); + + msg->stream = stream; + msg->server = server; + msg->remote = remote; + msg->dscp = dscp; + + return &msg->up; +} + +static struct ipc_msg * +ipc_create_delete_sessions_command(const void *remote, + struct ovs_barrier *barrier) +{ + struct ipc_msg_delete_sessions_command *msg; + size_t size = sizeof *msg; + + msg = xmalloc(size); + ipc_msg_init(&msg->up, size, IPC_DELETE_SESSIONS_COMMAND); + msg->remote = remote; + msg->barrier = barrier; + + return &msg->up; +} + +static struct ipc_msg * +ipc_create_reconnect_sessions_command(const void *remote, + struct ovs_barrier *barrier) +{ + struct ipc_msg_reconnect_sessions_command *msg; + size_t size = sizeof *msg; + + msg = xmalloc(size); + ipc_msg_init(&msg->up, size, IPC_RECONNECT_SESSIONS_COMMAND); + msg->remote = remote; + msg->barrier = barrier; + + return &msg->up; +} + +static 
struct ipc_msg * +ipc_create_stop_and_go_command(struct ovs_barrier *stop, + struct ovs_barrier *go) +{ + struct ipc_msg_stop_and_go_command *msg; + size_t size = sizeof *msg; + + msg = xmalloc(size); + ipc_msg_init(&msg->up, size, IPC_STOP_AND_GO_COMMAND); + msg->stop = stop; + msg->go = go; + + return &msg->up; +} + +/* Send an IPC message to a thread. The receiving thread is responsible for + * freeing the memory of 'msg'. */ +static void +send_ipc_msg(struct sessions_thread *thread, struct ipc_msg *msg) +{ + ovs_mutex_lock(&thread->cmd_queue_mutex); + list_push_back(&thread->cmd_queue, &msg->list); + ovs_mutex_unlock(&thread->cmd_queue_mutex); + + seq_change(thread->cmd_queue_seq); +} + +/* Send the same IPC message to all threads. 'msg' will be cloned for + * each thread. The receiving threads are responsible for freeing + * the memory of 'msg' */ +static void +broadcast_ipc_msg(const struct ovsdb_jsonrpc_server *svr, struct ipc_msg *msg) +{ + size_t i; + + for (i = svr->n_active_threads; i > 0; i--) { + struct sessions_thread *thread = &svr->threads[i-1]; + struct ipc_msg *m = i ? ipc_msg_clone(msg) : msg; + + send_ipc_msg(thread, m); + } +} -- 1.9.1 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev