All of this makes sense, thanks.

Ethan

On Wed, Jun 27, 2012 at 11:25 AM, Ben Pfaff <b...@nicira.com> wrote:
> ovs-vswitchd is effectively a "soft real-time" process, because flows that
> do not get set up quickly lead to packet loss or retransmission.  We've
> done our best to keep it from blocking unnecessarily, but some operations
> unavoidably block.  This new library allows a daemon to break itself up
> into a main process and a worker process, connected by an RPC channel,
> with the idea being that the main process will delegate any possibly
> blocking operations to the worker.
>
> This commit also modifies ovs-vswitchd to start a worker process, but it
> does not actually introduce any uses for the worker process.  Upcoming
> commits will add those.
>
> Signed-off-by: Ben Pfaff <b...@nicira.com>
> ---
>  lib/automake.mk         |    4 +-
>  lib/worker.c            |  447 +++++++++++++++++++++++++++++++++++++++++++++++
>  lib/worker.h            |   68 +++++++
>  vswitchd/ovs-vswitchd.c |    5 +
>  4 files changed, 523 insertions(+), 1 deletions(-)
>  create mode 100644 lib/worker.c
>  create mode 100644 lib/worker.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 9d8b426..93a4860 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -197,7 +197,9 @@ lib_libopenvswitch_a_SOURCES = \
>        lib/vlog.c \
>        lib/vlog.h \
>        lib/vswitch-idl.c \
> -       lib/vswitch-idl.h
> +       lib/vswitch-idl.h \
> +       lib/worker.c \
> +       lib/worker.h
>
>  nodist_lib_libopenvswitch_a_SOURCES = \
>        lib/dirs.c
> diff --git a/lib/worker.c b/lib/worker.c
> new file mode 100644
> index 0000000..bc44885
> --- /dev/null
> +++ b/lib/worker.c
> @@ -0,0 +1,447 @@
> +/* Copyright (c) 2012 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "worker.h"
> +
> +#include <assert.h>
> +#include <errno.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include <sys/socket.h>
> +#include <sys/types.h>
> +#include <sys/uio.h>
> +#include <sys/wait.h>
> +#include <unistd.h>
> +
> +#include "command-line.h"
> +#include "daemon.h"
> +#include "ofpbuf.h"
> +#include "poll-loop.h"
> +#include "socket-util.h"
> +#include "util.h"
> +#include "vlog.h"
> +
> +VLOG_DEFINE_THIS_MODULE(worker);
> +
> +/* Header for an RPC request, sent from the main process to the worker.
> + *
> + * NOTE: rxbuf_run() reads the payload length as '*(size_t *) header.data',
> + * so 'request_len' must remain the first member.  The function and data
> + * pointers are sent as raw bytes; this presumably works only because the
> + * worker is forked from the main process (see worker_start()). */
> +struct worker_request {
> +    size_t request_len;              /* Length of the payload in bytes. */
> +    worker_request_func *request_cb; /* Function to call in worker process. */
> +    worker_reply_func *reply_cb;     /* Function to call in main process. */
> +    void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
> +};
> +
> +/* Header for an RPC reply, sent from the worker back to the main process.
> + *
> + * NOTE: as with struct worker_request, 'reply_len' must remain the first
> + * member because rxbuf_run() reads the payload length from the start of the
> + * header.  'reply_cb' and 'reply_aux' are copied from the request being
> + * answered (see worker_reply_iovec()). */
> +struct worker_reply {
> +    size_t reply_len;            /* Length of the payload in bytes. */
> +    worker_reply_func *reply_cb; /* Function to call in main process. */
> +    void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
> +};
> +
> +/* Receive buffer for an RPC request or reply, filled incrementally by
> + * rxbuf_run() across possibly several calls. */
> +struct rxbuf {
> +    /* Header. */
> +    struct ofpbuf header;       /* Header data. */
> +    int fds[SOUTIL_MAX_FDS];    /* File descriptors received with header. */
> +    size_t n_fds;               /* Number of valid entries in 'fds'. */
> +
> +    /* Payload. */
> +    struct ofpbuf payload;      /* Payload data. */
> +};
> +
> +/* Main-process state: the main process's end of the socket pair connected to
> + * the worker (-1 if no worker is running or it has broken) and the buffer in
> + * which replies from the worker are assembled. */
> +static int client_sock = -1;
> +static struct rxbuf client_rx;
> +
> +static void rxbuf_init(struct rxbuf *);
> +static void rxbuf_clear(struct rxbuf *);
> +static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
> +
> +static struct iovec *prefix_iov(void *data, size_t len,
> +                                const struct iovec *iovs, size_t n_iovs);
> +
> +static void worker_broke(void);
> +
> +static void worker_main(int fd) NO_RETURN;
> +
> +/* Starts a worker process as a subprocess of the current process.  Currently
> + * only a single worker process is supported, so this function may only be
> + * called once.
> + *
> + * The client should call worker_run() and worker_wait() from its main loop.
> + *
> + * Call this function between daemonize_start() and daemonize_complete(). */
> +void
> +worker_start(void)
> +{
> +    int work_fds[2];
> +
> +    assert(client_sock < 0);
> +
> +    /* Create non-blocking socket pair. */
> +    xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
> +    xset_nonblocking(work_fds[0]);
> +    xset_nonblocking(work_fds[1]);
> +
> +    if (!fork_and_clean_up()) {
> +        /* In child (worker) process.  Keep only work_fds[1]; worker_main()
> +         * serves requests on it and never returns. */
> +        daemonize_post_detach();
> +        close(work_fds[0]);
> +        worker_main(work_fds[1]);
> +        NOT_REACHED();
> +    }
> +
> +    /* In parent (main) process.  Keep only work_fds[0], which becomes the
> +     * channel used by worker_request() and worker_run(). */
> +    close(work_fds[1]);
> +    client_sock = work_fds[0];
> +    rxbuf_init(&client_rx);
> +}
> +
> +/* Reports whether this process started a worker that has not since been
> + * declared broken by worker_broke(), i.e. whether 'client_sock' is open. */
> +bool
> +worker_is_running(void)
> +{
> +    return !(client_sock < 0);
> +}
> +
> +/* Processes a reply from the worker process, if a worker was started and a
> + * complete reply has arrived, by calling the 'reply_cb' that was registered
> + * with the corresponding request.
> + *
> + * At most one reply is handled per call.  rxbuf_run() never reads past the
> + * end of a message, so any further replies remain readable on the socket and
> + * worker_wait() will wake the poll loop for them.
> + *
> + * If the worker process died or malfunctioned, aborts. */
> +void
> +worker_run(void)
> +{
> +    int error;
> +
> +    if (!worker_is_running()) {
> +        return;
> +    }
> +
> +    error = rxbuf_run(&client_rx, client_sock, sizeof(struct worker_reply));
> +    if (error == EAGAIN) {
> +        /* Reply not complete yet; partial data stays in 'client_rx'. */
> +    } else if (error) {
> +        worker_broke();
> +        VLOG_ABORT("receive from worker failed (%s)",
> +                   ovs_retval_to_string(error));
> +    } else {
> +        struct worker_reply *hdr = client_rx.header.data;
> +
> +        hdr->reply_cb(&client_rx.payload, client_rx.fds, client_rx.n_fds,
> +                      hdr->reply_aux);
> +        rxbuf_clear(&client_rx);
> +    }
> +}
> +
> +/* Arranges for the poll loop to wake up when the worker has sent data that
> + * worker_run() should process.  Does nothing if no worker is running. */
> +void
> +worker_wait(void)
> +{
> +    if (!worker_is_running()) {
> +        return;
> +    }
> +    poll_fd_wait(client_sock, POLLIN);
> +}
> +
> +/* Interface for main process to interact with the worker. */
> +
> +/* Sends an RPC request to the worker process.  The worker process will call
> + * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
> + * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
> + * in 'fds'.
> + *
> + * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
> + * or worker_reply_iovec() with a reply.  The main process will later call
> + * 'reply_cb' with the reply data (if any) and file descriptors (if any).
> + *
> + * 'request_cb' receives copies (as if by dup()) of the file descriptors in
> + * fds[].  'request_cb' takes ownership of these copies, and the caller of
> + * worker_request() retains its ownership of the originals.
> + *
> + * This function may block until the RPC request has been sent (if the socket
> + * buffer fills up) but it does not wait for the reply (if any).  If this
> + * function blocks, it may invoke reply callbacks for previous requests.
> + *
> + * The worker process executes RPC requests in strict order of submission and
> + * runs each request to completion before beginning the next request.  The main
> + * process invokes reply callbacks in strict order of request submission. */
> +void
> +worker_request(const void *data, size_t size,
> +               const int fds[], size_t n_fds,
> +               worker_request_func *request_cb,
> +               worker_reply_func *reply_cb, void *aux)
> +{
> +    struct iovec iov;
> +    const struct iovec *iovs = NULL;
> +    size_t n_iovs = 0;
> +
> +    /* An empty payload is passed on as an empty iovec array. */
> +    if (size) {
> +        iov.iov_base = (void *) data;
> +        iov.iov_len = size;
> +        iovs = &iov;
> +        n_iovs = 1;
> +    }
> +    worker_request_iovec(iovs, n_iovs, fds, n_fds, request_cb, reply_cb, aux);
> +}
> +
> +/* Sends the 'n_iovs' iovecs in 'iovs', plus the 'n_fds' file descriptors in
> + * 'fds', on 'client_sock'.  Keeps retrying while send_iovec_and_fds_fully()
> + * reports EAGAIN, tracking progress in 'sent', and returns its first other
> + * result (0 on success, otherwise presumably an errno value). */
> +static int
> +worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
> +                  const int fds[], size_t n_fds)
> +{
> +    size_t sent = 0;
> +
> +    for (;;) {
> +        int error;
> +
> +        /* Try to send the rest of the request. */
> +        error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
> +                                         fds, n_fds, sent, &sent);
> +        if (error != EAGAIN) {
> +            return error;
> +        }
> +
> +        /* Process replies to avoid deadlock: if the worker is blocked sending
> +         * us replies, it cannot read the rest of this request. */
> +        worker_run();
> +
> +        /* Wake up when there is reply data to read or room to send more. */
> +        poll_fd_wait(client_sock, POLLIN | POLLOUT);
> +        poll_block();
> +    }
> +}
> +
> +/* Same as worker_request() except that the data to send is specified as an
> + * array of iovecs. */
> +void
> +worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
> +                     const int fds[], size_t n_fds,
> +                     worker_request_func *request_cb,
> +                     worker_reply_func *reply_cb, void *aux)
> +{
> +    struct worker_request rq;
> +    struct iovec *all_iovs;
> +    int error;
> +
> +    assert(worker_is_running());
> +
> +    /* Build the header.  The worker echoes 'reply_cb' and 'reply_aux' back
> +     * in its reply, which is how worker_run() knows which callback to run. */
> +    rq.request_len = iovec_len(iovs, n_iovs);
> +    rq.request_cb = request_cb;
> +    rq.reply_cb = reply_cb;
> +    rq.reply_aux = aux;
> +
> +    /* Send the header followed by the payload as a single message. */
> +    all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
> +    error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
> +    if (error) {
> +        /* Mark the worker broken first so that logging this failure does not
> +         * try to go through the worker. */
> +        worker_broke();
> +        VLOG_ABORT("send failed (%s)", strerror(error));
> +    }
> +    free(all_iovs);
> +}
> +
> +/* Marks the worker as unusable by closing the main process's end of the
> + * socket pair, if it is still open; afterward worker_is_running() is false.
> + *
> + * Called just before aborting when the worker dies or malfunctions, so that
> + * the logging subsystem does not try to log the failure through the worker. */
> +static void
> +worker_broke(void)
> +{
> +    if (client_sock < 0) {
> +        return;                 /* Already closed. */
> +    }
> +
> +    close(client_sock);
> +    client_sock = -1;
> +}
> +
> +/* Interfaces for RPC implementations (running in the worker process). */
> +
> +/* Worker-process state: the worker's end of the socket pair, whether the
> + * request currently being executed still owes a reply, and that request's
> + * header (whose 'reply_cb' and 'reply_aux' worker_reply_iovec() sends back). */
> +static int server_sock = -1;
> +static bool expect_reply;
> +static struct worker_request request;
> +
> +/* When a call to worker_request() or worker_request_iovec() provides a
> + * 'reply_cb' callback, the 'request_cb' implementation must call this function
> + * to send its reply.  The main process will call 'reply_cb' passing the
> + * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
> + * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
> + *
> + * If a call to worker_request() or worker_request_iovec() provides no
> + * 'reply_cb' callback, the 'request_cb' implementation must not call this
> + * function.
> + *
> + * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
> + * fds[].  'reply_cb' takes ownership of these copies, and the caller of
> + * worker_reply() retains its ownership of the originals.
> + *
> + * This function blocks until the RPC reply has been sent (if the socket buffer
> + * fills up) but it does not wait for the main process to receive or to process
> + * the reply. */
> +void
> +worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
> +{
> +    struct iovec iov;
> +    const struct iovec *iovs = NULL;
> +    size_t n_iovs = 0;
> +
> +    /* An empty reply payload is passed on as an empty iovec array. */
> +    if (size) {
> +        iov.iov_base = (void *) data;
> +        iov.iov_len = size;
> +        iovs = &iov;
> +        n_iovs = 1;
> +    }
> +    worker_reply_iovec(iovs, n_iovs, fds, n_fds);
> +}
> +
> +/* Same as worker_reply() except that the data to send is specified as an array
> + * of iovecs. */
> +void
> +worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
> +                       const int fds[], size_t n_fds)
> +{
> +    struct worker_reply reply;
> +    struct iovec *all_iovs;
> +    int error;
> +
> +    /* Only one reply per request, and only if the request asked for one
> +     * (worker_main() sets 'expect_reply' before each request). */
> +    assert(expect_reply);
> +    expect_reply = false;
> +
> +    /* Echo the request's reply callback and auxiliary data back to the main
> +     * process so that worker_run() can dispatch the reply. */
> +    reply.reply_len = iovec_len(iovs, n_iovs);
> +    reply.reply_cb = request.reply_cb;
> +    reply.reply_aux = request.reply_aux;
> +
> +    all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
> +
> +    error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
> +                                           fds, n_fds);
> +    if (error == EPIPE) {
> +        /* Parent probably died.  Continue processing any RPCs still buffered,
> +         * to avoid missing log messages. */
> +        VLOG_INFO("send failed (%s)", strerror(error));
> +    } else if (error) {
> +        VLOG_ABORT("send failed (%s)", strerror(error));
> +    }
> +
> +    free(all_iovs);
> +}
> +
> +/* Entry point of the worker process; never returns.
> + *
> + * Serves RPC requests arriving on 'fd' (the worker's end of the socket pair
> + * created by worker_start()) one at a time, in arrival order, running each
> + * request's 'request_cb' to completion.  Exits with status 0 when the main
> + * process closes its end of the socket between requests, and aborts on any
> + * other receive error, including end of file partway through a request. */
> +static void
> +worker_main(int fd)
> +{
> +    struct rxbuf rx;
> +
> +    server_sock = fd;
> +
> +    subprogram_name = "worker";
> +    proctitle_set("%s: worker process for pid %lu",
> +                  program_name, (unsigned long int) getppid());
> +    VLOG_INFO("worker process started");
> +
> +    rxbuf_init(&rx);
> +    for (;;) {
> +        int error;
> +
> +        error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
> +        if (!error) {
> +            request = *(struct worker_request *) rx.header.data;
> +
> +            /* 'request_cb' must call worker_reply() (which clears
> +             * 'expect_reply') if and only if a 'reply_cb' was supplied. */
> +            expect_reply = request.reply_cb != NULL;
> +            request.request_cb(&rx.payload, rx.fds, rx.n_fds);
> +            assert(!expect_reply);
> +
> +            rxbuf_clear(&rx);
> +        } else if (error == EOF && !rx.header.size) {
> +            /* Main process closed the IPC socket.  Exit cleanly. */
> +            break;
> +        } else if (error != EAGAIN) {
> +            /* 'error' may be EOF (socket closed partway through a request),
> +             * which is not an errno value and which strerror() cannot
> +             * describe, so format it as worker_run() does. */
> +            VLOG_ABORT("RPC receive failed (%s)",
> +                       ovs_retval_to_string(error));
> +        }
> +
> +        poll_fd_wait(server_sock, POLLIN);
> +        poll_block();
> +    }
> +
> +    VLOG_INFO("worker process exiting");
> +    exit(0);
> +}
> +
> +/* Initializes 'rx' as an empty receive buffer that holds no file
> + * descriptors. */
> +static void
> +rxbuf_init(struct rxbuf *rx)
> +{
> +    rx->n_fds = 0;
> +    ofpbuf_init(&rx->header, 0);
> +    ofpbuf_init(&rx->payload, 0);
> +}
> +
> +/* Resets 'rx' to empty so that it can receive the next message.  The file
> + * descriptors in 'fds' are not closed; the callback that received them owns
> + * them. */
> +static void
> +rxbuf_clear(struct rxbuf *rx)
> +{
> +    rx->n_fds = 0;
> +    ofpbuf_clear(&rx->header);
> +    ofpbuf_clear(&rx->payload);
> +}
> +
> +/* Makes progress receiving one message from 'sock' into 'rx'.  A message is a
> + * 'header_len'-byte header, whose first member is the payload length as a
> + * size_t, followed by that many payload bytes.
> + *
> + * Returns 0 once the whole message is buffered in 'rx'.  Returns EAGAIN if more
> + * data is needed; partial progress is kept in 'rx' for the next call.  Returns
> + * EOF if the socket reached end of file (possibly partway through a message),
> + * or another error code reported by the underlying receive functions. */
> +static int
> +rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
> +{
> +    for (;;) {
> +        if (!rx->header.size) {
> +            /* Start of a new message.  This first receive also collects any
> +             * file descriptors sent along with the message. */
> +            int retval;
> +
> +            ofpbuf_clear(&rx->header);
> +            ofpbuf_prealloc_tailroom(&rx->header, header_len);
> +
> +            retval = recv_data_and_fds(sock, rx->header.data, header_len,
> +                                       rx->fds, &rx->n_fds);
> +            if (retval <= 0) {
> +                /* Negative values are negated error codes; 0 means EOF. */
> +                return retval ? -retval : EOF;
> +            }
> +            rx->header.size += retval;
> +        } else if (rx->header.size < header_len) {
> +            /* Finish a header that arrived in pieces. */
> +            size_t bytes_read;
> +            int error;
> +
> +            error = read_fully(sock, ofpbuf_tail(&rx->header),
> +                               header_len - rx->header.size, &bytes_read);
> +            rx->header.size += bytes_read;
> +            if (error) {
> +                return error;
> +            }
> +        } else {
> +            /* Header complete: its first member is the payload length. */
> +            size_t payload_len = *(size_t *) rx->header.data;
> +
> +            if (rx->payload.size < payload_len) {
> +                size_t left = payload_len - rx->payload.size;
> +                size_t bytes_read;
> +                int error;
> +
> +                ofpbuf_prealloc_tailroom(&rx->payload, left);
> +                error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
> +                                   &bytes_read);
> +                rx->payload.size += bytes_read;
> +                if (error) {
> +                    return error;
> +                }
> +            } else {
> +                return 0;
> +            }
> +        }
> +    }
> +
> +    /* Not reached: the loop above only exits by returning. */
> +    return EAGAIN;
> +}
> +
> +/* Returns a newly allocated array of 'n_iovs' + 1 iovecs.  The first element
> + * points to the 'len' bytes at 'data' and the rest are copies of 'iovs'.  Only
> + * the iovec descriptors are copied, not the data they point to.  The caller
> + * must free() the returned array.
> + *
> + * 'iovs' may be null when 'n_iovs' is 0. */
> +static struct iovec *
> +prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
> +{
> +    struct iovec *dst;
> +
> +    dst = xmalloc((n_iovs + 1) * sizeof *dst);
> +    dst[0].iov_base = data;
> +    dst[0].iov_len = len;
> +    if (n_iovs) {
> +        /* worker_request() and worker_reply() pass a null 'iovs' for empty
> +         * payloads, and passing a null pointer to memcpy() is undefined even
> +         * with a zero length. */
> +        memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
> +    }
> +
> +    return dst;
> +}
> diff --git a/lib/worker.h b/lib/worker.h
> new file mode 100644
> index 0000000..135d50d
> --- /dev/null
> +++ b/lib/worker.h
> @@ -0,0 +1,68 @@
> +/* Copyright (c) 2012 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef WORKER_H
> +#define WORKER_H 1
> +
> +/* Worker processes.
> + *
> + * These functions allow an OVS daemon to fork off a "worker process" to do
> + * tasks that may unavoidably block in the kernel.  The worker executes remote
> + * procedure calls on behalf of the main process.
> + *
> + * Tasks that may unavoidably block in the kernel include writes to regular
> + * files, sends to Generic Netlink sockets (which as of this writing use a
> + * global lock), and other unusual operations.
> + *
> + * The worker functions *will* block if the finite buffer between a main
> + * process and its worker process fills up.
> + */
> +
> +#include <stdbool.h>
> +#include <stddef.h>
> +#include "compiler.h"
> +
> +struct iovec;
> +struct ofpbuf;
> +
> +/* The main process calls this function to start a worker. */
> +void worker_start(void);
> +
> +/* Interface for main process to interact with the worker. */
> +
> +/* Callback run in the worker process for each request.  Receives the request
> + * payload and the file descriptors sent with it, which it takes ownership
> + * of. */
> +typedef void worker_request_func(struct ofpbuf *request,
> +                                 const int fds[], size_t n_fds);
> +
> +/* Callback run in the main process, from worker_run(), for each reply.
> + * Receives the reply payload, the file descriptors sent with it (which it
> + * takes ownership of), and the 'aux' pointer given to worker_request(). */
> +typedef void worker_reply_func(struct ofpbuf *reply,
> +                               const int fds[], size_t n_fds, void *aux);
> +
> +bool worker_is_running(void);
> +void worker_run(void);
> +void worker_wait(void);
> +
> +void worker_request(const void *data, size_t size,
> +                    const int fds[], size_t n_fds,
> +                    worker_request_func *request_cb,
> +                    worker_reply_func *reply_cb, void *aux);
> +void worker_request_iovec(const struct iovec *iovs, size_t n_iovs,
> +                          const int fds[], size_t n_fds,
> +                          worker_request_func *request_cb,
> +                          worker_reply_func *reply_cb, void *aux);
> +
> +/* Interfaces for RPC implementations (running in the worker process). */
> +void worker_reply(const void *data, size_t size,
> +                  const int fds[], size_t n_fds);
> +void worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
> +                        const int fds[], size_t n_fds);
> +
> +#endif /* worker.h */
> diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
> index 8ef3b10..e9a7b07 100644
> --- a/vswitchd/ovs-vswitchd.c
> +++ b/vswitchd/ovs-vswitchd.c
> @@ -52,6 +52,7 @@
>  #include "vconn.h"
>  #include "vlog.h"
>  #include "lib/vswitch-idl.h"
> +#include "worker.h"
>
>  VLOG_DEFINE_THIS_MODULE(vswitchd);
>
> @@ -81,6 +82,8 @@ main(int argc, char *argv[])
>
>     daemonize_start();
>
> +    worker_start();
> +
>     retval = unixctl_server_create(unixctl_path, &unixctl);
>     if (retval) {
>         exit(EXIT_FAILURE);
> @@ -92,6 +95,7 @@ main(int argc, char *argv[])
>
>     exiting = false;
>     while (!exiting) {
> +        worker_run();
>         if (signal_poll(sighup)) {
>             vlog_reopen_log_file();
>         }
> @@ -110,6 +114,7 @@ main(int argc, char *argv[])
>         unixctl_server_run(unixctl);
>         netdev_run();
>
> +        worker_wait();
>         signal_wait(sighup);
>         memory_wait();
>         bridge_wait();
> --
> 1.7.2.5
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to