So basically, you replace ovs_mutex_rwlock (or sth) into ovs_mutex_trylock
in the loop of "other tasks after some time processing the RX queues".
Is it ?

2016-03-24 11:54 GMT+08:00 Flavio Leitner <f...@redhat.com>:

> The PMD thread needs to keep processing RX queues in order
> archive maximum throughput.  However, it also needs to run
> other tasks after some time processing the RX queues which
> a mutex can block the PMD thread.  That causes latency
> spikes and affects the throughput.
>
> Convert to recursive mutex so that PMD thread can test first
> and if it gets the lock, continue as before, otherwise try
> again another time.  There is an additional logic to make
> sure the PMD thread will try harder as the attempt to get
> the mutex continues to fail.
>
> Co-authored-by: Karl Rister <kris...@redhat.com>
> Signed-off-by: Flavio Leitner <f...@redhat.com>
> ---
>  include/openvswitch/thread.h |  3 +++
>  lib/dpif-netdev.c            | 33 ++++++++++++++++++++++-----------
>  lib/seq.c                    | 15 ++++++++++++++-
>  lib/seq.h                    |  3 +++
>  4 files changed, 42 insertions(+), 12 deletions(-)
>
> diff --git a/include/openvswitch/thread.h b/include/openvswitch/thread.h
> index af6f2bb..6d20720 100644
> --- a/include/openvswitch/thread.h
> +++ b/include/openvswitch/thread.h
> @@ -44,6 +44,9 @@ struct OVS_LOCKABLE ovs_mutex {
>  #define OVS_ADAPTIVE_MUTEX_INITIALIZER OVS_MUTEX_INITIALIZER
>  #endif
>
> +#define OVS_RECURSIVE_MUTEX_INITIALIZER \
> +   { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, "<unlocked>" }
> +
>  /* ovs_mutex functions analogous to pthread_mutex_*() functions.
>   *
>   * Most of these functions abort the process with an error message on any
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 0f2385a..a10b2d1 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -2668,12 +2668,15 @@ static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> -    unsigned int lc = 0;
> +    unsigned int lc_max = 1024;
> +    unsigned int lc_start;
> +    unsigned int lc;
>      struct rxq_poll *poll_list;
>      unsigned int port_seq = PMD_INITIAL_SEQ;
>      int poll_cnt;
>      int i;
>
> +    lc_start = 0;
>      poll_cnt = 0;
>      poll_list = NULL;
>
> @@ -2698,24 +2701,32 @@ reload:
>       * reloading the updated configuration. */
>      dp_netdev_pmd_reload_done(pmd);
>
> +    lc = lc_start;
>      for (;;) {
>          for (i = 0; i < poll_cnt; i++) {
>              dp_netdev_process_rxq_port(pmd, poll_list[i].port,
> poll_list[i].rx);
>          }
>
> -        if (lc++ > 1024) {
> -            unsigned int seq;
> +        if (lc++ > lc_max) {
> +            if (!seq_pmd_trylock()) {
> +                unsigned int seq;
> +                lc_start = 0;
> +                lc = 0;
>
> -            lc = 0;
> +                emc_cache_slow_sweep(&pmd->flow_cache);
> +                coverage_try_clear();
> +                ovsrcu_quiesce();
>
> -            emc_cache_slow_sweep(&pmd->flow_cache);
> -            coverage_try_clear();
> -            ovsrcu_quiesce();
> +                seq_pmd_unlock();
>
> -            atomic_read_relaxed(&pmd->change_seq, &seq);
> -            if (seq != port_seq) {
> -                port_seq = seq;
> -                break;
> +                atomic_read_relaxed(&pmd->change_seq, &seq);
> +                if (seq != port_seq) {
> +                    port_seq = seq;
> +                    break;
> +                }
> +            } else {
> +                lc_start += (lc_start + lc_max)/2;
> +                lc = lc_start;
>              }
>          }
>      }
> diff --git a/lib/seq.c b/lib/seq.c
> index 9c3257c..198b2ce 100644
> --- a/lib/seq.c
> +++ b/lib/seq.c
> @@ -55,7 +55,7 @@ struct seq_thread {
>      bool waiting OVS_GUARDED;        /* True if latch_wait() already
> called. */
>  };
>
> -static struct ovs_mutex seq_mutex = OVS_MUTEX_INITIALIZER;
> +static struct ovs_mutex seq_mutex = OVS_RECURSIVE_MUTEX_INITIALIZER;
>
>  static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
>
> @@ -68,6 +68,19 @@ static void seq_thread_woke(struct seq_thread *)
> OVS_REQUIRES(seq_mutex);
>  static void seq_waiter_destroy(struct seq_waiter *)
> OVS_REQUIRES(seq_mutex);
>  static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
>
> +
> +int seq_pmd_trylock(void)
> +     OVS_TRY_LOCK(0, seq_mutex)
> +{
> +  return ovs_mutex_trylock(&seq_mutex);
> +}
> +
> +void seq_pmd_unlock(void)
> +    OVS_RELEASES(seq_mutex)
> +{
> +  ovs_mutex_unlock(&seq_mutex);
> +}
> +
>  /* Creates and returns a new 'seq' object. */
>  struct seq * OVS_EXCLUDED(seq_mutex)
>  seq_create(void)
> diff --git a/lib/seq.h b/lib/seq.h
> index b0ec6bf..f6b6e1a 100644
> --- a/lib/seq.h
> +++ b/lib/seq.h
> @@ -117,6 +117,9 @@
>  #include <stdint.h>
>  #include "util.h"
>
> +int seq_pmd_trylock(void);
> +void seq_pmd_unlock(void);
> +
>  /* For implementation of an object with a sequence number attached. */
>  struct seq *seq_create(void);
>  void seq_destroy(struct seq *);
> --
> 2.5.0
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to