Currently, ovsdb_jsonrpc_session structures are grouped together in a linked list within 'ovsdb_jsonrpc_remote'. This makes sense, since most session operations apply to the sessions within a remote.
However, in order to scale up ovsdb-server with multi-threading, it is more convenient to distribute sessions to any available thread, regardless of which remote they are associated with. This patch introduces a set of APIs that provide operations on a list of sessions. Instead of being grouped by remote, sessions are linked together in a new ovs_list field 'all_sessions' in the ovsdb_jsonrpc_server struct. With multi-threading, the design is that all sessions managed by a thread will be linked together on a thread-private linked list. At that time, the 'all_sessions' field in the ovsdb_jsonrpc_server struct will hold all sessions managed by the main process. Signed-off-by: Andy Zhou <az...@ovn.org> --- ovsdb/jsonrpc-server.c | 242 ++++++++++++++++++++++++++++--------------------- 1 file changed, 140 insertions(+), 102 deletions(-) diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c index 15dbc4e..56dddc6 100644 --- a/ovsdb/jsonrpc-server.c +++ b/ovsdb/jsonrpc-server.c @@ -1,4 +1,4 @@ -/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc. +/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,23 +52,30 @@ static bool monitor2_enable__ = true; /* Message rate-limiting. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); +/* Session set. 
*/ +static void ovsdb_jsonrpc_sessions_run(struct ovs_list *); +static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *); +static void ovsdb_jsonrpc_sessions_close(struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); +static void ovsdb_jsonrpc_sessions_reconnect( struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); +static void ovsdb_jsonrpc_sessions_set_options(struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote, + const struct ovsdb_jsonrpc_options *); +static size_t ovsdb_jsonrpc_sessions_count(const struct ovs_list *, + const struct ovsdb_jsonrpc_remote *remote); + /* Sessions. */ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create( struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *); -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_get_memory_usage_all( - const struct ovsdb_jsonrpc_remote *, struct simap *usage); -static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *); -static void ovsdb_jsonrpc_session_set_all_options( - struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *); static bool ovsdb_jsonrpc_active_session_get_status( - const struct ovsdb_jsonrpc_remote *, + const struct ovsdb_jsonrpc_remote *remote, struct ovsdb_jsonrpc_remote_status *); static void ovsdb_jsonrpc_session_get_status( const struct ovsdb_jsonrpc_session *, struct ovsdb_jsonrpc_remote_status *); +static void ovsdb_jsonrpc_session_get_memory_usage_all( + const struct ovsdb_jsonrpc_server *, struct simap *usage); static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *); static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *); static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *, @@ -106,15 +113,22 @@ struct ovsdb_jsonrpc_server { 
struct ovsdb_server up; unsigned int n_sessions; struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */ + struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's. */ }; +/* Cast an 'ovsdb_server' pointer down into an ovsdb_jsonrpc_server pinter. + * Caller needs to make sure this conversion is valid. */ +static struct ovsdb_jsonrpc_server * +ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) { + return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up); +} + /* A configured remote. This is either a passive stream listener plus a list * of the currently connected sessions, or a list of exactly one active * session. */ struct ovsdb_jsonrpc_remote { struct ovsdb_jsonrpc_server *server; struct pstream *listener; /* Listener, if passive. */ - struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */ uint8_t dscp; }; @@ -134,6 +148,7 @@ ovsdb_jsonrpc_server_create(void) struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server); ovsdb_server_init(&server->up); shash_init(&server->remotes); + list_init(&server->all_sessions); return server; } @@ -232,7 +247,8 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr, } } - ovsdb_jsonrpc_session_set_all_options(remote, options); + ovsdb_jsonrpc_sessions_set_options(&svr->all_sessions, remote, + options); } } @@ -254,7 +270,6 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr, remote = xmalloc(sizeof *remote); remote->server = svr; remote->listener = listener; - list_init(&remote->sessions); remote->dscp = options->dscp; shash_add(&svr->remotes, name, remote); @@ -268,10 +283,11 @@ static void ovsdb_jsonrpc_server_del_remote(struct shash_node *node) { struct ovsdb_jsonrpc_remote *remote = node->data; + struct ovsdb_jsonrpc_server *server = remote->server; - ovsdb_jsonrpc_session_close_all(remote); + ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote); pstream_close(remote->listener); - shash_delete(&remote->server->remotes, node); + 
shash_delete(&server->remotes, node); free(remote); } @@ -297,9 +313,12 @@ ovsdb_jsonrpc_server_get_remote_status( } if (remote->listener) { + int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions, + remote); + status->bound_port = pstream_get_bound_port(remote->listener); - status->is_connected = !list_is_empty(&remote->sessions); - status->n_connections = list_size(&remote->sessions); + status->n_connections = n_connections; + status->is_connected = (n_connections != 0); return true; } @@ -325,7 +344,7 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr) SHASH_FOR_EACH (node, &svr->remotes) { struct ovsdb_jsonrpc_remote *remote = node->data; - ovsdb_jsonrpc_session_reconnect_all(remote); + ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote); } } @@ -354,7 +373,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr) } } - ovsdb_jsonrpc_session_run_all(remote); + ovsdb_jsonrpc_sessions_run(&svr->all_sessions); } } @@ -370,7 +389,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr) pstream_wait(remote->listener); } - ovsdb_jsonrpc_session_wait_all(remote); + ovsdb_jsonrpc_sessions_wait(&svr->all_sessions); } } @@ -380,14 +399,8 @@ void ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr, struct simap *usage) { - struct shash_node *node; - simap_increase(usage, "sessions", svr->n_sessions); - SHASH_FOR_EACH (node, &svr->remotes) { - struct ovsdb_jsonrpc_remote *remote = node->data; - - ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage); - } + ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage); } /* JSON-RPC database server session. 
*/ @@ -423,17 +436,18 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, struct jsonrpc_session *js) { struct ovsdb_jsonrpc_session *s; + struct ovsdb_jsonrpc_server *server = remote->server; s = xzalloc(sizeof *s); - ovsdb_session_init(&s->up, &remote->server->up); + ovsdb_session_init(&s->up, &server->up); s->remote = remote; - list_push_back(&remote->sessions, &s->node); + list_push_back(&server->all_sessions, &s->node); hmap_init(&s->triggers); hmap_init(&s->monitors); s->js = js; s->js_seqno = jsonrpc_session_get_seqno(js); - remote->server->n_sessions++; + server->n_sessions++; return s; } @@ -441,6 +455,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote, static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) { + struct ovsdb_jsonrpc_server *server; + ovsdb_jsonrpc_monitor_remove_all(s); ovsdb_jsonrpc_session_unlock_all(s); ovsdb_jsonrpc_trigger_complete_all(s); @@ -450,7 +466,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s) jsonrpc_session_close(s->js); list_remove(&s->node); - s->remote->server->n_sessions--; + server = ovsdb_jsonrpc_server_cast(s->up.server); + server->n_sessions--; ovsdb_session_destroy(&s->up); free(s); } @@ -501,19 +518,6 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session, } static void -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - int error = ovsdb_jsonrpc_session_run(s); - if (error) { - ovsdb_jsonrpc_session_close(s); - } - } -} - -static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) { jsonrpc_session_wait(s->js); @@ -527,16 +531,6 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s) } static void -ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s; - - LIST_FOR_EACH (s, node, &remote->sessions) { - ovsdb_jsonrpc_session_wait(s); - 
} -} - -static void ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s, struct simap *usage) { @@ -546,65 +540,22 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s, static void ovsdb_jsonrpc_session_get_memory_usage_all( - const struct ovsdb_jsonrpc_remote *remote, - struct simap *usage) + const struct ovsdb_jsonrpc_server *svr, struct simap *usage) { struct ovsdb_jsonrpc_session *s; - LIST_FOR_EACH (s, node, &remote->sessions) { + LIST_FOR_EACH (s, node, &svr->all_sessions) { ovsdb_jsonrpc_session_get_memory_usage(s, usage); } } -static void -ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - ovsdb_jsonrpc_session_close(s); - } -} - -/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and - * reconnect. */ -static void -ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote) -{ - struct ovsdb_jsonrpc_session *s, *next; - - LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) { - jsonrpc_session_force_reconnect(s->js); - if (!jsonrpc_session_is_alive(s->js)) { - ovsdb_jsonrpc_session_close(s); - } - } -} - -/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to - * 'options'. - * - * (The dscp value can't be changed directly; the caller must instead close and - * re-open the session.) */ -static void -ovsdb_jsonrpc_session_set_all_options( - struct ovsdb_jsonrpc_remote *remote, - const struct ovsdb_jsonrpc_options *options) -{ - struct ovsdb_jsonrpc_session *s; - - LIST_FOR_EACH (s, node, &remote->sessions) { - ovsdb_jsonrpc_session_set_options(s, options); - } -} - /* Sets the 'status' of for the 'remote' with an outgoing connection. 
*/ static bool ovsdb_jsonrpc_active_session_get_status( const struct ovsdb_jsonrpc_remote *remote, struct ovsdb_jsonrpc_remote_status *status) { - const struct ovs_list *sessions = &remote->sessions; + const struct ovs_list *sessions = &remote->server->all_sessions; const struct ovsdb_jsonrpc_session *s; if (list_is_empty(sessions)) { @@ -764,8 +715,7 @@ ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s, } /* Get the lock, add us as a waiter. */ - waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode, - &victim); + waiter = ovsdb_server_lock(s->up.server, &s->up, lock_name, mode, &victim); if (victim) { ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen"); } @@ -1399,3 +1349,91 @@ ovsdb_jsonrpc_disable_monitor2(void) /* Once disabled, it is not possible to re-enable it. */ monitor2_enable__ = false; } + +static void +ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + int error = ovsdb_jsonrpc_session_run(s); + if (error) { + ovsdb_jsonrpc_session_close(s); + } + } +} + +static void +ovsdb_jsonrpc_sessions_wait(struct ovs_list *sessions) +{ + struct ovsdb_jsonrpc_session *s; + + LIST_FOR_EACH (s, node, sessions) { + ovsdb_jsonrpc_session_wait(s); + } +} + +static void +ovsdb_jsonrpc_sessions_close(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + if (s->remote == remote) { + ovsdb_jsonrpc_session_close(s); + } + } +} + +/* Forces all of the JSON-RPC sessions to disconnect and + * reconnect. 
*/ +static void +ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s, *next; + + LIST_FOR_EACH_SAFE (s, next, node, sessions) { + if (s->remote == remote) { + jsonrpc_session_force_reconnect(s->js); + if (!jsonrpc_session_is_alive(s->js)) { + ovsdb_jsonrpc_session_close(s); + } + } + } +} + +static size_t +ovsdb_jsonrpc_sessions_count(const struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote) +{ + struct ovsdb_jsonrpc_session *s = NULL; + size_t count = 0; + + LIST_FOR_EACH (s, node, sessions) { + if (s->remote == remote) { + count++; + } + } + return count; +} + +/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to + * 'options'. + * + * (The dscp value can't be changed directly; the caller must instead close and + * re-open the session.) */ +static void +ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions, + const struct ovsdb_jsonrpc_remote *remote, + const struct ovsdb_jsonrpc_options *options) +{ + struct ovsdb_jsonrpc_session *s; + + LIST_FOR_EACH (s, node, sessions) { + if (s->remote == remote) { + ovsdb_jsonrpc_session_set_options(s, options); + } + } +} -- 1.9.1 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev