commit b421d2af0ab (ovsdb-server: Add commands for adding and removing remotes)
made it possible to make ovsdb-server connect to OVS managers only after
ovs-vswitchd has completed its initial configuration. But this results in an
undesirable effect. Whenever ovsdb-server crashes, the monitor restarts its.
But ovsdb-server can no longer connect to the manager because the remotes were
added during runtime and that information is lost during the crash.

This commit lets the main ovsdb-server to sync the data that should not get
lost because of a crash. When the monitor restarts ovsdb-server, the data
is still available.

Signed-off-by: Gurucharan Shetty <gshe...@nicira.com>
---
 lib/rxbuf.c          |    7 +++
 lib/rxbuf.h          |    1 +
 ovsdb/automake.mk    |    2 +
 ovsdb/ovsdb-server.c |   75 +++++++++++++++++++++++++++-
 ovsdb/sync.c         |  135 ++++++++++++++++++++++++++++++++++++++++++++++++++
 ovsdb/sync.h         |   39 +++++++++++++++
 6 files changed, 258 insertions(+), 1 deletion(-)
 create mode 100644 ovsdb/sync.c
 create mode 100644 ovsdb/sync.h

diff --git a/lib/rxbuf.c b/lib/rxbuf.c
index 567f3a1..d988a63 100644
--- a/lib/rxbuf.c
+++ b/lib/rxbuf.c
@@ -26,6 +26,13 @@ rxbuf_init(struct rxbuf *rx)
 }
 
 void
