This commit moves the ofproto-dpif-monitor module into a dedicated thread. Signed-off-by: Alex Wang <al...@nicira.com> --- lib/bfd.c | 4 +- lib/timeval.c | 20 +++++ lib/timeval.h | 4 + ofproto/ofproto-dpif-monitor.c | 166 ++++++++++++++++++++++++++++------------ ofproto/ofproto-dpif-monitor.h | 2 - ofproto/ofproto-dpif-xlate.h | 3 +- ofproto/ofproto-dpif.c | 7 +- tests/bfd.at | 33 ++++---- tests/ofproto-dpif.at | 58 ++++++++++++++ 9 files changed, 226 insertions(+), 71 deletions(-)
diff --git a/lib/bfd.c b/lib/bfd.c index 6c9e920..c106983 100644 --- a/lib/bfd.c +++ b/lib/bfd.c @@ -721,8 +721,10 @@ bfd_process_packet(struct bfd *bfd, const struct flow *flow, rmt_min_rx = MAX(ntohl(msg->min_rx) / 1000, 1); if (bfd->rmt_min_rx != rmt_min_rx) { bfd->rmt_min_rx = rmt_min_rx; - bfd_set_next_tx(bfd); log_msg(VLL_INFO, msg, "New remote min_rx", bfd); + if (bfd->last_tx) { + bfd_set_next_tx(bfd); + } } bfd->rmt_min_tx = MAX(ntohl(msg->min_tx) / 1000, 1); diff --git a/lib/timeval.c b/lib/timeval.c index 223ed30..befba1c 100644 --- a/lib/timeval.c +++ b/lib/timeval.c @@ -33,6 +33,7 @@ #include "hmap.h" #include "ovs-thread.h" #include "signals.h" +#include "seq.h" #include "unixctl.h" #include "util.h" #include "vlog.h" @@ -57,6 +58,9 @@ static struct clock wall_clock; /* CLOCK_REALTIME. */ /* The monotonic time at which the time module was initialized. */ static long long int boot_time; +/* Reference to the seq struct of monitor thread. */ +static struct seq *monitor_seq; + /* Monotonic time in milliseconds at which to die with SIGALRM (if not * LLONG_MAX). */ static long long int deadline = LLONG_MAX; @@ -294,6 +298,18 @@ time_boot_msec(void) return boot_time; } +/* Sets monitor_seq to 'seq'. */ +void +time_set_monitor_seq(struct seq *seq) { + monitor_seq = seq; +} + +/* Clears monitor_seq. */ +void +time_clear_monitor_seq(void) { + monitor_seq = NULL; +} + void xgettimeofday(struct timeval *tv) { @@ -509,6 +525,10 @@ timeval_warp_cb(struct unixctl_conn *conn, ovs_mutex_lock(&monotonic_clock.mutex); atomic_store(&monotonic_clock.slow_path, true); timespec_add(&monotonic_clock.warp, &monotonic_clock.warp, &ts); + /* Changes 'monitor_seq' to wakeup monitor thread. */ + if (monitor_seq) { + seq_change(monitor_seq); + } ovs_mutex_unlock(&monotonic_clock.mutex); unixctl_command_reply(conn, "warped"); diff --git a/lib/timeval.h b/lib/timeval.h index 99b3af0..1273dd8 100644 --- a/lib/timeval.h +++ b/lib/timeval.h @@ -27,6 +27,7 @@ extern "C" { struct ds; struct pollfd; +struct seq; struct timespec; struct timeval; @@ -69,6 +70,9 @@ int get_cpu_usage(void); long long int time_boot_msec(void); +void time_set_monitor_seq(struct seq *); +void time_clear_monitor_seq(void); + #ifdef __cplusplus } #endif diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c index e76dac6..3078ed8 100644 --- a/ofproto/ofproto-dpif-monitor.c +++ b/ofproto/ofproto-dpif-monitor.c @@ -21,9 +21,14 @@ #include "cfm.h" #include "hash.h" #include "hmap.h" +#include "latch.h" #include "ofpbuf.h" #include "ofproto-dpif.h" #include "ofproto-dpif-xlate.h" +#include "ovs-thread.h" +#include "poll-loop.h" +#include "seq.h" +#include "timeval.h" #include "util.h" #include "vlog.h" @@ -34,6 +39,10 @@ struct xport; /* Monitor struct. */ struct monitor { struct hmap hmap; /* hmap that contains all port monitors. */ + struct latch exit_latch; + struct seq *wait_seq; /* seq for waking monitor thread up. */ + uint64_t last_seq; + pthread_t *tid; }; /* Monitored port. It has reference to xport. */ @@ -44,18 +53,27 @@ struct mport { static struct monitor monitor; -static void ofproto_dpif_monitor_init(void); +static void monitor_init(void); +static void *monitor_handler(void *); +static void monitor_run(void); +static void monitor_wait(void); +static void monitor_start(void); +static void monitor_terminate(void); + static struct mport *mport_find(const struct xport *) OVS_REQ_WRLOCK(xlate_rwlock); /* Initializes the monitor struct. The init function can only be * called once. */ static void -ofproto_dpif_monitor_init(void) { +monitor_init(void) { static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; if (ovsthread_once_start(&once)) { hmap_init(&monitor.hmap); + monitor.wait_seq = seq_create(); + monitor.last_seq = seq_read(monitor.wait_seq); + monitor.tid = NULL; ovsthread_once_done(&once); } } @@ -77,40 +95,30 @@ mport_find(const struct xport *xport) OVS_REQ_WRLOCK(xlate_rwlock) } return mport; } - - -/* Creates a new mport and inserts it into monitor.hmap. */ -void -ofproto_dpif_monitor_mport_create(const struct xport *xport) -{ - struct mport *mport = mport_find(xport); - if (!mport) { - mport = xmalloc(sizeof *mport); - mport->xport = xport; - hmap_insert(&monitor.hmap, &mport->hmap_node, hash_pointer(xport, 0)); - } -} -/* Removes mport from monitor's hmap and frees it. */ -void -ofproto_dpif_monitor_mport_delete(const struct xport *xport) +/* The handler function for the monitor thread. */ +static void * +monitor_handler(void * args OVS_UNUSED) { - struct mport *mport = mport_find(xport); - if (mport) { - hmap_remove(&monitor.hmap, &mport->hmap_node); - free(mport); + set_subprogram_name("ofproto_dpif_monitor"); + VLOG_INFO("ofproto_dpif_monitor thread created"); + while (!latch_is_set(&monitor.exit_latch)) { + monitor_run(); + monitor_wait(); + latch_wait(&monitor.exit_latch); + poll_block(); } + VLOG_INFO("ofproto_dpif_monitor thread terminated"); + return NULL; } /* Checks the sending of control packets on all mports. Sends the control - * packets if needed. */ -void -ofproto_dpif_monitor_run_fast(void) + * packets if needed. Executes bfd_run(), cfm_run() on all mports. */ +static void +monitor_run(void) { struct mport *mport; - ofproto_dpif_monitor_init(); - ovs_rwlock_rdlock(&xlate_rwlock); HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) { struct monitor_info *info = xlate_get_monitor_info(mport->xport); @@ -131,21 +139,6 @@ ofproto_dpif_monitor_run_fast(void) ofproto_dpif_send_packet(info->ofport, &packet); ofpbuf_uninit(&packet); } - xlate_clear_monitor_info(info); - } - ovs_rwlock_unlock(&xlate_rwlock); -} - -/* Executes bfd_run(), cfm_run() on all mports. */ -void -ofproto_dpif_monitor_run(void) -{ - struct mport *mport; - - ofproto_dpif_monitor_init(); - ovs_rwlock_rdlock(&xlate_rwlock); - HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) { - struct monitor_info *info = xlate_get_monitor_info(mport->xport); if (info->cfm) { cfm_run(info->cfm); @@ -158,14 +151,13 @@ ofproto_dpif_monitor_run(void) ovs_rwlock_unlock(&xlate_rwlock); } -/* Executes the bfd_wait() and cfm_wait() functions on all mports. */ -void -ofproto_dpif_monitor_wait(void) +/* Executes the bfd_wait() and cfm_wait() functions on all mports, + * and wait on the sequence number. */ +static void +monitor_wait(void) { struct mport *mport; - ofproto_dpif_monitor_init(); - ovs_rwlock_rdlock(&xlate_rwlock); HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) { struct monitor_info *info = xlate_get_monitor_info(mport->xport); @@ -178,5 +170,85 @@ ofproto_dpif_monitor_wait(void) } xlate_clear_monitor_info(info); } + monitor.last_seq = seq_read(monitor.wait_seq); + seq_wait(monitor.wait_seq, monitor.last_seq); ovs_rwlock_unlock(&xlate_rwlock); } + +/* Starts the monitor thread. */ +static void +monitor_start(void) +{ + ovs_assert(!monitor.tid); + + /* Malloc tid. */ + monitor.tid = xmalloc(sizeof *monitor.tid); + latch_init(&monitor.exit_latch); + + xpthread_create(monitor.tid, NULL, monitor_handler, NULL); + time_set_monitor_seq(monitor.wait_seq); +} + +/* Terminates the monitor thread. */ +static void +monitor_terminate(void) +{ + ovs_assert(monitor.tid); + + time_clear_monitor_seq(); + latch_set(&monitor.exit_latch); + xpthread_join(*monitor.tid, NULL); + + /* Frees the tid. */ + latch_destroy(&monitor.exit_latch); + free(monitor.tid); + monitor.tid = NULL; +} + + +/* Creates a new mport and inserts it into monitor.hmap. */ +void +ofproto_dpif_monitor_mport_create(const struct xport *xport) +{ + struct mport *mport; + + monitor_init(); + + mport = mport_find(xport); + if (!mport) { + mport = xmalloc(sizeof *mport); + mport->xport = xport; + hmap_insert(&monitor.hmap, &mport->hmap_node, hash_pointer(xport, 0)); + } +} + +/* Removes mport from monitor's hmap and frees it. */ +void +ofproto_dpif_monitor_mport_delete(const struct xport *xport) +{ + struct mport *mport; + + monitor_init(); + + mport = mport_find(xport); + if (mport) { + hmap_remove(&monitor.hmap, &mport->hmap_node); + free(mport); + } +} + +/* Checks if the monitor thread has been started. If it hasn't, + * and the hmap is not empty, starts it. If it has + * and the hmap is empty, terminates it. + * This function can only be called by the main thread. */ +void +ofproto_dpif_monitor_run(void) +{ + monitor_init(); + + if (!monitor.tid && !hmap_is_empty(&monitor.hmap)) { + monitor_start(); + } else if (monitor.tid && hmap_is_empty(&monitor.hmap)) { + monitor_terminate(); + } +} diff --git a/ofproto/ofproto-dpif-monitor.h b/ofproto/ofproto-dpif-monitor.h index 44f2ab6..27fbab1 100644 --- a/ofproto/ofproto-dpif-monitor.h +++ b/ofproto/ofproto-dpif-monitor.h @@ -22,8 +22,6 @@ struct xport; extern struct ovs_rwlock xlate_rwlock; void ofproto_dpif_monitor_run(void); -void ofproto_dpif_monitor_run_fast(void); -void ofproto_dpif_monitor_wait(void); void ofproto_dpif_monitor_mport_create(const struct xport *) OVS_REQ_WRLOCK(xlate_rwlock); void ofproto_dpif_monitor_mport_delete(const struct xport *) OVS_REQ_WRLOCK(xlate_rwlock); diff --git a/ofproto/ofproto-dpif-xlate.h b/ofproto/ofproto-dpif-xlate.h index f60aa02..3bf0c7f 100644 --- a/ofproto/ofproto-dpif-xlate.h +++ b/ofproto/ofproto-dpif-xlate.h @@ -161,8 +161,7 @@ int xlate_receive(const struct dpif_backer *, struct ofpbuf *packet, struct ofproto_dpif **, odp_port_t *odp_in_port) OVS_EXCLUDED(xlate_rwlock); -void xlate_actions(struct xlate_in *, struct xlate_out *) - OVS_EXCLUDED(xlate_rwlock); +void xlate_actions(struct xlate_in *, struct xlate_out *); void xlate_in_init(struct xlate_in *, struct ofproto_dpif *, const struct flow *, struct rule_dpif *, uint8_t tcp_flags, const struct ofpbuf *packet); diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 038b701..75df985 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -834,6 +834,8 @@ type_run(const char *type) udpif_revalidate(backer->udpif); } + ofproto_dpif_monitor_run(); + if (!backer->recv_set_enable) { /* Wake up before a max of 1000ms. */ timer_set_duration(&backer->next_expiration, 1000); @@ -1464,7 +1466,6 @@ run_fast(struct ofproto *ofproto_) free(pin); } - ofproto_dpif_monitor_run_fast(); return 0; } @@ -1506,9 +1507,6 @@ run(struct ofproto *ofproto_) dpif_ipfix_run(ofproto->ipfix); } - ofproto_dpif_monitor_run_fast(); - ofproto_dpif_monitor_run(); - HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) { port_run(ofport); } @@ -1565,7 +1563,6 @@ wait(struct ofproto *ofproto_) if (ofproto->ipfix) { dpif_ipfix_wait(ofproto->ipfix); } - ofproto_dpif_monitor_wait(); HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) { bundle_wait(bundle); } diff --git a/tests/bfd.at b/tests/bfd.at index cc6755c..b83440a 100644 --- a/tests/bfd.at +++ b/tests/bfd.at @@ -271,6 +271,7 @@ else for i in `seq 0 1`; do ovs-appctl time/warp 500; done fi + # Test-1 BFD decay: decay to decay_min_rx # bfd:decay_min_rx is set to 3000ms after the local state of p0 goes up, # so for the first 2500ms, there should be no change. @@ -345,15 +346,16 @@ BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [N # change decay_min_rx to 1000ms. # for decay_min_rx < 2000ms, the decay detection time is set to 2000ms. # this should firstly reset the min_rx and start poll sequence. -AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=1000]) +AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=1000])o +ovs-appctl time/warp 500 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) BFD_CHECK_RX([p0], [500ms], [300ms], [500ms]) -# for the following 1500ms, there should be no decay, +# for the following 1000ms, there should be no decay, # since the decay_detect_time is set to 2000ms. -for i in `seq 0 2` +for i in `seq 0 1` do ovs-appctl time/warp 500 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) @@ -376,21 +378,19 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done # Test-4 BFD decay: set min_rx to 800ms. # this should firstly reset the min_rx and then re-decay to 1000ms. AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=800]) +ovs-appctl time/warp 800 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) BFD_CHECK_RX([p0], [800ms], [800ms], [500ms]) -# for the following 1600ms, there should be no decay, +# for the following 800ms, there should be no decay, # since the decay detection time is set to 2000ms. -for i in `seq 0 1` -do - ovs-appctl time/warp 800 - BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) - BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) - BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) - BFD_CHECK_RX([p0], [800ms], [800ms], [500ms]) -done +ovs-appctl time/warp 800 +BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) +BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) +BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) +BFD_CHECK_RX([p0], [800ms], [800ms], [500ms]) ovs-appctl time/warp 400 # at 2000ms, decay should happen and there should be the poll sequence flags. @@ -405,6 +405,7 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done # Test-5 BFD decay: set min_rx to 300ms and decay_min_rx to 5000ms together. AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=300 bfd:decay_min_rx=5000]) +ovs-appctl time/warp 500 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) @@ -412,9 +413,9 @@ BFD_CHECK_RX([p0], [500ms], [300ms], [500ms]) # for decay_min_rx > 2000ms, the decay detection time is set to # decay_min_rx (5000ms). -# for the following 4500ms, there should be no decay, +# for the following 4000ms, there should be no decay, # since the decay detection time is set to 5000ms. -for i in `seq 0 8` +for i in `seq 1 8` do ovs-appctl time/warp 500 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic]) @@ -436,6 +437,7 @@ for i in `seq 0 9`; do ovs-appctl time/warp 500; done # Test-6 BFD decay: set decay_min_rx to 0 to disable bfd decay. AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=0]) +ovs-appctl time/warp 500 # min_rx is reset, and there should be the poll sequence flags. BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) @@ -472,6 +474,7 @@ do done # reset the p1's min_tx to 500ms. AT_CHECK([ovs-vsctl set Interface p1 bfd:min_tx=500]) +ovs-appctl time/warp 500 # check the poll sequence. since p0 has been in decay, now the RX will show 3000ms. BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) @@ -686,6 +689,8 @@ done # reconfigure the decay_min_rx to 1000ms. check the poll sequence. AT_CHECK([ovs-vsctl set interface p0 bfd:decay_min_rx=1000]) +# advance the clock by tiny little bit to wakeup the monitor thread. +ovs-appctl time/warp 1 BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic]) BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic]) BFD_CHECK_TX([p0], [500ms], [300ms], [500ms]) diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at index f67c3ab..c2c0b24 100644 --- a/tests/ofproto-dpif.at +++ b/tests/ofproto-dpif.at @@ -2816,3 +2816,61 @@ AT_CHECK([ovs-appctl bond/show | sed -n '/^.*may_enable:.*/p'], [0], [dnl OVS_VSWITCHD_STOP AT_CLEANUP + +AT_SETUP([ofproto-dpif - ofproto-dpif-monitor]) +OVS_VSWITCHD_START([add-port br0 p0 -- set interface p0 type=gre options:remote_ip=1.2.3.4]) + +# enable bfd on p0. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread created +]) +# disable bfd on p0. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread terminated +]) +AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log]) + +# enable cfm on p0. +AT_CHECK([ovs-vsctl set interface p0 cfm_mpid=10]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread created +]) +# disable cfm on p0. +AT_CHECK([ovs-vsctl remove interface p0 cfm_mpid 10]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread terminated +]) +AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log]) + +# enable both bfd and cfm on p0. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true cfm_mpid=10]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread created +]) +# disable bfd on p0. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false]) +# check log, there should not be the log of thread terminated. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl +]) +# reenable bfd on p0. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true]) +# check log, should still be on log of thread created. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread created +]) +# disable bfd and cfm together. +AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false -- remove interface p0 cfm_mpid 10]) +# check log. +AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl +ofproto_dpif_monitor thread terminated +]) + +OVS_VSWITCHD_STOP +AT_CLEANUP \ No newline at end of file -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev