Add a command queue for the main thread to pass commands to the
sessions thread.

Define and implement a set of IPC messages for creating, closing and
managing jsonrpc sessions, sent from the main thread to the jsonrpc
sessions threads.

Add a doorbell mechanism for a jsonrpc session to inform other sessions
of possible OVSDB changes.

Although all supporting functions for running a multithreaded OVSDB
server have been implemented, multithreading is still not turned on,
because 'n_max_threads' is still hard coded to zero.  Future patches
will remove this restriction.

Signed-off-by: Andy Zhou <az...@ovn.org>
---
 ovsdb/jsonrpc-server.c | 551 +++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 516 insertions(+), 35 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index ed76192..09b89b2 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -34,6 +34,7 @@
 #include "reconnect.h"
 #include "row.h"
 #include "server.h"
+#include "seq.h"
 #include "simap.h"
 #include "stream.h"
 #include "table.h"
@@ -58,7 +59,7 @@ static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER;
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 /* Session set. */
-static void ovsdb_jsonrpc_sessions_run(struct ovs_list *);
+static void ovsdb_jsonrpc_sessions_run(struct ovs_list *, struct seq *);
 static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *);
 static void ovsdb_jsonrpc_sessions_close(struct ovs_list *,
                                          const void *remote);
@@ -115,13 +116,112 @@ static struct json *ovsdb_jsonrpc_monitor_compose_update(
+/* State of one jsonrpc sessions thread. */
 struct sessions_thread {
     pthread_t thread;
 
+    /* Sessions handled by this thread, linked through each session's
+     * 'node'. */
     struct ovs_list all_sessions;
+
     /* Controls thread exit. */
     struct latch exit_latch;
+
+    /* Command queue to receive commands from the main thread.
+     * send_ipc_msg() appends to 'cmd_queue' and then changes
+     * 'cmd_queue_seq'; 'last_cmd_seq' is the value of 'cmd_queue_seq'
+     * last recorded by sessions_thread_command_run(). */
+    struct ovs_mutex cmd_queue_mutex;
+    struct ovs_list cmd_queue OVS_GUARDED; /* by 'cmd_queue_mutex' */
+    struct seq *cmd_queue_seq;
+    uint64_t last_cmd_seq;
+
+    /* Pointing to ovsdb_jsonrpc_server's doorbell.
+     * See comment in the definition of ovsdb_jsonrpc_server struct. */
+    struct seq *doorbell;
 };
 
-static void sessions_thread_init(struct sessions_thread *);
+static void sessions_thread_init(struct sessions_thread *, struct seq *);
 static void sessions_thread_exit(struct sessions_thread *);
+static void sessions_thread_command_run(struct sessions_thread *);
+static void sessions_thread_command_wait(struct sessions_thread *);
+struct ipc_msg;
+static void sessions_thread_add_new_session(struct sessions_thread *thread,
+                                  struct ipc_msg *);
+static void sessions_thread_delete_sessions(struct sessions_thread *thread,
+                                  struct ipc_msg *);
+static void sessions_thread_reconnect_sessions(struct sessions_thread *thread,
+                                  struct ipc_msg *);
+static void sessions_thread_stop_and_go(struct ipc_msg *msg);
+
+/* IPC messages are used to communicate between the main thread and
+ * the jsonrpc sessions threads.
+ *
+ * - IPC_ADD_SESSION_COMMAND
+ * Create a new jsonrpc session for a given 'stream'.
+ *
+ * - IPC_DELETE_SESSIONS_COMMAND
+ * Delete all sessions associated with a given 'remote'.
+ *
+ * - IPC_RECONNECT_SESSIONS_COMMAND
+ * Reconnect all sessions associated with a given 'remote'.
+ *
+ * - IPC_STOP_AND_GO_COMMAND
+ * This is a generic thread synchronization mechanism that uses
+ * two ovs_barriers.  The 'stop' barrier makes sure that all threads
+ * have reached it before the main thread operates on per-thread
+ * data structures, such as the 'all_sessions' lists.  Meanwhile all
+ * threads wait on the 'go' barrier until the main thread is done
+ * and reaches the 'go' barrier itself.
+ */
+#define IPC_MESSAGES \
+       IPC_MSG(IPC_ADD_SESSION_COMMAND) \
+       IPC_MSG(IPC_DELETE_SESSIONS_COMMAND) \
+       IPC_MSG(IPC_RECONNECT_SESSIONS_COMMAND) \
+       IPC_MSG(IPC_STOP_AND_GO_COMMAND)
+
+/* One enumerator per entry of IPC_MESSAGES. */
+enum ipc_msg_type {
+#define IPC_MSG(msg) msg,
+    IPC_MESSAGES
+#undef IPC_MSG
+};
+
+/* Common header of every IPC message.  Each concrete message type embeds
+ * it as its first member, 'up'. */
+struct ipc_msg {
+    struct ovs_list list;       /* Node in a sessions_thread's 'cmd_queue'. */
+    size_t size;                /* Bytes in the whole concrete message;
+                                 * ipc_msg_clone() copies this many. */
+    enum ipc_msg_type msg_type; /* Selects the handler in
+                                 * sessions_thread_command_run(). */
+};
+
+/* IPC_ADD_SESSION_COMMAND: asks a sessions thread to open a jsonrpc
+ * session on 'stream' and add it to the thread's 'all_sessions'. */
+struct ipc_msg_add_session_command {
+    struct ipc_msg up;
+    struct stream *stream;      /* Passed to jsonrpc_open() by the thread. */
+    struct ovsdb_jsonrpc_server *server; /* Server the session belongs to. */
+    void *remote;               /* Stored as the new session's 'remote'. */
+    uint8_t dscp;               /* Passed to
+                                 * jsonrpc_session_open_unreliably(). */
+};
+
+/* IPC_DELETE_SESSIONS_COMMAND: asks a sessions thread to close its sessions
+ * associated with 'remote'. */
+struct ipc_msg_delete_sessions_command {
+    struct ipc_msg up;
+    const void *remote;
+    struct ovs_barrier *barrier; /* The thread blocks on this once its
+                                  * sessions are closed; the sender blocks
+                                  * on it too. */
+};
+
+/* IPC_RECONNECT_SESSIONS_COMMAND: asks a sessions thread to reconnect its
+ * sessions associated with 'remote'. */
+struct ipc_msg_reconnect_sessions_command {
+    struct ipc_msg up;
+    const void *remote;
+    struct ovs_barrier *barrier; /* Barrier the sender blocks on after
+                                  * broadcasting the command. */
+};
 
+/* IPC_STOP_AND_GO_COMMAND: the receiving thread blocks on 'stop' and then
+ * on 'go'; see sessions_thread_stop_and_go(). */
+struct ipc_msg_stop_and_go_command {
+    struct ipc_msg up;
+    struct ovs_barrier *stop;   /* Reached once every thread is parked. */
+    struct ovs_barrier *go;     /* Releases the threads again. */
+};
+
+static struct ipc_msg *ipc_create_add_session_command(
+    struct stream* stream, struct ovsdb_jsonrpc_server *, void *remote,
+    uint8_t dscp);
+static struct ipc_msg *ipc_create_delete_sessions_command(const void *remote,
+                                    struct ovs_barrier *barrier);
+static struct ipc_msg *ipc_create_reconnect_sessions_command(
+    const void *remote, struct ovs_barrier *barrier);
+static struct ipc_msg *ipc_create_stop_and_go_command(struct ovs_barrier *stop,
+                                                      struct ovs_barrier *go);
+static void send_ipc_msg(struct sessions_thread *, struct ipc_msg *);
+static void broadcast_ipc_msg(const struct ovsdb_jsonrpc_server *svr,
+                              struct ipc_msg *msg);
 
 /* JSON-RPC database server. */
 
@@ -134,6 +234,18 @@ struct ovsdb_jsonrpc_server {
                                      created before multithreading, or
                                      connections of active remotes.. */
 
+    /* Doorbell. */
+    /* Whenever a jsonrpc session makes a change to OVSDB, it should ring
+     * the doorbell, by calling seq_change() on 'doorbell', so that other
+     * sessions can notice those changes immediately.
+     *
+     * Calling seq_change() on 'doorbell' is similar to calling
+     * poll_immediate_wake(), except 'doorbell' works across threads;
+     * it can be rung from any thread, and can be heard by all threads
+     * that call seq_wait() on the 'doorbell'.   */
+    struct seq *doorbell;
+    uint64_t doorbell_seqno;
+
     /* Threads. */
     size_t n_max_threads;
     size_t n_active_threads;
@@ -153,6 +265,16 @@ ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) {
 #define N_SESSIONS_THREASHHOLD  (2)
 static bool ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *);
 
+/* Create a new session.  The session can be either handled by the process
+ * (i.e. the main thread) or one of the sessions thread. */
+static void ovsdb_jsonrpc_server_create_session(struct ovsdb_jsonrpc_server *,
+                                                struct stream *stream,
+                                                struct ovsdb_jsonrpc_remote *);
+
+/* Close all sessions associated with the 'remote' in all threads */
+static void ovsdb_jsonrpc_server_close_sessions(struct ovsdb_jsonrpc_server *,
+                                                const void *remote);
+
 /* A configured remote.  This is either a passive stream listener plus a list
  * of the currently connected sessions, or a list of exactly one active
  * session. */
@@ -166,7 +288,8 @@ static struct ovsdb_jsonrpc_remote *ovsdb_jsonrpc_server_add_remote(
     struct ovsdb_jsonrpc_server *, const char *name,
     const struct ovsdb_jsonrpc_options *options
 );
-static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
+static void ovsdb_jsonrpc_server_del_remote(struct ovsdb_jsonrpc_server *svr,
+                                            struct shash_node *);
 
 /* Creates and returns a new server to provide JSON-RPC access to an OVSDB.
  *
@@ -181,6 +304,8 @@ ovsdb_jsonrpc_server_create(size_t n_max_threads)
     ovsdb_server_init(&server->up);
     shash_init(&server->remotes);
     list_init(&server->all_sessions);
+    server->doorbell = seq_create();
+    server->doorbell_seqno = seq_read(server->doorbell);
     server->n_max_threads = n_max_threads;
     server->n_active_threads = 0;
     server->threads = xmalloc(sizeof *server->threads * server->n_max_threads);
@@ -230,7 +355,7 @@ ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
     size_t i;
 
     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
-        ovsdb_jsonrpc_server_del_remote(node);
+        ovsdb_jsonrpc_server_del_remote(svr, node);
     }
 
     for (i = 0; i < svr->n_active_threads; i++) {
@@ -239,6 +364,7 @@ ovsdb_jsonrpc_server_destroy(struct ovsdb_jsonrpc_server *svr)
     }
     free(svr->threads);
 
+    seq_destroy(svr->doorbell);
     shash_destroy(&svr->remotes);
     ovsdb_server_destroy(&svr->up);
     free(svr);
@@ -273,9 +399,9 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
 
         if (!options) {
             VLOG_INFO("%s: remote deconfigured", node->name);
-            ovsdb_jsonrpc_server_del_remote(node);
+            ovsdb_jsonrpc_server_del_remote(svr, node);
         } else if (options->dscp != remote->dscp) {
-            ovsdb_jsonrpc_server_del_remote(node);
+            ovsdb_jsonrpc_server_del_remote(svr, node);
          }
     }
     SHASH_FOR_EACH (node, new_remotes) {
@@ -323,17 +449,50 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
 }
 
+/* Removes the remote stored in 'node': closes every session associated
+ * with it through ovsdb_jsonrpc_server_close_sessions(), closes its
+ * listener, deletes 'node' from the server's 'remotes' and frees the
+ * remote. */
 static void
-ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
+ovsdb_jsonrpc_server_del_remote(struct ovsdb_jsonrpc_server *svr,
+                                struct shash_node *node)
 {
     struct ovsdb_jsonrpc_remote *remote = node->data;
+    /* NOTE(review): presumably 'remote->server' is always 'svr'; confirm
+     * and drop one of the two. */
     struct ovsdb_jsonrpc_server *server = remote->server;
 
-    ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote);
+    /* Remove all sessions associated with remote.  */
+    ovsdb_jsonrpc_server_close_sessions(svr, remote);
     pstream_close(remote->listener);
     shash_delete(&server->remotes, node);
     free(remote);
 }
 