+rxbuf_unint(struct rxbuf *rx)
+{
+    ofpbuf_uninit(&rx->header);
+    ofpbuf_uninit(&rx->payload);
+}
+
+void
 rxbuf_clear(struct rxbuf *rx)
 {
     ofpbuf_clear(&rx->header);
diff --git a/lib/rxbuf.h b/lib/rxbuf.h
index a6af7e0..94bcc42 100644
--- a/lib/rxbuf.h
+++ b/lib/rxbuf.h
@@ -32,6 +32,7 @@ struct rxbuf {
 };
 
 void rxbuf_init(struct rxbuf *);
+void rxbuf_uninit(struct rxbuf *);
 void rxbuf_clear(struct rxbuf *);
 int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
 
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index d2e3f9a..f3fb990 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -23,6 +23,8 @@ ovsdb_libovsdb_a_SOURCES = \
        ovsdb/row.h \
        ovsdb/server.c \
        ovsdb/server.h \
+       ovsdb/sync.c \
+       ovsdb/sync.h \
        ovsdb/table.c \
        ovsdb/table.h \
        ovsdb/trigger.c \
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 6311c6b..763fe12 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -47,6 +47,7 @@
 #include "stream.h"
 #include "stress.h"
 #include "sset.h"
+#include "sync.h"
 #include "table.h"
 #include "timeval.h"
 #include "transaction.h"
@@ -99,6 +100,13 @@ static void update_remote_status(const struct 
ovsdb_jsonrpc_server *jsonrpc,
                                  const struct sset *remotes,
                                  struct db dbs[], size_t n_dbs);
 
+/* Maintain the remotes added to a monitored process. */
+static struct sset remotes_sync;
+static void remotes_init(struct sset *remotes);
+
+static monitor_callback handle_monitor_callback;
+static sync_data_callback update_sync_data;
+
 int
 main(int argc, char *argv[])
 {
@@ -112,6 +120,7 @@ main(int argc, char *argv[])
     int retval;
     long long int status_timer = LLONG_MIN;
     struct add_remote_aux add_remote_aux;
+    const char *remote_name;
 
     struct db *dbs;
     int n_dbs;
@@ -125,7 +134,16 @@ main(int argc, char *argv[])
 
     parse_options(&argc, &argv, &remotes, &unixctl_path, &run_command);
 
-    daemonize_start(NULL);
+    if (daemon_get_monitor()) {
+        sync_init();
+    }
+    remotes_init(&remotes);
+
+    daemonize_start(handle_monitor_callback);
+
+    SSET_FOR_EACH (remote_name, &remotes_sync) {
+        sset_add(&remotes, remote_name);
+    }
 
     n_dbs = MAX(1, argc);
     dbs = xcalloc(n_dbs + 1, sizeof *dbs);
@@ -273,6 +291,49 @@ main(int argc, char *argv[])
     return 0;
 }
 
+static void
+remotes_init(struct sset *remotes)
+{
+    const char *remote_name;
+    sset_init(&remotes_sync);
+    SSET_FOR_EACH (remote_name, remotes) {
+        sset_add(&remotes_sync, remote_name);
+    }
+    sset_clear(remotes);
+}
+
+/* Callback from the monitor. */
+static void
+handle_monitor_callback(void)
+{
+    sync_receive_monitor(update_sync_data);
+}
+
+/* When ovsdb-sever is monitored by a 'monitor' and data that should remain
+ * constant across monitor restarts is changed in the monitored process,
+ * change the corresponding data based on 'request_type' in the monitor. */
+static void
+update_sync_data(enum sync_request_type request_type, struct ofpbuf *reply)
+{
+    char *data;
+    switch(request_type) {
+    case SYNC_REMOTE_ADD:
+        data = xmalloc(reply->size+1);
+        ovs_strlcpy(data, reply->data, reply->size+1);
+        sset_add(&remotes_sync, data);
+        free(data);
+        break;
+    case SYNC_REMOTE_REMOVE:
+        data = xmalloc(reply->size+1);
+        ovs_strlcpy(data, reply->data, reply->size+1);
+        sset_find_and_delete(&remotes_sync, data);
+        free(data);
+        break;
+    default:
+        VLOG_ERR("Uknown sync request_type.");
+    }
+}
+
 static const struct db *
 find_db(const struct db dbs[], size_t n_dbs, const char *db_name)
 {
@@ -913,12 +974,18 @@ ovsdb_server_add_remote(struct unixctl_conn *conn, int 
argc OVS_UNUSED,
     const struct ovsdb_table *table;
     const struct db *db;
     char *retval;
+    enum sync_request_type request_type = SYNC_REMOTE_ADD;
 
     retval = (strncmp("db:", remote, 3)
               ? NULL
               : parse_db_column(aux->dbs, aux->n_dbs, remote,
                                 &db, &table, &column));
     if (!retval) {
+        if (daemon_get_monitor() &&
+                !!sync_send_monitor(remote, request_type)) {
+            unixctl_command_reply_error(conn, "system error.");
+            return;
+        }
         sset_add(aux->remotes, remote);
         unixctl_command_reply(conn, NULL);
     } else {
@@ -935,9 +1002,15 @@ ovsdb_server_remove_remote(struct unixctl_conn *conn, int 
argc OVS_UNUSED,
 {
     struct sset *remotes = remotes_;
     struct sset_node *node;
+    enum sync_request_type request_type = SYNC_REMOTE_REMOVE;
 
     node = sset_find(remotes, argv[1]);
     if (node) {
+        if (daemon_get_monitor() &&
+                !!sync_send_monitor(argv[1], request_type)) {
+            unixctl_command_reply_error(conn, "system error.");
+            return;
+        }
         sset_delete(remotes, node);
         unixctl_command_reply(conn, NULL);
     } else {
diff --git a/ovsdb/sync.c b/ovsdb/sync.c
new file mode 100644
index 0000000..d108ecd
--- /dev/null
+++ b/ovsdb/sync.c
@@ -0,0 +1,135 @@
+/* Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "sync.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include "poll-loop.h"
+#include "rxbuf.h"
+#include "socket-util.h"
+#include "util.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(sync);
+
+static int sync_fds[2];
+
+void
+sync_init(void)
+{
+    xsocketpair(AF_UNIX, SOCK_STREAM, 0, sync_fds);
+    xset_nonblocking(sync_fds[0]);
+    xset_nonblocking(sync_fds[1]);
+}
+
+/* Runs in a monitor in an infinite loop waiting to receive requests from
+ * a monitored process to sync data. The function returns in case the
+ * monitored process dies. */
+void
+sync_receive_monitor(sync_data_callback *cb)
+{
+    int error;
+    struct sync_send_request request;
+    struct rxbuf rx;
+
+    if (sync_fds[1]) {
+        close(sync_fds[1]);
+        sync_fds[1] = -1;
+    }
+
+    rxbuf_init(&rx);
+    for(;;) {
+        error = rxbuf_run(&rx, sync_fds[0], sizeof(struct sync_send_request));
+        if (!error) {
+            request = *(struct sync_send_request *) rx.header.data;
+            cb(request.request_type, &rx.payload);
+            rxbuf_clear(&rx);
+        } else if (error == EOF && !rx.header.size) {
+            /* The child closed the IPC socket. Exit cleanly. */
+            break;
+        } else if (error != EAGAIN) {
+            VLOG_ERR("RPC receive failed (%s)", ovs_retval_to_string(error));
+            break;
+        }
+        poll_fd_wait(sync_fds[0], POLLIN|POLLRDHUP|POLLERR);
+        poll_block();
+    }
+    rxbuf_unint(&rx);
+    close(sync_fds[0]);
+    sync_init();
+}
+
+/* Called by a monitored process with customized 'data' and a 'request_type'
+ * that is understood by the monitor process. */
+int
+sync_send_monitor(const char *data, enum sync_request_type request_type)
+{
+    struct sync_send_request rq;
+    int error;
+    size_t sent = 0;
+    struct pollfd pfd;
+
+    if (sync_fds[0]) {
+        sync_fds[0] = -1;
+        close(sync_fds[0]);
+    }
+
+    if (strlen(data) > 0) {
+        struct iovec iov[2];
+
+        memset(iov, 0, sizeof(iov));
+        memset(&rq, 0, sizeof(rq));
+
+        rq.request_len = strlen(data);
+        rq.request_type = request_type;
+        iov[0].iov_base = (void *) &rq;
+        iov[0].iov_len = sizeof(rq);
+
+        iov[1].iov_base = (void *) data;
+        iov[1].iov_len = strlen(data);
+
+        for(;;) {
+            error = send_iovec_and_fds_fully(sync_fds[1], iov, 2, NULL, 0,
+                                            sent, &sent);
+            if (error != EAGAIN) {
+                return error;
+            }
+
+            /* Wait for 'client_sock' to become ready before trying again. */
+            pfd.fd = sync_fds[1];
+            pfd.events = POLLOUT;
+            do {
+                error = poll(&pfd, 1, -1) < 0 ? errno : 0;
+            } while (error == EINTR);
+            if (error) {
+                VLOG_ERR("poll failed (%s)", strerror(error));
+                return error;
+            }
+        }
+    }
+    return 0;
+}
diff --git a/ovsdb/sync.h b/ovsdb/sync.h
new file mode 100644
index 0000000..0b05eeb
--- /dev/null
+++ b/ovsdb/sync.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SYNC_H
+#define SYNC_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include "ofpbuf.h"
+
+enum sync_request_type {
+    SYNC_REMOTE_ADD,        /* Add remote manager. */
+    SYNC_REMOTE_REMOVE      /* Remove remote manager. */
+};
+
+struct sync_send_request {
+    size_t request_len;                   /* Length of the payload in bytes. */
+    enum sync_request_type request_type;  /* Type of payload. */
+};
+
+void sync_init(void);
+int sync_send_monitor(const char *data, enum sync_request_type request_type);
+typedef void sync_data_callback(enum sync_request_type, struct ofpbuf *);
+void sync_receive_monitor(sync_data_callback *cb);
+
+#endif /* sync.h */
-- 
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to