Commit b421d2af0ab (ovsdb-server: Add commands for adding and removing remotes) made it possible to make ovsdb-server connect to OVS managers only after ovs-vswitchd has completed its initial configuration. But this has an undesirable side effect: whenever ovsdb-server crashes, the monitor restarts it, but the restarted ovsdb-server can no longer connect to the managers, because the remotes were added at runtime and that information is lost in the crash.
This commit lets the main ovsdb-server process sync to its monitor the data
that must not be lost in a crash, so that when the monitor restarts
ovsdb-server, the data is still available.

Signed-off-by: Gurucharan Shetty <gshe...@nicira.com>
---
 lib/rxbuf.c          |    7 +++
 lib/rxbuf.h          |    1 +
 ovsdb/automake.mk    |    2 +
 ovsdb/ovsdb-server.c |   75 +++++++++++++++++++++++++++-
 ovsdb/sync.c         |  135 ++++++++++++++++++++++++++++++++++++++++++++++++++
 ovsdb/sync.h         |   39 +++++++++++++++
 6 files changed, 258 insertions(+), 1 deletion(-)
 create mode 100644 ovsdb/sync.c
 create mode 100644 ovsdb/sync.h

diff --git a/lib/rxbuf.c b/lib/rxbuf.c
index 567f3a1..d988a63 100644
--- a/lib/rxbuf.c
+++ b/lib/rxbuf.c
@@ -26,6 +26,13 @@ rxbuf_init(struct rxbuf *rx)
 }
 
 void