+/* Returns the number of sessions associated with 'remote', counting the
+ * main thread's sessions as well as those of every active sessions thread.
+ *
+ * The per-thread 'all_sessions' lists are only read while every sessions
+ * thread is parked between the 'stop' and 'go' barriers of an
+ * IPC_STOP_AND_GO_COMMAND. */
+static int
+ovsdb_jsonrpc_server_sessions_count(const struct ovsdb_jsonrpc_server *svr,
+                                    const struct ovsdb_jsonrpc_remote *remote)
+{
+    int count = ovsdb_jsonrpc_sessions_count(&svr->all_sessions, remote);
+
+    if (svr->n_active_threads) {
+        struct ovs_barrier stop, go;
+        struct ipc_msg *msg;
+        size_t i;   /* Same type as 'n_active_threads'. */
+
+        /* One slot per sessions thread plus one for this thread. */
+        ovs_barrier_init(&stop, svr->n_active_threads + 1);
+        ovs_barrier_init(&go, svr->n_active_threads + 1);
+
+        msg = ipc_create_stop_and_go_command(&stop, &go);
+        broadcast_ipc_msg(svr, msg);
+
+        /* Once 'stop' is passed, all threads are waiting on 'go'. */
+        ovs_barrier_block(&stop);
+        for (i = 0; i < svr->n_active_threads; i++) {
+            struct ovs_list *sessions = &svr->threads[i].all_sessions;
+            count += ovsdb_jsonrpc_sessions_count(sessions, remote);
+        }
+        ovs_barrier_block(&go);
+
+        ovs_barrier_destroy(&stop);
+        ovs_barrier_destroy(&go);
+    }
+
+    return count;
+}
+
 /* Stores status information for the remote named 'target', which should have
  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
@@ -356,9 +515,7 @@ ovsdb_jsonrpc_server_get_remote_status(
     }
 
     if (remote->listener) {
-        int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions,
-                                                         remote);
-
+        int n_connections = ovsdb_jsonrpc_server_sessions_count(svr, remote);
         status->bound_port = pstream_get_bound_port(remote->listener);
         status->n_connections = n_connections;
         status->is_connected = (n_connections != 0);
@@ -387,6 +544,17 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
     SHASH_FOR_EACH (node, &svr->remotes) {
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
+        if (svr->n_active_threads) {
+            struct ovs_barrier barrier;
+            struct ipc_msg *msg;
+
+            ovs_barrier_init(&barrier, svr->n_active_threads + 1);
+            msg = ipc_create_reconnect_sessions_command(remote, &barrier);
+            broadcast_ipc_msg(svr, msg);
+            ovs_barrier_block(&barrier);
+            ovs_barrier_destroy(&barrier);
+        }
+
         ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote);
     }
 }
@@ -396,6 +564,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 {
     struct shash_node *node;
 
+    svr->doorbell_seqno = seq_read(svr->doorbell);
     SHASH_FOR_EACH (node, &svr->remotes) {
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
@@ -405,12 +574,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 
             error = pstream_accept(remote->listener, &stream);
             if (!error) {
-                if (!ovsdb_jsonrpc_server_use_threads(svr)) {
-                    struct jsonrpc_session *js;
-                    js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
-                                                     remote->dscp);
-                    ovsdb_jsonrpc_session_create(remote, js);
-                }
+                ovsdb_jsonrpc_server_create_session(svr, stream, remote);
             } else if (error != EAGAIN) {
                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                              pstream_get_name(remote->listener),
@@ -418,7 +582,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
             }
         }
 
-        ovsdb_jsonrpc_sessions_run(&svr->all_sessions);
+        ovsdb_jsonrpc_sessions_run(&svr->all_sessions, svr->doorbell);
     }
 }
 
@@ -436,6 +600,8 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
 
         ovsdb_jsonrpc_sessions_wait(&svr->all_sessions);
     }
+
+    seq_wait(svr->doorbell, svr->doorbell_seqno);
 }
 
 /* Adds some memory usage statistics for 'svr' into 'usage', for use with
@@ -463,12 +629,61 @@ ovsdb_jsonrpc_server_use_threads(struct ovsdb_jsonrpc_server *svr) {
         size_t n_desired_threads = n_sessions /  N_SESSIONS_THREASHHOLD;
 
         if (n_desired_threads > svr->n_active_threads) {
-            sessions_thread_init(&svr->threads[svr->n_active_threads++]);
+            sessions_thread_init(&svr->threads[svr->n_active_threads++],
+                                 svr->doorbell);
         }
     }
 
     return svr->n_active_threads;
 }
+
+/* Creates a session for 'stream', a connection accepted on 'remote'.  The
+ * session is handled by the main thread when
+ * ovsdb_jsonrpc_server_use_threads() reports no threads, and otherwise by
+ * one of the active sessions threads, picked round-robin. */
+static void
+ovsdb_jsonrpc_server_create_session(struct ovsdb_jsonrpc_server *svr,
+                                    struct stream *stream,
+                                    struct ovsdb_jsonrpc_remote *remote)
+{
+    /* Index of the thread that received the previous session. */
+    static size_t next_thread = 0;
+
+    if (!ovsdb_jsonrpc_server_use_threads(svr)) {
+        struct jsonrpc_session *js
+            = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
+                                              remote->dscp);
+
+        ovsdb_jsonrpc_session_create(remote, js);
+        return;
+    }
+
+    /* Round-robin is certainly not the best way to spread sessions over
+     * threads, but it may be the simplest to implement. */
+    next_thread = (next_thread + 1) % svr->n_active_threads;
+    send_ipc_msg(&svr->threads[next_thread],
+                 ipc_create_add_session_command(stream, svr, remote,
+                                                remote->dscp));
+}
+
+/* Closes every session associated with 'remote': first those handled by
+ * the sessions threads, then those handled by the main thread. */
+static void
+ovsdb_jsonrpc_server_close_sessions(struct ovsdb_jsonrpc_server *svr,
+                                    const void *remote)
+{
+    size_t n_threads = svr->n_active_threads;
+
+    if (n_threads > 0) {
+        struct ovs_barrier done;
+
+        /* One slot per sessions thread plus one for this thread, which
+         * waits here until each thread has handled the command. */
+        ovs_barrier_init(&done, n_threads + 1);
+        broadcast_ipc_msg(svr,
+                          ipc_create_delete_sessions_command(remote, &done));
+        ovs_barrier_block(&done);
+        ovs_barrier_destroy(&done);
+    }
+
+    /* Sessions owned by the main thread. */
+    ovsdb_jsonrpc_sessions_close(&svr->all_sessions, remote);
+}
 
 /* JSON-RPC database server session. */
 
