The PMD thread needs to keep processing RX queues in order to
achieve maximum throughput.  However, it also needs to run
other tasks after some time processing the RX queues, and while
running those tasks a mutex can block the PMD thread.  That
causes latency spikes and affects the throughput.

Convert seq_mutex to a recursive mutex so that the PMD thread
can try the lock first: if it gets the lock, it continues as
before; otherwise it tries again later.  There is additional
logic to make sure the PMD thread tries harder as the attempts
to get the mutex continue to fail.

Co-authored-by: Karl Rister <kris...@redhat.com>
Signed-off-by: Flavio Leitner <f...@redhat.com>
---
 include/openvswitch/thread.h |  3 +++
 lib/dpif-netdev.c            | 33 ++++++++++++++++++++++-----------
 lib/seq.c                    | 15 ++++++++++++++-
 lib/seq.h                    |  3 +++
 4 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/include/openvswitch/thread.h b/include/openvswitch/thread.h
index af6f2bb..6d20720 100644
--- a/include/openvswitch/thread.h
+++ b/include/openvswitch/thread.h
@@ -44,6 +44,9 @@ struct OVS_LOCKABLE ovs_mutex {
 #define OVS_ADAPTIVE_MUTEX_INITIALIZER OVS_MUTEX_INITIALIZER
 #endif
 
+#define OVS_RECURSIVE_MUTEX_INITIALIZER \
+   { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, "<unlocked>" }
+
 /* ovs_mutex functions analogous to pthread_mutex_*() functions.
  *
  * Most of these functions abort the process with an error message on any
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 0f2385a..a10b2d1 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -2668,12 +2668,15 @@ static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    unsigned int lc = 0;
+    unsigned int lc_max = 1024;
+    unsigned int lc_start;
+    unsigned int lc;
     struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
     int poll_cnt;
     int i;
 
+    lc_start = 0;
     poll_cnt = 0;
     poll_list = NULL;
 
@@ -2698,24 +2701,32 @@ reload:
      * reloading the updated configuration. */
     dp_netdev_pmd_reload_done(pmd);
 
+    lc = lc_start;
     for (;;) {
         for (i = 0; i < poll_cnt; i++) {
             dp_netdev_process_rxq_port(pmd, poll_list[i].port, 
poll_list[i].rx);
         }
 
-        if (lc++ > 1024) {
-            unsigned int seq;
+        if (lc++ > lc_max) {
+            if (!seq_pmd_trylock()) {
+                unsigned int seq;
+                lc_start = 0;
+                lc = 0;
 
-            lc = 0;
+                emc_cache_slow_sweep(&pmd->flow_cache);
+                coverage_try_clear();
+                ovsrcu_quiesce();
 
-            emc_cache_slow_sweep(&pmd->flow_cache);
-            coverage_try_clear();
-            ovsrcu_quiesce();
+                seq_pmd_unlock();
 
-            atomic_read_relaxed(&pmd->change_seq, &seq);
-            if (seq != port_seq) {
-                port_seq = seq;
-                break;
+                atomic_read_relaxed(&pmd->change_seq, &seq);
+                if (seq != port_seq) {
+                    port_seq = seq;
+                    break;
+                }
+            } else {
+                lc_start += (lc_start + lc_max)/2;
+                lc = lc_start;
             }
         }
     }
diff --git a/lib/seq.c b/lib/seq.c
index 9c3257c..198b2ce 100644
--- a/lib/seq.c
+++ b/lib/seq.c
@@ -55,7 +55,7 @@ struct seq_thread {
     bool waiting OVS_GUARDED;        /* True if latch_wait() already called. */
 };
 
-static struct ovs_mutex seq_mutex = OVS_MUTEX_INITIALIZER;
+static struct ovs_mutex seq_mutex = OVS_RECURSIVE_MUTEX_INITIALIZER;
 
 static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
 
@@ -68,6 +68,19 @@ static void seq_thread_woke(struct seq_thread *) 
OVS_REQUIRES(seq_mutex);
 static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
 static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
 
+
+int seq_pmd_trylock(void)
+     OVS_TRY_LOCK(0, seq_mutex)
+{
+  return ovs_mutex_trylock(&seq_mutex);
+}
+
+void seq_pmd_unlock(void)
+    OVS_RELEASES(seq_mutex)
+{
+  ovs_mutex_unlock(&seq_mutex);
+}
+
 /* Creates and returns a new 'seq' object. */
 struct seq * OVS_EXCLUDED(seq_mutex)
 seq_create(void)
diff --git a/lib/seq.h b/lib/seq.h
index b0ec6bf..f6b6e1a 100644
--- a/lib/seq.h
+++ b/lib/seq.h
@@ -117,6 +117,9 @@
 #include <stdint.h>
 #include "util.h"
 
+int seq_pmd_trylock(void);
+void seq_pmd_unlock(void);
+
 /* For implementation of an object with a sequence number attached. */
 struct seq *seq_create(void);
 void seq_destroy(struct seq *);
-- 
2.5.0

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to