This commit moves the ofproto-dpif-monitor module into a
dedicated thread.  This helps eliminate the burden of main
thread having to wake up very frequently for periodic
interface monitoring (bfd, cfm).  Also, this commit greatly
increases the number of bfd/cfm sessions that can be supported
by ovs.

Signed-off-by: Alex Wang <al...@nicira.com>
---
 ofproto/ofproto-dpif-monitor.c |  130 +++++++++++++++++++++++++++++-----------
 ofproto/ofproto-dpif-monitor.h |    5 +-
 ofproto/ofproto-dpif.c         |    8 +--
 tests/bfd.at                   |   31 ++++++----
 tests/ofproto-dpif.at          |   58 ++++++++++++++++++
 5 files changed, 176 insertions(+), 56 deletions(-)

diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c
index 49210fe..e7a9144 100644
--- a/ofproto/ofproto-dpif-monitor.c
+++ b/ofproto/ofproto-dpif-monitor.c
@@ -23,11 +23,18 @@
 #include "cfm.h"
 #include "hash.h"
 #include "hmap.h"
+#include "latch.h"
 #include "ofpbuf.h"
 #include "ofproto-dpif.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
+#include "seq.h"
+#include "timeval.h"
 #include "util.h"
 #include "vlog.h"
 
+VLOG_DEFINE_THIS_MODULE(ofproto_dpif_monitor);
+
 /* Monitored port.  It contains references to ofport, bfd, cfm structs. */
 struct mport {
     struct hmap_node hmap_node;       /* In monitor's hmap. */
@@ -40,9 +47,21 @@ struct mport {
 
 /* hmap that contains all monitored port. */
 static struct hmap monitor_hmap = HMAP_INITIALIZER(&monitor_hmap);
+/* latch that controls the exit of monitor thread. */
+static struct latch monitor_exit_latch;
+/* Holds the last seq value of timewarp_seq. */
+static uint64_t last_seq;
+/* NULL if monitor thread is not started. */
+static pthread_t *monitor_tid;
 
 static struct ovs_rwlock monitor_rwlock = OVS_RWLOCK_INITIALIZER;
 
+static void *monitor_main(void *);
+static void monitor_run(void) OVS_REQ_RDLOCK(monitor_rwlock);
+static void monitor_wait(void) OVS_REQ_RDLOCK(monitor_rwlock);
+static void monitor_start(void);
+static void monitor_terminate(void);
+
 static void mport_register(const struct ofport_dpif *, struct bfd *,
                            struct cfm *, uint8_t[ETH_ADDR_LEN])
     OVS_REQ_WRLOCK(monitor_rwlock);
@@ -119,32 +138,32 @@ mport_update(struct mport *mport, struct bfd *bfd, struct 
cfm *cfm,
         memcpy(mport->hw_addr, hw_addr, ETH_ADDR_LEN);
     }
 }
-
 
-/* Creates the mport in monitor module if either bfd or cfm
- * is configured.  Otherwise, deletes the mport. */
-void
-ofproto_dpif_monitor_port_update(const struct ofport_dpif *ofport,
-                                 struct bfd *bfd, struct cfm *cfm,
-                                 uint8_t hw_addr[ETH_ADDR_LEN])
+/* The 'main' function for the monitor thread. */
+static void *
+monitor_main(void * args OVS_UNUSED)
 {
-    ovs_rwlock_wrlock(&monitor_rwlock);
-    if (!cfm && !bfd) {
-        mport_unregister(ofport);
-    } else {
-        mport_register(ofport, bfd, cfm, hw_addr);
+    set_subprogram_name("monitor");
+    VLOG_INFO("monitor thread created");
+    while (!latch_is_set(&monitor_exit_latch)) {
+        ovs_rwlock_rdlock(&monitor_rwlock);
+        monitor_run();
+        monitor_wait();
+        ovs_rwlock_unlock(&monitor_rwlock);
+        latch_wait(&monitor_exit_latch);
+        poll_block();
     }
-    ovs_rwlock_unlock(&monitor_rwlock);
+    VLOG_INFO("monitor thread terminated");
+    return NULL;
 }
 
 /* Checks the sending of control packets on all mports.  Sends the control
- * packets if needed. */
-void
-ofproto_dpif_monitor_run_fast(void)
+ * packets if needed.  Executes bfd_run(), cfm_run() on all mports. */
+static void
+monitor_run(void)
 {
     struct mport *mport;
 
-    ovs_rwlock_rdlock(&monitor_rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
         if (mport->cfm && cfm_should_send_ccm(mport->cfm)) {
             struct ofpbuf packet;
@@ -162,18 +181,6 @@ ofproto_dpif_monitor_run_fast(void)
             ofproto_dpif_send_packet(mport->ofport, &packet);
             ofpbuf_uninit(&packet);
         }
-    }
-    ovs_rwlock_unlock(&monitor_rwlock);
-}
-
-/* Executes bfd_run(), cfm_run() on all mports. */
-void
-ofproto_dpif_monitor_run(void)
-{
-    struct mport *mport;
-
-    ovs_rwlock_rdlock(&monitor_rwlock);
-    HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
         if (mport->cfm) {
             cfm_run(mport->cfm);
         }
@@ -181,16 +188,15 @@ ofproto_dpif_monitor_run(void)
             bfd_run(mport->bfd);
         }
     }
-    ovs_rwlock_unlock(&monitor_rwlock);
 }
 
-/* Executes the bfd_wait() and cfm_wait() functions on all mports. */
-void
-ofproto_dpif_monitor_wait(void)
+/* Executes the bfd_wait() and cfm_wait() functions on all mports,
+ * and wait on the sequence number. */
+static void
+monitor_wait(void)
 {
     struct mport *mport;
 
-    ovs_rwlock_rdlock(&monitor_rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
         if (mport->cfm) {
             cfm_wait(mport->cfm);
@@ -199,5 +205,61 @@ ofproto_dpif_monitor_wait(void)
             bfd_wait(mport->bfd);
         }
     }
+    last_seq = seq_read(timewarp_seq);
+    seq_wait(timewarp_seq, last_seq);
+}
+
+/* Starts the monitor thread. */
+static void
+monitor_start(void)
+{
+    /* zalloc tid. */
+    monitor_tid = xzalloc(sizeof *monitor_tid);
+    latch_init(&monitor_exit_latch);
+
+    xpthread_create(monitor_tid, NULL, monitor_main, NULL);
+}
+
+/* Terminates the monitor thread. */
+static void
+monitor_terminate(void)
+{
+    latch_set(&monitor_exit_latch);
+    xpthread_join(*monitor_tid, NULL);
+
+    /* frees the tid. */
+    latch_destroy(&monitor_exit_latch);
+    free(monitor_tid);
+    monitor_tid = NULL;
+}
+
+
+/* Creates the mport in monitor module if either bfd or cfm
+ * is configured.  Otherwise, deletes the mport. */
+void
+ofproto_dpif_monitor_port_update(const struct ofport_dpif *ofport,
+                                 struct bfd *bfd, struct cfm *cfm,
+                                 uint8_t hw_addr[ETH_ADDR_LEN])
+{
+    ovs_rwlock_wrlock(&monitor_rwlock);
+    if (!cfm && !bfd) {
+        mport_unregister(ofport);
+    } else {
+        mport_register(ofport, bfd, cfm, hw_addr);
+    }
     ovs_rwlock_unlock(&monitor_rwlock);
 }
+
+/* Checks if the monitor thread has been started.  If it hasn't,
+ * and the hmap is not empty, starts it.  If it has
+ * and the hmap is empty, terminates it.
+ * This function can only be called by the main thread.  */
+void
+ofproto_dpif_monitor_check_run(void)
+{
+    if (!monitor_tid && !hmap_is_empty(&monitor_hmap))  {
+        monitor_start();
+    } else if (monitor_tid && hmap_is_empty(&monitor_hmap))  {
+        monitor_terminate();
+    }
+}
diff --git a/ofproto/ofproto-dpif-monitor.h b/ofproto/ofproto-dpif-monitor.h
index 0f27ba5..43f7ee9 100644
--- a/ofproto/ofproto-dpif-monitor.h
+++ b/ofproto/ofproto-dpif-monitor.h
@@ -23,10 +23,7 @@ struct bfd;
 struct cfm;
 struct ofport_dpif;
 
-void ofproto_dpif_monitor_run(void);
-void ofproto_dpif_monitor_run_fast(void);
-void ofproto_dpif_monitor_wait(void);
-
+void ofproto_dpif_monitor_check_run(void);
 void ofproto_dpif_monitor_port_update(const struct ofport_dpif *,
                                       struct bfd *, struct cfm *,
                                       uint8_t[OFP_ETH_ALEN]);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index fde8943..a8d15dd 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -832,6 +832,9 @@ type_run(const char *type)
         udpif_revalidate(backer->udpif);
     }
 
+    /* Checks the creation or termination of monitor thread. */
+    ofproto_dpif_monitor_check_run();
+
     if (!backer->recv_set_enable) {
         /* Wake up before a max of 1000ms. */
         timer_set_duration(&backer->next_expiration, 1000);
@@ -1464,7 +1467,6 @@ run_fast(struct ofproto *ofproto_)
         free(pin);
     }
 
-    ofproto_dpif_monitor_run_fast();
     return 0;
 }
 
@@ -1506,9 +1508,6 @@ run(struct ofproto *ofproto_)
         dpif_ipfix_run(ofproto->ipfix);
     }
 
-    ofproto_dpif_monitor_run_fast();
-    ofproto_dpif_monitor_run();
-
     HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
         port_run(ofport);
     }
@@ -1565,7 +1564,6 @@ wait(struct ofproto *ofproto_)
     if (ofproto->ipfix) {
         dpif_ipfix_wait(ofproto->ipfix);
     }
-    ofproto_dpif_monitor_wait();
     HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
         bundle_wait(bundle);
     }
diff --git a/tests/bfd.at b/tests/bfd.at
index b9dd7d8..af7d987 100644
--- a/tests/bfd.at
+++ b/tests/bfd.at
@@ -278,6 +278,7 @@ else
     for i in `seq 0 1`; do ovs-appctl time/warp 500; done
 fi
 
+
 # Test-1 BFD decay: decay to decay_min_rx
 # bfd:decay_min_rx is set to 3000ms after the local state of p0 goes up,
 # so for the first 2500ms, there should be no change.
@@ -353,14 +354,15 @@ BFD_CHECK([p1], [true], [false], [none], [up], [No 
Diagnostic], [none], [up], [N
 # for decay_min_rx < 2000ms, the decay detection time is set to 2000ms.
 # this should firstly reset the min_rx and start poll sequence.
 AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=1000])
+ovs-appctl time/warp 500
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
 BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
 BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
 
-# for the following 1500ms, there should be no decay,
+# for the following 1000ms, there should be no decay,
 # since the decay_detect_time is set to 2000ms.
-for i in `seq 0 2`
+for i in `seq 0 1`
 do
     ovs-appctl time/warp 500
     BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], 
[up], [No Diagnostic])
@@ -383,21 +385,19 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done
 # Test-4 BFD decay: set min_rx to 800ms.
 # this should firstly reset the min_rx and then re-decay to 1000ms.
 AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=800])
+ovs-appctl time/warp 800
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
 BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
 BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
 
-# for the following 1600ms, there should be no decay,
+# for the following 800ms, there should be no decay,
 # since the decay detection time is set to 2000ms.
-for i in `seq 0 1`
-do
-    ovs-appctl time/warp 800
-    BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], 
[up], [No Diagnostic])
-    BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], 
[up], [No Diagnostic])
-    BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
-    BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
-done
+ovs-appctl time/warp 800
+BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], 
[No Diagnostic])
+BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], 
[No Diagnostic])
+BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
+BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
 
 ovs-appctl time/warp 400
 # at 2000ms, decay should happen and there should be the poll sequence flags.
@@ -412,6 +412,7 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done
 
 # Test-5 BFD decay: set min_rx to 300ms and decay_min_rx to 5000ms together.
 AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=300 bfd:decay_min_rx=5000])
+ovs-appctl time/warp 500
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
 BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
@@ -419,9 +420,9 @@ BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
 
 # for decay_min_rx > 2000ms, the decay detection time is set to
 # decay_min_rx (5000ms).
-# for the following 4500ms, there should be no decay,
+# for the following 4000ms, there should be no decay,
 # since the decay detection time is set to 5000ms.
-for i in `seq 0 8`
+for i in `seq 1 8`
 do
     ovs-appctl time/warp 500
     BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], 
[up], [No Diagnostic])
@@ -443,6 +444,7 @@ for i in `seq 0 9`; do ovs-appctl time/warp 500; done
 
 # Test-6 BFD decay: set decay_min_rx to 0 to disable bfd decay.
 AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=0])
+ovs-appctl time/warp 500
 # min_rx is reset, and there should be the poll sequence flags.
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
@@ -479,6 +481,7 @@ do
 done
 # reset the p1's min_tx to 500ms.
 AT_CHECK([ovs-vsctl set Interface p1 bfd:min_tx=500])
+ovs-appctl time/warp 500
 # check the poll sequence. since p0 has been in decay, now the RX will show 
3000ms.
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
@@ -693,6 +696,8 @@ done
 
 # reconfigure the decay_min_rx to 1000ms. check the poll sequence.
 AT_CHECK([ovs-vsctl set interface p0 bfd:decay_min_rx=1000])
+# advance the clock by tiny little bit to wakeup the monitor thread.
+ovs-appctl time/warp 1
 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], 
[No Diagnostic])
 BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], 
[No Diagnostic])
 BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index 652304e..0b66a9c 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -2838,3 +2838,61 @@ AT_CHECK([ovs-appctl bond/show | sed -n 
'/^.*may_enable:.*/p'], [0], [dnl
 
 OVS_VSWITCHD_STOP
 AT_CLEANUP
+
+AT_SETUP([ofproto-dpif - ofproto-dpif-monitor])
+OVS_VSWITCHD_START([add-port br0 p0 -- set interface p0 type=gre 
options:remote_ip=1.2.3.4])
+
+# enable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > 
ovs-vswitchd.log])
+
+# enable cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable cfm on p0.
+AT_CHECK([ovs-vsctl remove interface p0 cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > 
ovs-vswitchd.log])
+
+# enable both bfd and cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log, there should not be the log of thread terminated.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+])
+# reenable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log, should still be on log of thread created.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd and cfm together.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false -- remove interface p0 
cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* 
terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
\ No newline at end of file
-- 
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to