@@ -491,8 +706,8 @@ struct ovsdb_jsonrpc_session {
 };
 
 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
-static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *)
-    OVS_EXCLUDED(mutex);
+static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *,
+                               struct seq *seq) OVS_EXCLUDED(mutex);
 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_get_memory_usage(
     const struct ovsdb_jsonrpc_session *, struct simap *usage);
@@ -502,26 +717,52 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 
+/* Allocates and initializes a session for 'js' that belongs to 'server' and
+ * records 'remote' in it.  Increments 'server->n_sessions'.
+ *
+ * The session is not linked into any session list; the caller must add
+ * its 'node' to the list that owns it. */
 static struct ovsdb_jsonrpc_session *
-ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
-                             struct jsonrpc_session *js)
+ovsdb_jsonrpc_session_create__(void *remote,
+                               struct ovsdb_jsonrpc_server *server,
+                               struct jsonrpc_session *js)
 {
     struct ovsdb_jsonrpc_session *s;
-    struct ovsdb_jsonrpc_server *server = remote->server;
 
     s = xzalloc(sizeof *s);
     ovsdb_session_init(&s->up, &server->up);
     s->remote = remote;
-    list_push_back(&server->all_sessions, &s->node);
     hmap_init(&s->triggers);
     hmap_init(&s->monitors);
     s->js = js;
     s->js_seqno = jsonrpc_session_get_seqno(js);
-
     atomic_count_inc(&server->n_sessions);
 
     return s;
 }
 