+rxbuf_uninit(struct rxbuf *rx)
+{
+    ofpbuf_uninit(&rx->header);
+    ofpbuf_uninit(&rx->payload);
+}
+
+void
 rxbuf_clear(struct rxbuf *rx)
 {
     ofpbuf_clear(&rx->header);
diff --git a/lib/rxbuf.h b/lib/rxbuf.h
index a6af7e0..94bcc42 100644
--- a/lib/rxbuf.h
+++ b/lib/rxbuf.h
@@ -32,6 +32,7 @@ struct rxbuf {
 };
 
 void rxbuf_init(struct rxbuf *);
+void rxbuf_uninit(struct rxbuf *);
 void rxbuf_clear(struct rxbuf *);
 int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
 
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index d2e3f9a..f3fb990 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -23,6 +23,8 @@ ovsdb_libovsdb_a_SOURCES = \
 	ovsdb/row.h \
 	ovsdb/server.c \
 	ovsdb/server.h \
+	ovsdb/sync.c \
+	ovsdb/sync.h \
 	ovsdb/table.c \
 	ovsdb/table.h \
 	ovsdb/trigger.c \
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 6311c6b..763fe12 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -47,6 +47,7 @@
 #include "stream.h"
 #include "stress.h"
 #include "sset.h"
+#include "sync.h"
 #include "table.h"
 #include "timeval.h"
 #include "transaction.h"
@@ -99,6 +100,13 @@ static void update_remote_status(const struct ovsdb_jsonrpc_server *jsonrpc,
                                  const struct sset *remotes,
                                  struct db dbs[], size_t n_dbs);
 
+/* Maintain the remotes added to a monitored process. */
+static struct sset remotes_sync;
+static void remotes_init(struct sset *remotes);
+
+static monitor_callback handle_monitor_callback;
+static sync_data_callback update_sync_data;
+
 int
 main(int argc, char *argv[])
 {
@@ -112,6 +120,7 @@ main(int argc, char *argv[])
     int retval;
     long long int status_timer = LLONG_MIN;
     struct add_remote_aux add_remote_aux;
+    const char *remote_name;
 
     struct db *dbs;
     int n_dbs;
@@ -125,7 +134,16 @@ main(int argc, char *argv[])
 
     parse_options(&argc, &argv, &remotes, &unixctl_path, &run_command);
 
-    daemonize_start(NULL);
+    if (daemon_get_monitor()) {
+        sync_init();
+    }
+    remotes_init(&remotes);
+
+    daemonize_start(handle_monitor_callback);
+
+    SSET_FOR_EACH (remote_name, &remotes_sync) {
+        sset_add(&remotes, remote_name);
+    }
 
     n_dbs = MAX(1, argc);
     dbs = xcalloc(n_dbs + 1, sizeof *dbs);
@@ -273,6 +291,49 @@ main(int argc, char *argv[])
     return 0;
 }
 
+static void
+remotes_init(struct sset *remotes)
+{
+    const char *remote_name;
+    sset_init(&remotes_sync);
+    SSET_FOR_EACH (remote_name, remotes) {
+        sset_add(&remotes_sync, remote_name);
+    }
+    sset_clear(remotes);
+}
+
+/* Callback from the monitor. */
+static void
+handle_monitor_callback(void)
+{
+    sync_receive_monitor(update_sync_data);
+}
+
+/* Monitor-side handler for a sync request from the monitored ovsdb-server:
+ * applies the change described by 'request_type' to 'remotes_sync', so that
+ * remotes added or removed at runtime survive a restart of the monitored
+ * process.  'reply' holds the request payload (the remote's name). */
+static void
+update_sync_data(enum sync_request_type request_type, struct ofpbuf *reply)
+{
+    /* The payload is sent without a trailing null byte (see
+     * sync_send_monitor()), so make a null-terminated copy. */
+    char *data = xmemdup0(reply->data, reply->size);
+
+    switch (request_type) {
+    case SYNC_REMOTE_ADD:
+        sset_add(&remotes_sync, data);
+        break;
+    case SYNC_REMOTE_REMOVE:
+        sset_find_and_delete(&remotes_sync, data);
+        break;
+    default:
+        VLOG_ERR("Unknown sync request_type %d.", (int) request_type);
+        break;
+    }
+    free(data);
+}
+
 static const struct db *
 find_db(const struct db dbs[], size_t n_dbs, const char *db_name)
 {
@@ -913,12 +974,18 @@ ovsdb_server_add_remote(struct unixctl_conn *conn, int argc OVS_UNUSED,
     const struct ovsdb_table *table;
     const struct db *db;
     char *retval;
+    enum sync_request_type request_type = SYNC_REMOTE_ADD;
 
     retval = (strncmp("db:", remote, 3)
               ? NULL
               : parse_db_column(aux->dbs, aux->n_dbs, remote,
                                 &db, &table, &column));
     if (!retval) {
+        if (daemon_get_monitor() &&
+            sync_send_monitor(remote, request_type)) {
+            unixctl_command_reply_error(conn, "system error.");
+            return;
+        }
         sset_add(aux->remotes, remote);
         unixctl_command_reply(conn, NULL);
     } else {
@@ -935,9 +1002,15 @@ ovsdb_server_remove_remote(struct unixctl_conn *conn, int argc OVS_UNUSED,
 {
     struct sset *remotes = remotes_;
     struct sset_node *node;
+    enum sync_request_type request_type = SYNC_REMOTE_REMOVE;
 
     node = sset_find(remotes, argv[1]);
     if (node) {
+        if (daemon_get_monitor() &&
+            sync_send_monitor(argv[1], request_type)) {
+            unixctl_command_reply_error(conn, "system error.");
+            return;
+        }
         sset_delete(remotes, node);
         unixctl_command_reply(conn, NULL);
     } else {
diff --git a/ovsdb/sync.c b/ovsdb/sync.c
new file mode 100644
index 0000000..d108ecd
--- /dev/null
+++ b/ovsdb/sync.c
@@ -0,0 +1,135 @@
+/* Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "sync.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include "poll-loop.h"
+#include "rxbuf.h"
+#include "socket-util.h"
+#include "util.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(sync);
+
+static int sync_fds[2] = { -1, -1 };    /* -1 means "not open". */
+
+void
+sync_init(void)
+{
+    xsocketpair(AF_UNIX, SOCK_STREAM, 0, sync_fds);
+    xset_nonblocking(sync_fds[0]);
+    xset_nonblocking(sync_fds[1]);
+}
+
+/* Runs in the monitor, looping to receive sync requests from the monitored
+ * process and passing each one to 'cb'.  Returns when the monitored process
+ * closes its end of the socket (e.g. because it died) or on receive error. */
+void
+sync_receive_monitor(sync_data_callback *cb)
+{
+    int error;
+    struct sync_send_request request;
+    struct rxbuf rx;
+
+    if (sync_fds[1] >= 0) {
+        close(sync_fds[1]);
+        sync_fds[1] = -1;
+    }
+
+    rxbuf_init(&rx);
+    for (;;) {
+        error = rxbuf_run(&rx, sync_fds[0], sizeof(struct sync_send_request));
+        if (!error) {
+            memcpy(&request, rx.header.data, sizeof request);
+            cb(request.request_type, &rx.payload);
+            rxbuf_clear(&rx);
+        } else if (error == EOF && !rx.header.size) {
+            /* The child closed the IPC socket. Exit cleanly. */
+            break;
+        } else if (error != EAGAIN) {
+            VLOG_ERR("RPC receive failed (%s)", ovs_retval_to_string(error));
+            break;
+        }
+        poll_fd_wait(sync_fds[0], POLLIN|POLLRDHUP|POLLERR);
+        poll_block();
+    }
+    rxbuf_uninit(&rx);
+    close(sync_fds[0]);
+    sync_init();    /* Fresh socket pair for the next monitored process. */
+}
+
+/* Called by the monitored process to send 'data' to the monitor, tagged with
+ * 'request_type'.  Returns 0 on success (or if 'data' is empty), else nonzero. */
+int
+sync_send_monitor(const char *data, enum sync_request_type request_type)
+{
+    struct sync_send_request rq;
+    int error;
+    size_t sent = 0;
+    struct pollfd pfd;
+
+    if (sync_fds[0] >= 0) {
+        close(sync_fds[0]);     /* The monitor's end; unused here. */
+        sync_fds[0] = -1;
+    }
+
+    if (data[0] != '\0') {
+        size_t len = strlen(data);
+        struct iovec iov[2];
+
+        memset(&rq, 0, sizeof rq);      /* Don't send uninitialized padding. */
+        rq.request_len = len;
+        rq.request_type = request_type;
+
+        memset(iov, 0, sizeof iov);
+        iov[0].iov_base = &rq;
+        iov[0].iov_len = sizeof rq;
+        iov[1].iov_base = (void *) data;
+        iov[1].iov_len = len;
+
+        for (;;) {
+            error = send_iovec_and_fds_fully(sync_fds[1], iov, 2, NULL, 0,
+                                             sent, &sent);
+            if (error != EAGAIN) {
+                return error;
+            }
+
+            /* Wait for the socket to become writable before trying again. */
+            pfd.fd = sync_fds[1];
+            pfd.events = POLLOUT;
+            do {
+                error = poll(&pfd, 1, -1) < 0 ? errno : 0;
+            } while (error == EINTR);
+            if (error) {
+                VLOG_ERR("poll failed (%s)", strerror(error));
+                return error;
+            }
+        }
+    }
+    return 0;
+}
diff --git a/ovsdb/sync.h b/ovsdb/sync.h
new file mode 100644
index 0000000..0b05eeb
--- /dev/null
+++ b/ovsdb/sync.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SYNC_H
+#define SYNC_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include "ofpbuf.h"
+
+enum sync_request_type {
+    SYNC_REMOTE_ADD,            /* Add remote manager. */
+    SYNC_REMOTE_REMOVE          /* Remove remote manager. */
+};
+
+struct sync_send_request {
+    size_t request_len;         /* Length of the payload in bytes. */
+    enum sync_request_type request_type;    /* Type of payload. */
+};
+
+void sync_init(void);
+int sync_send_monitor(const char *data, enum sync_request_type request_type);
+typedef void sync_data_callback(enum sync_request_type, struct ofpbuf *);
+void sync_receive_monitor(sync_data_callback *cb);
+
+#endif /* sync.h */
-- 
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev