Hi all,
we have finally found the time to finish work on this patch. We have
addressed all the issues raised in the previous mails, plus some more
that showed up later. We now pass all unit tests on both Linux and
FreeBSD, so we can more confidently offer the patch for your
consideration and possible inclusion. As usual, any comments would be
much appreciated.
Regards,
Giuseppe
On 27/08/2012 02:00, Ben Pfaff wrote:
On Fri, Aug 24, 2012 at 10:25:02AM +0200, Giuseppe Lettieri wrote:
On 20/08/2012 20:48, Ed Maste wrote:
3. The datapath thread waits in poll() with each port's file descriptor in
the fd array. However, there's currently no fd to signal a bridge
reconfiguration, so the poll() has to time out before the thread picks up
the new config on its next pass through the loop.
We are considering adding the notification in dpif_netdev_run(), a
callback which is currently empty in the #ifdef THREADED case. According
to our tests this solves the problem in practice, but we are worried
that it may be a misuse of the interface, since the corresponding
dpif_netdev_wait() callback would still be empty. Any comments would
be much appreciated.
There's no inherent reason that a "wait" function has to be nonempty if
the corresponding "run" function is nonempty. The question is simply
whether the poll loop will wake up when "run" needs to do some work, and
if you're confident that that will happen (I don't have enough context
here to check myself) then that's fine.
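For reference, below is a minimal, self-contained sketch (not taken from the
patch; the names are illustrative) of the pipe-based wakeup that the patch
implements in dp_netdev_notifier: dpif_netdev_run() writes one byte, and the
datapath thread polls the read end of the pipe alongside the per-port fds, so
a reconfiguration no longer has to wait for the poll() timeout.

    /* Sketch of a pipe-based wakeup for a poll() loop. */
    #include <errno.h>
    #include <fcntl.h>
    #include <poll.h>
    #include <stdio.h>
    #include <unistd.h>

    struct notifier {
        int pipefd[2];   /* [0]: polled by the thread, [1]: written to notify. */
    };

    static int
    notifier_init(struct notifier *n)
    {
        if (pipe(n->pipefd) < 0) {
            return errno;
        }
        /* Nonblocking, so notifying and draining never stall. */
        fcntl(n->pipefd[0], F_SETFL, O_NONBLOCK);
        fcntl(n->pipefd[1], F_SETFL, O_NONBLOCK);
        return 0;
    }

    /* What dpif_netdev_run() would do on reconfiguration. */
    static void
    notifier_notify(struct notifier *n)
    {
        char c = 0;
        (void) write(n->pipefd[1], &c, 1);
    }

    /* Drain pending notifications after poll() reports the read end ready. */
    static void
    notifier_drain(struct notifier *n)
    {
        char buf[128];
        while (read(n->pipefd[0], buf, sizeof buf) > 0) {
            continue;
        }
    }

    /* One pass of the datapath thread's loop: the notifier's read end sits in
     * the same pollfd array as the per-port fds (omitted here), so a write on
     * the pipe wakes poll() immediately and the fd set can be rebuilt. */
    static void
    thread_loop_once(struct notifier *n)
    {
        struct pollfd fds[1];

        fds[0].fd = n->pipefd[0];
        fds[0].events = POLLIN;
        if (poll(fds, 1, -1) > 0 && (fds[0].revents & POLLIN)) {
            notifier_drain(n);
            /* ...rebuild the fd set from the current port list... */
        }
    }

    int
    main(void)
    {
        struct notifier n;

        if (notifier_init(&n)) {
            perror("pipe");
            return 1;
        }
        notifier_notify(&n);    /* a byte is now pending on the pipe */
        thread_loop_once(&n);   /* returns immediately instead of blocking */
        puts("woken up");
        return 0;
    }

In the patch, dp_netdev_notifier plays the role of struct notifier above, with
dp_netdev_notifier_ack() and _ack1() as the draining side.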
--
Dr. Ing. Giuseppe Lettieri
Dipartimento di Ingegneria della Informazione
Universita' di Pisa
Largo Lucio Lazzarino 2, 56122 Pisa - Italy
Ph. : (+39) 050-2217.649 (direct) .599 (switch)
Fax : (+39) 050-2217.600
e-mail: g.letti...@iet.unipi.it
From f42467cae9be8003a95b25f1c409fc7b3cee0440 Mon Sep 17 00:00:00 2001
From: Giuseppe Lettieri <g.letti...@iet.unipi.it>
Date: Fri, 28 Sep 2012 12:05:18 +0200
Subject: [PATCH] Threaded userspace datapath
This patch refactors the userspace datapath (i.e., datapath_type=netdev)
into two threads: one only forwards the packets for which a flow is found,
and the other does all other processing. This arrangement can speed up
forwarding performance, measured in pps, by a factor of 5-10.
To enable compilation of the threaded datapath, pass '--enable-threaded'
to configure.
Signed-off-by: Gaetano Catalli <gaetano.cata...@gmai.com>
Signed-off-by: Ed Maste <ema...@adaranet.com>
Signed-off-by: Giuseppe Lettieri <g.letti...@iet.unipi.it>
---
configure.ac | 1 +
lib/automake.mk | 1 +
lib/dispatch.h | 9 +
lib/dpif-netdev.c | 473 ++++++++++++++++++++++++++++++++++++++++++++++++-
lib/fatal-signal.c | 2 +-
lib/netdev-bsd.c | 91 ++++++++++
lib/netdev-dummy.c | 123 ++++++++++++-
lib/netdev-linux.c | 44 +++++
lib/netdev-provider.h | 19 ++
lib/netdev-vport.c | 8 +
lib/netdev.c | 22 +++
lib/netdev.h | 7 +
lib/vlog.c | 16 ++
m4/openvswitch.m4 | 19 ++
14 files changed, 827 insertions(+), 8 deletions(-)
create mode 100644 lib/dispatch.h
diff --git a/configure.ac b/configure.ac
index 9bdffea..a70232f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
AC_SEARCH_LIBS([timer_create], [rt])
AC_SEARCH_LIBS([pcap_open_live], [pcap])
+OVS_CHECK_THREADED
OVS_CHECK_COVERAGE
OVS_CHECK_NDEBUG
OVS_CHECK_NETLINK
diff --git a/lib/automake.mk b/lib/automake.mk
index 94b86f6..3479ec0 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
lib/daemon.c \
lib/daemon.h \
lib/dhcp.h \
+ lib/dispatch.h \
lib/dummy.c \
lib/dummy.h \
lib/dhparams.h \
diff --git a/lib/dispatch.h b/lib/dispatch.h
new file mode 100644
index 0000000..80ac9c7
--- /dev/null
+++ b/lib/dispatch.h
@@ -0,0 +1,9 @@
+#include <sys/types.h>
+#include "ofpbuf.h"
+
+#ifndef DISPATCH_H
+#define DISPATCH_H 1
+
+typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf);
+
+#endif /* DISPATCH_H */
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index dc4479e..e7013db 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -32,6 +32,15 @@
#include <sys/stat.h>
#include <unistd.h>
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
#include "csum.h"
#include "dpif.h"
#include "dpif-provider.h"
@@ -55,6 +64,18 @@
#include "vlog.h"
VLOG_DEFINE_THIS_MODULE(dpif_netdev);
+/* Pthread lock macros, nops in the non-threaded case. */
+#ifdef THREADED
+#define INIT_MUTEX(mutex) pthread_mutex_init(mutex, NULL)
+#define DESTROY_MUTEX(mutex) pthread_mutex_destroy(mutex)
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define INIT_MUTEX(mutex)
+#define DESTROY_MUTEX(mutex)
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
/* Configuration parameters. */
enum { MAX_PORTS = 256 }; /* Maximum number of ports. */
@@ -80,6 +101,49 @@ struct dp_netdev_queue {
unsigned int head, tail;
};
+#ifdef THREADED
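+/* Pipe used to wake a poll() loop: pipe[0] goes into the pollfd set, and
+ * writing one byte to pipe[1] wakes the poller. */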
+struct dp_netdev_notifier {
+ int pipe[2];
+};
+
+static int dp_netdev_notifier_init(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_poll(struct dp_netdev_notifier *dn, struct pollfd *pfd);
+static int dp_netdev_notifier_notify(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_ack(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_ack1(struct dp_netdev_notifier *);
+#else
+struct dp_netdev_notifier {
+ /* nothing */
+};
+
+static int dp_netdev_notifier_init(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+ return 0;
+}
+/* unused
+static int dp_netdev_notifier_poll(struct dp_netdev_notifier *dn OVS_UNUSED,
+ struct pollfd *pfd OVS_UNUSED)
+{
+ return 0;
+}
+*/
+static int dp_netdev_notifier_notify(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+ return 0;
+}
+/*
+static int dp_netdev_notifier_ack(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+ return 0;
+}
+*/
+static int dp_netdev_notifier_ack1(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+ return 0;
+}
+#endif
+
+
/* Datapath based on the network device interface from netdev.h. */
struct dp_netdev {
const struct dpif_class *class;
@@ -87,6 +151,16 @@ struct dp_netdev {
int open_cnt;
bool destroyed;
+ struct dp_netdev_notifier packet_notifier; /* signal a packet on the queue */
+ struct dp_netdev_notifier notifier;
+#ifdef THREADED
+ struct pollfd *notifier_fd;
+
+ pthread_mutex_t table_mutex; /* mutex for the flow table */
+ pthread_mutex_t port_list_mutex; /* port list mutex */
+
+ /* Access to the queues below is protected by table_mutex. */
+#endif
struct dp_netdev_queue queues[N_QUEUES];
struct hmap flow_table; /* Flow table. */
@@ -107,6 +181,9 @@ struct dp_netdev_port {
struct list node; /* Element in dp_netdev's 'port_list'. */
struct netdev *netdev;
char *type; /* Port type as requested by user. */
+#ifdef THREADED
+ struct pollfd *poll_fd; /* To manage the poll loop in the thread. */
+#endif
};
/* A flow in dp_netdev's 'flow_table'. */
@@ -132,6 +209,11 @@ struct dpif_netdev {
unsigned int dp_serial;
};
+#ifdef THREADED
+/* XXX Global descriptor of the thread that manages the datapaths. */
+pthread_t thread_p;
+#endif
+
/* All netdev-based datapaths. */
static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
@@ -209,6 +291,13 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
dp->class = class;
dp->name = xstrdup(name);
dp->open_cnt = 0;
+ dp_netdev_notifier_init(&dp->packet_notifier);
+ dp_netdev_notifier_init(&dp->notifier);
+ INIT_MUTEX(&dp->table_mutex);
+ INIT_MUTEX(&dp->port_list_mutex);
+#ifdef THREADED
+ dp->notifier_fd = NULL;
+#endif
for (i = 0; i < N_QUEUES; i++) {
dp->queues[i].head = dp->queues[i].tail = 0;
}
@@ -226,6 +315,124 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
return 0;
}
+#ifdef THREADED
+
+static int
+dp_netdev_notifier_init(struct dp_netdev_notifier *dn)
+{
+ int error = pipe(dn->pipe);
+ if (error) {
+ VLOG_ERR("Unable to create notifier: %s", strerror(errno));
+ return errno;
+ }
+ if (set_nonblocking(dn->pipe[0]) || set_nonblocking(dn->pipe[1])) {
+ VLOG_ERR("Unable to set nonblocking on notifier pipe: %s",
+ strerror(errno));
+ return errno;
+ }
+ VLOG_DBG("Notifier pipes created (%d, %d)", dn->pipe[0], dn->pipe[1]);
+ return 0;
+}
+
+static int
+dp_netdev_notifier_poll(struct dp_netdev_notifier *dn, struct pollfd *pfd)
+{
+ pfd->fd = dn->pipe[0];
+ pfd->events = POLLIN;
+ return 1;
+}
+
+static int
+dp_netdev_notifier_ack(struct dp_netdev_notifier *dn)
+{
+ int error;
+ char readbuf[1024];
+
+ while ((error = read(dn->pipe[0], readbuf, sizeof(readbuf))) > 0)
+ ;
+ if (error < 0 && errno != EAGAIN) {
+ VLOG_ERR("Pipe read error: %s", strerror(errno));
+ return error;
+ }
+ return 0;
+}
+
+static int
+dp_netdev_notifier_ack1(struct dp_netdev_notifier *dn)
+{
+ int error;
+ char c;
+
+ error = read(dn->pipe[0], &c, 1);
+ if (error < 0 && errno != EAGAIN) {
+ VLOG_ERR("Pipe read error: %s", strerror(errno));
+ return error;
+ }
+ return 0;
+}
+
+
+static int
+dp_netdev_notifier_notify(struct dp_netdev_notifier *dn)
+{
+ char c = 0;
+
+ if (write(dn->pipe[1], &c, 1) < 0) {
+ VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
+ return errno;
+ }
+ return 0;
+}
+
+static void * dp_thread_body(void *args OVS_UNUSED);
+
+/* This function is called in response to a fatal signal (e.g., SIGTERM). */
+static void
+dpif_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+ if (pthread_cancel(thread_p) == 0) {
+ /*
+ * POSIX specifies that poll is a thread cancellation point, but it
+ * appears that (at least on FreeBSD) we can wait indefinitely in the
+ * poll() in dp_thread_body. As a workaround force a notify to exit
+ * the poll().
+ */
+ struct shash_node *node;
+ struct dp_netdev *dp;
+ SHASH_FOR_EACH(node, &dp_netdevs) {
+ dp = (struct dp_netdev *)node->data;
+ dp_netdev_notifier_notify(&dp->notifier);
+ }
+ pthread_join(thread_p, NULL);
+ }
+}
+
+static int
+dpif_netdev_init(void)
+{
+ static int error = -1;
+
+ if (error < 0) {
+ fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
+ error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+ if (error != 0) {
+ VLOG_ERR("Unable to create datapath thread: %s", strerror(errno));
+ error = errno;
+ } else {
+ VLOG_DBG("Datapath thread started");
+ }
+ }
+ return error;
+}
+#else
+static int
+dpif_netdev_init(void)
+{
+ return 0;
+}
+#endif
+
static int
dpif_netdev_open(const struct dpif_class *class, const char *name,
bool create, struct dpif **dpifp)
@@ -252,9 +459,12 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
}
*dpifp = create_dpif_netdev(dp);
+ dpif_netdev_init(); /* XXX check error */
return 0;
}
+/* table_mutex must be locked in THREADED mode.
+ */
static void
dp_netdev_purge_queues(struct dp_netdev *dp)
{
@@ -276,11 +486,17 @@ dp_netdev_free(struct dp_netdev *dp)
struct dp_netdev_port *port, *next;
dp_netdev_flow_flush(dp);
+ LOCK(&dp->port_list_mutex);
LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
do_del_port(dp, port->port_no);
}
+ UNLOCK(&dp->port_list_mutex);
+ LOCK(&dp->table_mutex);
dp_netdev_purge_queues(dp);
hmap_destroy(&dp->flow_table);
+ UNLOCK(&dp->table_mutex);
+ DESTROY_MUTEX(&dp->table_mutex);
+ DESTROY_MUTEX(&dp->port_list_mutex);
free(dp->name);
free(dp);
}
@@ -309,7 +525,9 @@ static int
dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
+ LOCK(&dp->table_mutex);
stats->n_flows = hmap_count(&dp->flow_table);
+ UNLOCK(&dp->table_mutex);
stats->n_hit = dp->n_hit;
stats->n_missed = dp->n_missed;
stats->n_lost = dp->n_lost;
@@ -357,13 +575,18 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
port->port_no = port_no;
port->netdev = netdev;
port->type = xstrdup(type);
+#ifdef THREADED
+ port->poll_fd = NULL;
+#endif
error = netdev_get_mtu(netdev, &mtu);
if (!error) {
max_mtu = mtu;
}
+ LOCK(&dp->port_list_mutex);
list_push_back(&dp->port_list, &port->node);
+ UNLOCK(&dp->port_list_mutex);
dp->ports[port_no] = port;
dp->serial++;
@@ -432,7 +655,16 @@ static int
dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no)
{
struct dp_netdev *dp = get_dp_netdev(dpif);
- return port_no == OVSP_LOCAL ? EINVAL : do_del_port(dp, port_no);
+ int error;
+
+ if (port_no == OVSP_LOCAL) {
+ return EINVAL;
+ } else {
+ LOCK(&dp->port_list_mutex);
+ error = do_del_port(dp, port_no);
+ UNLOCK(&dp->port_list_mutex);
+ }
+ return error;
}
static bool
@@ -460,15 +692,19 @@ get_port_by_name(struct dp_netdev *dp,
{
struct dp_netdev_port *port;
+ LOCK(&dp->port_list_mutex);
LIST_FOR_EACH (port, node, &dp->port_list) {
if (!strcmp(netdev_get_name(port->netdev), devname)) {
*portp = port;
+ UNLOCK(&dp->port_list_mutex);
return 0;
}
}
+ UNLOCK(&dp->port_list_mutex);
return ENOENT;
}
+/* In THREADED mode, must be called with port_list_mutex held. */
static int
do_del_port(struct dp_netdev *dp, uint16_t port_no)
{
@@ -543,7 +779,9 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
static void
dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
{
+ LOCK(&dp->table_mutex);
hmap_remove(&dp->flow_table, &flow->node);
+ UNLOCK(&dp->table_mutex);
free(flow->actions);
free(flow);
}
@@ -632,7 +870,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
}
static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
+#ifdef THREADED
+dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
+#else
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+#endif
{
struct dp_netdev_flow *flow;
@@ -644,6 +886,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
return NULL;
}
+#ifdef THREADED
+static struct dp_netdev_flow *
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+{
+ struct dp_netdev_flow *flow;
+
+ LOCK(&dp->table_mutex);
+ flow = dp_netdev_lookup_flow_locked(dp, key);
+ UNLOCK(&dp->table_mutex);
+ return flow;
+}
+#endif
+
static void
get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
{
@@ -740,7 +995,9 @@ dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *key,
return error;
}
+ LOCK(&dp->table_mutex);
hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+ UNLOCK(&dp->table_mutex);
return 0;
}
@@ -760,6 +1017,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
struct dp_netdev_flow *flow;
struct flow key;
int error;
+ int n_flows;
error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
if (error) {
@@ -769,7 +1027,10 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
flow = dp_netdev_lookup_flow(dp, &key);
if (!flow) {
if (put->flags & DPIF_FP_CREATE) {
- if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+ LOCK(&dp->table_mutex);
+ n_flows = hmap_count(&dp->flow_table);
+ UNLOCK(&dp->table_mutex);
+ if (n_flows < MAX_FLOWS) {
if (put->stats) {
memset(put->stats, 0, sizeof *put->stats);
}
@@ -855,7 +1116,9 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
struct dp_netdev_flow *flow;
struct hmap_node *node;
+ LOCK(&dp->table_mutex);
node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
+ UNLOCK(&dp->table_mutex);
if (!node) {
return EOF;
}
@@ -961,7 +1224,10 @@ static int
dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
struct ofpbuf *buf)
{
- struct dp_netdev_queue *q = find_nonempty_queue(dpif);
+ struct dp_netdev_queue *q;
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ LOCK(&dp->table_mutex);
+ q = find_nonempty_queue(dpif);
if (q) {
struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
@@ -971,8 +1237,11 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
ofpbuf_uninit(buf);
*buf = u->buf;
+ dp_netdev_notifier_ack1(&dp->packet_notifier);
+ UNLOCK(&dp->table_mutex);
return 0;
} else {
+ UNLOCK(&dp->table_mutex);
return EAGAIN;
}
}
@@ -980,19 +1249,31 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
static void
dpif_netdev_recv_wait(struct dpif *dpif)
{
+#ifdef THREADED
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ struct pollfd pfd;
+
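+ /* Wait on the packet_notifier's read end: dp_netdev_output_userspace()
+ * writes one byte per queued upcall, and dpif_netdev_recv() consumes
+ * one byte per upcall it returns. */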
+ if (dp_netdev_notifier_poll(&dp->packet_notifier, &pfd)) {
+ poll_fd_wait(pfd.fd, pfd.events);
+ }
+#else
if (find_nonempty_queue(dpif)) {
poll_immediate_wake();
} else {
/* No messages ready to be received, and dp_wait() will ensure that we
* wake up to queue new messages, so there is nothing to do. */
}
+#endif
}
static void
dpif_netdev_recv_purge(struct dpif *dpif)
{
struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+ LOCK(&dp->table_mutex);
dp_netdev_purge_queues(dpif_netdev->dp);
+ UNLOCK(&dp->table_mutex);
}
static void
@@ -1010,23 +1291,64 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
{
struct dp_netdev_flow *flow;
struct flow key;
+ struct nlattr *actions;
+ size_t actions_len;
+#ifdef THREADED
+ uint8_t actions_buf[128];
+#endif
if (packet->size < ETH_HEADER_LEN) {
return;
}
flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key);
+#ifdef THREADED
+ LOCK(&dp->table_mutex);
+ flow = dp_netdev_lookup_flow_locked(dp, &key);
+#else
flow = dp_netdev_lookup_flow(dp, &key);
+#endif
if (flow) {
dp_netdev_flow_used(flow, packet);
- dp_netdev_execute_actions(dp, packet, &key,
- flow->actions, flow->actions_len);
+ actions_len = flow->actions_len;
+#ifdef THREADED
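+ /* Copy the actions while table_mutex is still held, so they remain
+ * valid after the lock is released below; short action lists reuse the
+ * on-stack buffer to avoid a malloc on the fast path. */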
+ if (actions_len <= sizeof(actions_buf)) {
+ actions = (struct nlattr*)actions_buf;
+ } else {
+ actions = xmalloc(actions_len);
+ }
+ memcpy(actions, flow->actions, actions_len);
+#else
+ actions = flow->actions;
+#endif
+ UNLOCK(&dp->table_mutex);
+ dp_netdev_execute_actions(dp, packet, &key, actions, actions_len);
+#ifdef THREADED
+ if (actions_len > sizeof(actions_buf)) {
+ free(actions);
+ }
+#endif
dp->n_hit++;
} else {
dp->n_missed++;
dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
+ UNLOCK(&dp->table_mutex);
}
}
+#ifdef THREADED
+static void
+dpif_netdev_run(struct dpif *dpif)
+{
+ struct dp_netdev *dp = get_dp_netdev(dpif);
+
+ dp_netdev_notifier_notify(&dp->notifier);
+}
+
+static void
+dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
+{
+}
+#else
static void
dpif_netdev_run(struct dpif *dpif)
{
@@ -1065,6 +1387,138 @@ dpif_netdev_wait(struct dpif *dpif)
netdev_recv_wait(port->netdev);
}
}
+#endif
+
+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+ struct dp_netdev *dp; /* update statistics */
+ struct dp_netdev_port *port; /* argument to flow identifier function */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If no flow is found, or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *user, struct ofpbuf *buf)
+{
+ struct dispatch_arg *arg = (struct dispatch_arg *)user;
+
+ ofpbuf_padto(buf, ETH_TOTAL_MIN);
+ dp_netdev_port_input(arg->dp, arg->port, buf);
+}
+
+/* Body of the thread that manages the datapaths */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+ struct dp_netdev *dp;
+ struct dp_netdev_port *port;
+ struct dispatch_arg arg;
+ int error;
+ int n_fds;
+ uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+ int processed; /* actual number of pkts processed by the dispatch */
+
+ sigset_t sigmask;
+
+ /*XXX Since the poll involves all ports of all datapaths, the right fds
+ * size should be MAX_PORTS * max_number_of_datapaths */
+ struct pollfd fds[MAX_PORTS + 1];
+
+ /* Mask the fatal signals so that the main thread is delegated to
+ * manage them. */
+ sigemptyset(&sigmask);
+ sigaddset(&sigmask, SIGTERM);
+ sigaddset(&sigmask, SIGALRM);
+ sigaddset(&sigmask, SIGINT);
+ sigaddset(&sigmask, SIGHUP);
+
+ if (pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0) {
+ VLOG_ERR("Error setting thread sigmask: %s", strerror(errno));
+ }
+
+ for(;;) {
+ struct shash_node *node;
+ n_fds = 0;
+ /* build the structure for poll */
+ SHASH_FOR_EACH(node, &dp_netdevs) {
+ dp = (struct dp_netdev *)node->data;
+ if (dp_netdev_notifier_poll(&dp->notifier, &fds[n_fds])) {
+ dp->notifier_fd = &fds[n_fds];
+ n_fds++;
+ }
+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
+ VLOG_ERR("Too many fds for poll adding notifier");
+ break;
+ }
+ LOCK(&dp->port_list_mutex);
+ LIST_FOR_EACH (port, node, &dp->port_list) {
+ /* insert an element in the fds structure */
+ fds[n_fds].fd = netdev_get_fd(port->netdev);
+ fds[n_fds].events = POLLIN;
+ port->poll_fd = &fds[n_fds];
+ n_fds++;
+ if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
+ VLOG_ERR("Too many fds for poll adding port fd");
+ break;
+ }
+ }
+ UNLOCK(&dp->port_list_mutex);
+ }
+
+ error = poll(fds, n_fds, -1);
+
+ if (error < 0) {
+ if (errno == EINTR) {
+ /* XXX get this case in detach mode */
+ continue;
+ }
+ VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno));
+ /* XXX terminating the thread is probably not right */
+ break;
+ }
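+ /* Explicit cancellation point: as noted in dpif_netdev_exit_hook(),
+ * poll() may not reliably act as one on all platforms. */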
+ pthread_testcancel();
+
+ SHASH_FOR_EACH (node, &dp_netdevs) {
+ dp = (struct dp_netdev *)node->data;
+ if (dp->notifier_fd && (dp->notifier_fd->revents & POLLIN)) {
+ VLOG_DBG("Signalled from main thread");
+ dp_netdev_notifier_ack(&dp->notifier);
+ }
+ arg.dp = dp;
+ LOCK(&dp->port_list_mutex);
+ LIST_FOR_EACH (port, node, &dp->port_list) {
+ arg.port = port;
+ if (port->poll_fd) {
+ VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
+ }
+ if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+ /* Call the dispatch function and process the packets in
+ * its callback. We process up to 'batch' packets at a time. */
+ processed = netdev_dispatch(port->netdev, batch,
+ process_pkt, (u_char *)&arg);
+ if (processed < 0) { /* pcap returns error */
+ static struct vlog_rate_limit rl =
+ VLOG_RATE_LIMIT_INIT(1, 5);
+ VLOG_ERR_RL(&rl,
+ "error receiving data from XXX \n");
+ }
+ } /* end of if poll */
+ } /* end of port loop */
+ UNLOCK(&dp->port_list_mutex);
+ } /* end of dp loop */
+ } /* for ;; */
+
+ return NULL;
+}
+
+#endif /* THREADED */
static void
dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
@@ -1080,11 +1534,14 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
uint16_t out_port)
{
struct dp_netdev_port *p = dp->ports[out_port];
+
if (p) {
netdev_send(p->netdev, packet);
+ dp_netdev_notifier_notify(&dp->notifier);
}
}
+/* In THREADED mode, must be called with table_mutex held. */
static int
dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
int queue_no, const struct flow *flow, uint64_t arg)
@@ -1117,6 +1574,8 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
upcall->key_len = key_len;
upcall->userdata = arg;
+ dp_netdev_notifier_notify(&dp->packet_notifier);
+
return 0;
}
@@ -1164,7 +1623,9 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
+ LOCK(&dp->table_mutex);
dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
+ UNLOCK(&dp->table_mutex);
}
static void
diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
index 21ebb5a..add0f80 100644
--- a/lib/fatal-signal.c
+++ b/lib/fatal-signal.c
@@ -192,7 +192,7 @@ call_hooks(int sig_nr)
recurse = 1;
for (i = 0; i < n_hooks; i++) {
- struct hook *h = &hooks[i];
+ struct hook *h = &hooks[n_hooks - i - 1];
if (sig_nr || h->run_at_exit) {
h->hook_cb(h->aux);
}
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index f8b1188..e62e220 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
}
}
+#ifdef THREADED
+
+struct dispatch_arg {
+ pkt_handler h;
+ u_char *user;
+};
+
+static void
+dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
+{
+ struct ofpbuf buf;
+ struct dispatch_arg *parg = (struct dispatch_arg*)user;
+
+ ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen);
+ buf.size = phdr->caplen;
+ (*parg->h)(parg->user, &buf);
+ ofpbuf_uninit(&buf);
+}
+
+static int
+netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h,
+ u_char *user)
+{
+ int ret;
+ struct dispatch_arg arg;
+
+ arg.h = h;
+ arg.user = user;
+ ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg);
+ return ret;
+}
+
+static int
+netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h,
+ u_char *user)
+{
+ int ret;
+ int i;
+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+ OFPBUF_STACK_BUFFER(buf_, size);
+
+ struct ofpbuf buf;
+ ofpbuf_use_stub(&buf, buf_, size);
+ for (i = 0; i < batch; i++) {
+ ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
+ if (ret >= 0) {
+ buf.size += ret;
+ h(user, &buf);
+ } else if (ret != -EAGAIN) {
+ return -1;
+ } else { /* ret = EAGAIN */
+ break;
+ }
+ ofpbuf_clear(&buf);
+ }
+ ofpbuf_uninit(&buf);
+ return i;
+}
+
+static int
+netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+ u_char *user)
+{
+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
+ struct netdev_dev_bsd * netdev_dev =
+ netdev_dev_bsd_cast(netdev_get_dev(netdev_));
+
+ if (!strcmp(netdev_get_type(netdev_), "tap") &&
+ netdev->netdev_fd == netdev_dev->tap_fd) {
+ return netdev_bsd_dispatch_tap(netdev, batch, h, user);
+ } else {
+ return netdev_bsd_dispatch_system(netdev, batch, h, user);
+ }
+}
+
+static int
+netdev_bsd_get_fd(struct netdev *netdev_)
+{
+ struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
+ return netdev->netdev_fd;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
static int
netdev_bsd_drain(struct netdev *netdev_)
@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
netdev_bsd_recv,
netdev_bsd_recv_wait,
+#ifdef THREADED
+ netdev_bsd_dispatch,
+ netdev_bsd_get_fd,
+#endif
netdev_bsd_drain,
netdev_bsd_send,
@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
netdev_bsd_recv,
netdev_bsd_recv_wait,
+#ifdef THREADED
+ netdev_bsd_dispatch,
+ netdev_bsd_get_fd,
+#endif
netdev_bsd_drain,
netdev_bsd_send,
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 6aa4084..e0d4448 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -20,6 +20,12 @@
#include <errno.h>
+#ifdef THREADED
+#include <pthread.h>
+#include <unistd.h>
+#include "socket-util.h"
+#endif
+
#include "flow.h"
#include "list.h"
#include "netdev-provider.h"
@@ -51,6 +57,10 @@ struct netdev_dummy {
struct list node; /* In netdev_dev_dummy's "devs" list. */
struct list recv_queue;
bool listening;
+#ifdef THREADED
+ pthread_mutex_t queue_mutex;
+ int s_pipe[2]; /* used to signal packet arrivals */
+#endif
};
static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
@@ -146,6 +156,13 @@ netdev_dummy_close(struct netdev *netdev_)
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
list_remove(&netdev->node);
ofpbuf_list_delete(&netdev->recv_queue);
+#ifdef THREADED
+ if (netdev->listening) {
+ close(netdev->s_pipe[0]);
+ close(netdev->s_pipe[1]);
+ pthread_mutex_destroy(&netdev->queue_mutex);
+ }
+#endif
free(netdev);
}
@@ -153,6 +170,27 @@ static int
netdev_dummy_listen(struct netdev *netdev_)
{
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+#ifdef THREADED
+ int error;
+
+ if (netdev->listening)
+ return 0;
+
+ error = pipe(netdev->s_pipe);
+ if (error) {
+ VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
+ return errno;
+ }
+ if (set_nonblocking(netdev->s_pipe[0]) ||
+ set_nonblocking(netdev->s_pipe[1])) {
+ VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
+ strerror(errno));
+ close(netdev->s_pipe[0]);
+ close(netdev->s_pipe[1]);
+ return errno;
+ }
+ pthread_mutex_init(&netdev->queue_mutex, NULL);
+#endif
netdev->listening = true;
return 0;
}
@@ -163,12 +201,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
struct ofpbuf *packet;
size_t packet_size;
+#ifdef THREADED
+ char c;
+#endif
+#ifdef THREADED
+ pthread_mutex_lock(&netdev->queue_mutex);
+#endif
if (list_is_empty(&netdev->recv_queue)) {
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
return -EAGAIN;
}
+#ifdef THREADED
+ if (read(netdev->s_pipe[0], &c, 1) < 0) {
+ VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
+ }
+#endif
packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
if (packet->size > size) {
return -EMSGSIZE;
}
@@ -184,11 +239,60 @@ static void
netdev_dummy_recv_wait(struct netdev *netdev_)
{
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
- if (!list_is_empty(&netdev->recv_queue)) {
+ int empty;
+
+#ifdef THREADED
+ pthread_mutex_lock(&netdev->queue_mutex);
+#endif
+ empty = list_is_empty(&netdev->recv_queue);
+#ifdef THREADED
+ pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
+ if (!empty) {
poll_immediate_wake();
}
}
+#ifdef THREADED
+static int
+netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+ u_char *user)
+{
+ int i;
+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+ struct ofpbuf *packet;
+ VLOG_DBG("dispatch %d", batch);
+
+ for (i = 0; i < batch; i++) {
+ char c;
+ if (read(netdev->s_pipe[0], &c, 1) < 0) {
+ if (errno == EAGAIN)
+ break;
+ VLOG_ERR("%s: error reading from the pipe: %s",
+ netdev_get_name(netdev_), strerror(errno));
+ return -1;
+ }
+ pthread_mutex_lock(&netdev->queue_mutex);
+ if (list_is_empty(&netdev->recv_queue)) {
+ pthread_mutex_unlock(&netdev->queue_mutex);
+ return -EAGAIN;
+ }
+ packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+ pthread_mutex_unlock(&netdev->queue_mutex);
+ h(user, packet);
+ ofpbuf_delete(packet);
+ }
+ return i;
+}
+
+static int
+netdev_dummy_get_fd(struct netdev *netdev_)
+{
+ struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+ return netdev->s_pipe[0];
+}
+#endif
+
static int
netdev_dummy_drain(struct netdev *netdev_)
{
@@ -326,6 +430,10 @@ static const struct netdev_class dummy_class = {
netdev_dummy_listen,
netdev_dummy_recv,
netdev_dummy_recv_wait,
+#ifdef THREADED
+ netdev_dummy_dispatch, /* dispatch */
+ netdev_dummy_get_fd, /* get_fd */
+#endif
netdev_dummy_drain,
NULL, /* send */
@@ -417,6 +525,9 @@ netdev_dummy_receive(struct unixctl_conn *conn,
struct netdev_dev_dummy *dummy_dev;
int n_listeners;
int i;
+#ifdef THREADED
+ char c = 0;
+#endif
dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]);
if (!dummy_dev) {
@@ -424,6 +535,7 @@ netdev_dummy_receive(struct unixctl_conn *conn,
return;
}
+
n_listeners = 0;
for (i = 2; i < argc; i++) {
struct netdev_dummy *dev;
@@ -439,7 +551,16 @@ netdev_dummy_receive(struct unixctl_conn *conn,
LIST_FOR_EACH (dev, node, &dummy_dev->devs) {
if (dev->listening) {
struct ofpbuf *copy = ofpbuf_clone(packet);
+#ifdef THREADED
+ pthread_mutex_lock(&dev->queue_mutex);
+#endif
list_push_back(&dev->recv_queue, ©->list_node);
+#ifdef THREADED
+ pthread_mutex_unlock(&dev->queue_mutex);
+ if (write(dev->s_pipe[1], &c, 1) < 0) {
+ VLOG_ERR("Error writing dummy pipe: %s", strerror(errno));
+ }
+#endif
n_listeners++;
}
}
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 412a92d..7d25ce5 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -893,6 +893,43 @@ netdev_linux_recv_wait(struct netdev *netdev_)
}
}
+#ifdef THREADED
+static int
+netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+ u_char *user)
+{
+ int ret;
+ int i;
+ const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+ OFPBUF_STACK_BUFFER(buf_, size);
+ struct ofpbuf buf;
+ VLOG_DBG("dispatch %d", batch);
+
+ ofpbuf_use_stub(&buf, buf_, size);
+ for (i = 0; i < batch; i++) {
+ ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf));
+ if (ret >= 0) {
+ buf.size += ret;
+ h(user, &buf);
+ } else if (ret != -EAGAIN) {
+ return -1;
+ } else {
+ break;
+ }
+ ofpbuf_clear(&buf);
+ }
+ ofpbuf_uninit(&buf);
+ return i;
+}
+
+static int
+netdev_linux_get_fd(struct netdev *netdev_)
+{
+ struct netdev_linux *netdev = netdev_linux_cast(netdev_);
+ return netdev->fd;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
static int
netdev_linux_drain(struct netdev *netdev_)
@@ -2383,6 +2420,12 @@ netdev_linux_change_seq(const struct netdev *netdev)
return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq;
}
+#ifdef THREADED
+# define THREADED_METHODS netdev_linux_dispatch, netdev_linux_get_fd,
+#else
+# define THREADED_METHODS
+#endif
+
#define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS, \
GET_FEATURES, GET_STATUS) \
{ \
@@ -2403,6 +2446,7 @@ netdev_linux_change_seq(const struct netdev *netdev)
netdev_linux_listen, \
netdev_linux_recv, \
netdev_linux_recv_wait, \
+ THREADED_METHODS \
netdev_linux_drain, \
\
netdev_linux_send, \
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 94f60af..f0cdedf 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -25,6 +25,9 @@
#include "list.h"
#include "shash.h"
#include "smap.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
#ifdef __cplusplus
extern "C" {
@@ -191,6 +194,22 @@ struct netdev_class {
* implement packet reception through the 'recv' member function. */
void (*recv_wait)(struct netdev *netdev);
+#ifdef THREADED
+ /* Attempts to receive 'batch' packets from 'netdev' and process them
+ * through the 'handler' callback. This function is used in the 'THREADED'
+ * build to optimize forwarding, since it allows packets to be processed
+ * directly in the netdev's memory.
+ *
+ * Returns the number of packets processed on success; this can be 0 if no
+ * packets are available to be read. Returns -1 if an error occurred.
+ */
+ int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
+ u_char *user);
+
+ /* Returns the file descriptor of the device. */
+ int (*get_fd)(struct netdev *netdev);
+#endif
+
/* Discards all packets waiting to be received from 'netdev'.
*
* May be null if not needed, such as for a network device that does not
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index d5c288f..7fcbebc 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -899,6 +899,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
return 0;
}
+
+#ifdef THREADED
+# define THREADED_NULL NULL, NULL,
+#else
+# define THREADED_NULL
+#endif
+
#define VPORT_FUNCTIONS(GET_STATUS) \
NULL, \
netdev_vport_run, \
@@ -915,6 +922,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
NULL, /* listen */ \
NULL, /* recv */ \
NULL, /* recv_wait */ \
+ THREADED_NULL \
NULL, /* drain */ \
\
netdev_vport_send, /* send */ \
diff --git a/lib/netdev.c b/lib/netdev.c
index 394d895..3359b4c 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -424,6 +424,28 @@ netdev_recv_wait(struct netdev *netdev)
}
}
+#ifdef THREADED
+/* Attempts to receive and process 'batch' packets from 'netdev'. */
+int
+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
+{
+ int (*dispatch)(struct netdev*, int, pkt_handler, u_char *);
+
+ dispatch = netdev_get_dev(netdev)->netdev_class->dispatch;
+ return dispatch ? dispatch(netdev, batch, h, user) : 0;
+}
+
+/* Returns the file descriptor of 'netdev', or 0 if its class does not provide one. */
+int
+netdev_get_fd(struct netdev *netdev)
+{
+ int (*get_fd)(struct netdev *);
+
+ get_fd = netdev_get_dev(netdev)->netdev_class->get_fd;
+ return get_fd ? get_fd(netdev) : 0;
+}
+#endif
+
/* Discards all packets waiting to be received from 'netdev'. */
int
netdev_drain(struct netdev *netdev)
diff --git a/lib/netdev.h b/lib/netdev.h
index d2cc8b5..f55a286 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -21,6 +21,9 @@
#include <stddef.h>
#include <stdint.h>
#include "openvswitch/types.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
#ifdef __cplusplus
extern "C" {
@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *);
int netdev_listen(struct netdev *);
int netdev_recv(struct netdev *, struct ofpbuf *);
void netdev_recv_wait(struct netdev *);
+#ifdef THREADED
+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
+int netdev_get_fd(struct netdev *);
+#endif
int netdev_drain(struct netdev *);
int netdev_send(struct netdev *, const struct ofpbuf *);
diff --git a/lib/vlog.c b/lib/vlog.c
index 0bd9bd1..affd09e 100644
--- a/lib/vlog.c
+++ b/lib/vlog.c
@@ -37,6 +37,9 @@
#include "unixctl.h"
#include "util.h"
#include "worker.h"
+#ifdef THREADED
+#include <pthread.h>
+#endif
VLOG_DEFINE_THIS_MODULE(vlog);
@@ -92,6 +95,10 @@ static int log_fd = -1;
/* vlog initialized? */
static bool vlog_inited;
+#ifdef THREADED
+static pthread_mutex_t vlog_mutex;
+#endif
+
static void format_log_message(const struct vlog_module *, enum vlog_level,
enum vlog_facility, unsigned int msg_num,
const char *message, va_list, struct ds *)
@@ -492,6 +499,9 @@ vlog_init(void)
return;
}
vlog_inited = true;
+#ifdef THREADED
+ pthread_mutex_init(&vlog_mutex, NULL);
+#endif
/* openlog() is allowed to keep the pointer passed in, without making a
* copy. The daemonize code sometimes frees and replaces 'program_name',
@@ -707,6 +717,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
ds_init(&s);
ds_reserve(&s, 1024);
+#ifdef THREADED
+ pthread_mutex_lock(&vlog_mutex);
+#endif
msg_num++;
if (log_to_console) {
@@ -736,6 +749,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
vlog_write_file(&s);
}
+#ifdef THREADED
+ pthread_mutex_unlock(&vlog_mutex);
+#endif
ds_destroy(&s);
errno = save_errno;
}
diff --git a/m4/openvswitch.m4 b/m4/openvswitch.m4
index 939f296..f4cad2c 100644
--- a/m4/openvswitch.m4
+++ b/m4/openvswitch.m4
@@ -14,6 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+dnl Checks for --enable-threaded and defines THREADED if enabled.
+AC_DEFUN([OVS_CHECK_THREADED],
+ [AC_REQUIRE([AC_PROG_CC])
+ AC_ARG_ENABLE(
+ [threaded],
+ [AC_HELP_STRING([--enable-threaded],
+ [Enable threaded version of userspace implementation])],
+ [case "${enableval}" in
+ (yes) threaded=true ;;
+ (no) threaded=false ;;
+ (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
+ esac],
+ [threaded=false])
+ if $threaded; then
+ AC_DEFINE([THREADED], [1],
+ [Define to 1 if the threaded version of userspace
+ implementation is enabled.])
+ fi])
+
dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
AC_DEFUN([OVS_CHECK_COVERAGE],
[AC_REQUIRE([AC_PROG_CC])
--
1.7.9.5