+/* Creates a session for 'js' on 'remote' and appends it to the
+ * 'all_sessions' list of the remote's server, i.e. the session is handled
+ * by the main thread.  Returns the new session. */
+static struct ovsdb_jsonrpc_session *
+ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
+                             struct jsonrpc_session *js)
+{
+    struct ovsdb_jsonrpc_server *svr = remote->server;
+    struct ovsdb_jsonrpc_session *session
+        = ovsdb_jsonrpc_session_create__(remote, svr, js);
+
+    list_push_back(&svr->all_sessions, &session->node);
+    return session;
+}
+
+/* Creates a session for 'js' that belongs to 'server' and 'remote', and
+ * appends it to 'thread''s own 'all_sessions' list.  Returns the new
+ * session. */
+static struct ovsdb_jsonrpc_session *
+ovsdb_jsonrpc_thread_session_create(struct sessions_thread *thread,
+                                    void *remote,
+                                    struct ovsdb_jsonrpc_server *server,
+                                    struct jsonrpc_session *js)
+{
+    struct ovsdb_jsonrpc_session *session
+        = ovsdb_jsonrpc_session_create__(remote, server, js);
+
+    list_push_back(&thread->all_sessions, &session->node);
+    return session;
+}
+
+
 static void
 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 {
@@ -547,7 +788,7 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 }
 
 static int
-ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
+ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s, struct seq *doorbell)
     OVS_EXCLUDED(mutex)
 {
     jsonrpc_session_run(s->js);
@@ -570,6 +811,7 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
         if (msg) {
             if (msg->type == JSONRPC_REQUEST) {
                 ovsdb_jsonrpc_session_got_request(s, msg);
+                seq_change(doorbell);
             } else if (msg->type == JSONRPC_NOTIFY) {
                 ovsdb_jsonrpc_session_got_notify(s, msg);
             } else {
@@ -626,6 +868,29 @@ ovsdb_jsonrpc_session_get_memory_usage_all(
     LIST_FOR_EACH (s, node, &svr->all_sessions) {
         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
     }
+
+    if (svr->n_active_threads) {
+        struct ipc_msg *msg;
+        struct ovs_barrier stop, go;
+        size_t i;
+
+        ovs_barrier_init(&stop, svr->n_active_threads + 1);
+        ovs_barrier_init(&go, svr->n_active_threads + 1);
+
+        msg = ipc_create_stop_and_go_command(&stop, &go);
+        broadcast_ipc_msg(svr, msg);
+
+        ovs_barrier_block(&stop);
+        for (i = 0; i < svr->n_active_threads; i++) {
+             LIST_FOR_EACH (s, node, &svr->threads[i].all_sessions) {
+                ovsdb_jsonrpc_session_get_memory_usage(s, usage);
+             }
+        }
+        ovs_barrier_block(&go);
+
+        ovs_barrier_destroy(&stop);
+        ovs_barrier_destroy(&go);
+    }
 }
 
 /* Sets the 'status' of for the 'remote' with an outgoing connection.   */
@@ -1436,12 +1701,12 @@ ovsdb_jsonrpc_disable_monitor2(void)
 }
 
 static void
-ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions)
+ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions, struct seq *seq)
 {
     struct ovsdb_jsonrpc_session *s, *next;
 
     LIST_FOR_EACH_SAFE (s, next, node, sessions) {
-    int error = ovsdb_jsonrpc_session_run(s);
+    int error = ovsdb_jsonrpc_session_run(s, seq);
         if (error) {
             ovsdb_jsonrpc_session_close(s);
         }
@@ -1526,24 +1791,41 @@ ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions,
+/* Main loop of a sessions thread.  'f_' is the thread's
+ * 'struct sessions_thread'.  Until 'exit_latch' is set, each iteration runs
+ * the queued commands and the thread's sessions, then blocks until a
+ * command, a session, the exit latch or the doorbell needs attention. */
 static void *
 sessions_thread_main(void * f_)
 {
-    struct sessions_thread *sessions_thread = f_;
+    struct sessions_thread *thread = f_;
+
+    while (!latch_is_set(&thread->exit_latch)) {
+        /* Sampled before any work is done and handed to seq_wait() below,
+         * so presumably a doorbell ring during this iteration wakes
+         * poll_block() right away -- confirm against seq_wait(). */
+        uint64_t seqno = seq_read(thread->doorbell);
+
+        sessions_thread_command_run(thread);
+        ovsdb_jsonrpc_sessions_run(&thread->all_sessions, thread->doorbell);
+
+        sessions_thread_command_wait(thread);
+        ovsdb_jsonrpc_sessions_wait(&thread->all_sessions);
 
-    VLOG_DBG("Created");
-    while (!latch_is_set(&sessions_thread->exit_latch)) {
-        latch_wait(&sessions_thread->exit_latch);
+        latch_wait(&thread->exit_latch);
+        seq_wait(thread->doorbell, seqno);
         poll_block();
     }
-    VLOG_DBG("Finished");
     return NULL;
 }
 
 static void
-sessions_thread_init(struct sessions_thread *thread)
+sessions_thread_init(struct sessions_thread *thread, struct seq *seq)
 {
     thread->thread = ovs_thread_create("sessions_thread",
                                        sessions_thread_main, thread);
 
+    list_init(&thread->all_sessions);
     latch_init(&thread->exit_latch);
+
+    ovs_mutex_init(&thread->cmd_queue_mutex);
+
+    ovs_mutex_lock(&thread->cmd_queue_mutex);
+    list_init(&thread->cmd_queue);
+    thread->cmd_queue_seq = seq_create();
+    thread->last_cmd_seq = seq_read(thread->cmd_queue_seq);
+    thread->doorbell = seq;
+    ovs_mutex_unlock(&thread->cmd_queue_mutex);
 }
 
 static void
@@ -1551,5 +1833,204 @@ sessions_thread_exit(struct sessions_thread *thread)
 {
     latch_set(&thread->exit_latch);
     xpthread_join(thread->thread, NULL);
+    ovs_mutex_destroy(&thread->cmd_queue_mutex);
+    seq_destroy(thread->cmd_queue_seq);
     latch_destroy(&thread->exit_latch);
 }
+
+/* Executes, in order, every command the main thread has queued for
+ * 'thread', freeing each message once it has been handled.  Does nothing
+ * if 'cmd_queue_seq' has not changed since the previous call.
+ *
+ * 'cmd_queue_mutex' is held only while a message is unlinked from the
+ * queue, never while its handler runs: handlers may block on a barrier,
+ * and a sender must not get stuck behind that in send_ipc_msg(). */
+static void
+sessions_thread_command_run(struct sessions_thread *thread)
+{
+    /* Read before draining the queue, so that a message that arrives while
+     * draining makes the next sessions_thread_command_wait() wake up
+     * immediately instead of being missed. */
+    uint64_t new_cmd_seq = seq_read(thread->cmd_queue_seq);
+
+    if (new_cmd_seq == thread->last_cmd_seq) {
+        return;
+    }
+
+    for (;;) {
+        struct ipc_msg *msg = NULL;
+
+        ovs_mutex_lock(&thread->cmd_queue_mutex);
+        if (!list_is_empty(&thread->cmd_queue)) {
+            msg = CONTAINER_OF(list_pop_front(&thread->cmd_queue),
+                               struct ipc_msg, list);
+        }
+        ovs_mutex_unlock(&thread->cmd_queue_mutex);
+
+        if (!msg) {
+            break;
+        }
+
+        /* Routine traffic, not an error. */
+        VLOG_DBG("thread receive command msg");
+        switch (msg->msg_type) {
+        case IPC_ADD_SESSION_COMMAND:
+            sessions_thread_add_new_session(thread, msg);
+            break;
+        case IPC_DELETE_SESSIONS_COMMAND:
+            sessions_thread_delete_sessions(thread, msg);
+            break;
+        case IPC_RECONNECT_SESSIONS_COMMAND:
+            sessions_thread_reconnect_sessions(thread, msg);
+            break;
+        case IPC_STOP_AND_GO_COMMAND:
+            sessions_thread_stop_and_go(msg);
+            break;
+        default:
+            VLOG_ERR("thread receive unknown msg");
+            break;
+        }
+        free(msg);
+    }
+
+    thread->last_cmd_seq = new_cmd_seq;
+}
+
+/* Handles IPC_ADD_SESSION_COMMAND: opens a jsonrpc session on the
+ * message's stream and adds it to 'thread''s sessions. */
+static void
+sessions_thread_add_new_session(struct sessions_thread *thread,
+                                struct ipc_msg *msg)
+{
+    struct ipc_msg_add_session_command *cmd
+        = CONTAINER_OF(msg, struct ipc_msg_add_session_command, up);
+    struct jsonrpc_session *js
+        = jsonrpc_session_open_unreliably(jsonrpc_open(cmd->stream),
+                                          cmd->dscp);
+
+    ovsdb_jsonrpc_thread_session_create(thread, cmd->remote, cmd->server, js);
+}
+
+/* Handles IPC_DELETE_SESSIONS_COMMAND: closes 'thread''s sessions that are
+ * associated with the message's remote, then blocks on the message's
+ * barrier, which the sender is blocked on as well. */
+static void
+sessions_thread_delete_sessions(struct sessions_thread *thread,
+                                struct ipc_msg *msg)
+{
+    struct ipc_msg_delete_sessions_command *cmd
+        = CONTAINER_OF(msg, struct ipc_msg_delete_sessions_command, up);
+
+    ovsdb_jsonrpc_sessions_close(&thread->all_sessions, cmd->remote);
+    ovs_barrier_block(cmd->barrier);
+}
+
+/* Handles IPC_RECONNECT_SESSIONS_COMMAND: reconnects 'thread''s sessions
+ * that are associated with the message's remote, then blocks on the
+ * message's barrier.
+ *
+ * The sender, ovsdb_jsonrpc_server_reconnect(), sizes that barrier for
+ * every active thread plus itself and blocks on it, so each thread has to
+ * arrive here or the sender never returns. */
+static void
+sessions_thread_reconnect_sessions(struct sessions_thread *thread,
+                                   struct ipc_msg *msg)
+{
+    struct ipc_msg_reconnect_sessions_command *m =
+        (struct ipc_msg_reconnect_sessions_command *)msg;
+
+    ovsdb_jsonrpc_sessions_reconnect(&thread->all_sessions, m->remote);
+    ovs_barrier_block(m->barrier);
+}
+
+/* Handles IPC_STOP_AND_GO_COMMAND: blocks on the message's 'stop' barrier
+ * and then on its 'go' barrier. */
+static void
+sessions_thread_stop_and_go(struct ipc_msg *msg)
+{
+    struct ipc_msg_stop_and_go_command *cmd
+        = CONTAINER_OF(msg, struct ipc_msg_stop_and_go_command, up);
+
+    ovs_barrier_block(cmd->stop);
+    ovs_barrier_block(cmd->go);
+}
+
+/* Waits on 'thread''s command queue seq, relative to the value recorded by
+ * the last sessions_thread_command_run(). */
+static void
+sessions_thread_command_wait(struct sessions_thread *thread)
+{
+    uint64_t seen = thread->last_cmd_seq;
+
+    seq_wait(thread->cmd_queue_seq, seen);
+}
+
+/* Initializes the common header 'msg' of a message of type 'msg_type'
+ * whose complete size, header included, is 'size' bytes. */
+static void
+ipc_msg_init(struct ipc_msg *msg, size_t size, enum ipc_msg_type msg_type)
+{
+    msg->msg_type = msg_type;
+    msg->size = size;
+    list_init(&msg->list);
+}
+
+/* Returns a newly allocated byte-for-byte copy of 'msg' ('msg->size'
+ * bytes) whose list node is re-initialized, so the copy is not linked
+ * into any queue. */
+static struct ipc_msg *
+ipc_msg_clone(struct ipc_msg *msg)
+{
+    size_t n_bytes = msg->size;
+    struct ipc_msg *copy = xmalloc(n_bytes);
+
+    memcpy(copy, msg, n_bytes);
+    list_init(&copy->list);
+    return copy;
+}
+
+/* Returns a newly allocated IPC_ADD_SESSION_COMMAND message carrying
+ * 'stream', 'server', 'remote' and 'dscp'. */
+static struct ipc_msg *
+ipc_create_add_session_command(struct stream* stream,
+                               struct ovsdb_jsonrpc_server *server,
+                               void *remote, uint8_t dscp)
+{
+    struct ipc_msg_add_session_command *cmd = xmalloc(sizeof *cmd);
+
+    ipc_msg_init(&cmd->up, sizeof *cmd, IPC_ADD_SESSION_COMMAND);
+    cmd->dscp = dscp;
+    cmd->remote = remote;
+    cmd->server = server;
+    cmd->stream = stream;
+
+    return &cmd->up;
+}
+
+/* Returns a newly allocated IPC_DELETE_SESSIONS_COMMAND message carrying
+ * 'remote' and 'barrier'. */
+static struct ipc_msg *
+ipc_create_delete_sessions_command(const void *remote,
+                                   struct ovs_barrier *barrier)
+{
+    struct ipc_msg_delete_sessions_command *cmd = xmalloc(sizeof *cmd);
+
+    ipc_msg_init(&cmd->up, sizeof *cmd, IPC_DELETE_SESSIONS_COMMAND);
+    cmd->barrier = barrier;
+    cmd->remote = remote;
+
+    return &cmd->up;
+}
+
+/* Returns a newly allocated IPC_RECONNECT_SESSIONS_COMMAND message
+ * carrying 'remote' and 'barrier'. */
+static struct ipc_msg *
+ipc_create_reconnect_sessions_command(const void *remote,
+                                      struct ovs_barrier *barrier)
+{
+    struct ipc_msg_reconnect_sessions_command *cmd = xmalloc(sizeof *cmd);
+
+    ipc_msg_init(&cmd->up, sizeof *cmd, IPC_RECONNECT_SESSIONS_COMMAND);
+    cmd->barrier = barrier;
+    cmd->remote = remote;
+
+    return &cmd->up;
+}
+
+/* Returns a newly allocated IPC_STOP_AND_GO_COMMAND message carrying the
+ * 'stop' and 'go' barriers. */
+static struct ipc_msg *
+ipc_create_stop_and_go_command(struct ovs_barrier *stop,
+                               struct ovs_barrier *go)
+{
+    struct ipc_msg_stop_and_go_command *cmd = xmalloc(sizeof *cmd);
+
+    ipc_msg_init(&cmd->up, sizeof *cmd, IPC_STOP_AND_GO_COMMAND);
+    cmd->go = go;
+    cmd->stop = stop;
+
+    return &cmd->up;
+}
+
+/* Appends 'msg' to 'thread''s command queue, then changes the queue's seq,
+ * which the thread waits on in sessions_thread_command_wait().
+ *
+ * Ownership of 'msg' passes to the receiving thread, which frees it. */
+static void
+send_ipc_msg(struct sessions_thread *thread, struct ipc_msg *msg)
+{
+    struct ovs_list *node = &msg->list;
+
+    ovs_mutex_lock(&thread->cmd_queue_mutex);
+    list_push_back(&thread->cmd_queue, node);
+    ovs_mutex_unlock(&thread->cmd_queue_mutex);
+
+    /* Signal only after the message is queued and the lock released. */
+    seq_change(thread->cmd_queue_seq);
+}
+
+/* Sends the same IPC message to all active sessions threads and takes
+ * ownership of 'msg'.
+ *
+ * Every thread but the last gets its own clone of 'msg'; the last one gets
+ * 'msg' itself, so 'msg' is never leaked.  If there is no active thread,
+ * 'msg' is freed.  The receiving threads are responsible for freeing the
+ * message they receive. */
+static void
+broadcast_ipc_msg(const struct ovsdb_jsonrpc_server *svr, struct ipc_msg *msg)
+{
+    size_t i;
+
+    if (!svr->n_active_threads) {
+        free(msg);
+        return;
+    }
+
+    for (i = svr->n_active_threads; i > 0; i--) {
+        struct sessions_thread *thread = &svr->threads[i - 1];
+        /* 'i' is never 0 inside the loop, so the last iteration is
+         * 'i == 1'.  'msg' is handed over there and not touched again. */
+        struct ipc_msg *m = i > 1 ? ipc_msg_clone(msg) : msg;
+
+        send_ipc_msg(thread, m);
+    }
+}
-- 
1.9.1

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to