The current rx queue management model is buggy and will not work
properly without additional barriers and other synchronization between
PMD threads and the main thread.

Known BUGS of current model:
        * While reloading, two PMD threads, one already reloaded and
          one not yet reloaded, can poll the same queue of the same
          port.  This may lead to dpdk driver failure, because the
          drivers are not thread-safe.
        * The same bug as fixed in commit e4e74c3a2b
          ("dpif-netdev: Purge all ukeys when reconfigure pmd."), but
          reproduced when pmd threads are only reconfigured without
          being restarted, because adding a port may change the
          sequence of the other ports, which matters during
          reconfiguration.

Introduce a new model, where the distribution of queues is made by the
main thread with minimal synchronization and without data races between
pmd threads.  Also, this model should work faster, because only the
threads that need it will be interrupted for reconfiguration and the
total computational complexity of reconfiguration is lower.

Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
---
 lib/dpif-netdev.c | 245 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 147 insertions(+), 98 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index cd72e62..f055c6e 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
+struct rxq_poll {
+    struct dp_netdev_port *port;
+    struct netdev_rxq *rx;
+    struct ovs_list node;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -393,9 +400,6 @@ struct dp_netdev_pmd_thread {
     struct ovs_refcount ref_cnt;    /* Every reference must be refcount'ed. */
     struct cmap_node node;          /* In 'dp->poll_threads'. */
 
-    pthread_cond_t cond;            /* For synchronizing pmd thread reload. */
-    struct ovs_mutex cond_mutex;    /* Mutex for condition variable. */
-
     /* Per thread exact-match cache.  Note, the instance for cpu core
      * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly
      * need to be protected (e.g. by 'dp_netdev_mutex').  All other
@@ -430,6 +434,11 @@ struct dp_netdev_pmd_thread {
     int tx_qid;                     /* Queue id used by this pmd thread to
                                      * send packets on all netdevs */
 
+    struct ovs_mutex poll_mutex;    /* Mutex for poll_list. */
+    /* List of rx queues to poll. */
+    struct ovs_list poll_list OVS_GUARDED;
+    int poll_cnt;                   /* Number of elements in poll_list. */
+
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
      * values and subtracts them from 'stats' and 'cycles' before
@@ -469,7 +478,6 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -482,6 +490,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx);
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -1019,22 +1032,7 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
         return;
     }
 
-    ovs_mutex_lock(&pmd->cond_mutex);
     atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
-    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
-    ovs_mutex_unlock(&pmd->cond_mutex);
-}
-
-/* Causes all pmd threads to reload its tx/rx devices.
- * Must be called after adding/removing ports. */
-static void
-dp_netdev_reload_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_reload_pmd__(pmd);
-    }
 }
 
 static uint32_t
@@ -1128,8 +1126,26 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
     if (netdev_is_pmd(netdev)) {
-        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
-        dp_netdev_reload_pmds(dp);
+        int numa_id = netdev_get_numa_id(netdev);
+        struct dp_netdev_pmd_thread *pmd;
+
+        /* Cannot create pmd threads for invalid numa node. */
+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+
+        for (i = 0; i < netdev_n_rxq(netdev); i++) {
+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+            if (!pmd) {
+                /* There are no pmd threads on this numa node. */
+                dp_netdev_set_pmds_on_numa(dp, numa_id);
+                /* Assigning of rx queues done. */
+                break;
+            }
+
+            ovs_mutex_lock(&pmd->poll_mutex);
+            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+            ovs_mutex_unlock(&pmd->poll_mutex);
+            dp_netdev_reload_pmd__(pmd);
+        }
     }
     seq_change(dp->port_seq);
 
@@ -1226,16 +1242,6 @@ port_ref(struct dp_netdev_port *port)
     }
 }
 
-static bool
-port_try_ref(struct dp_netdev_port *port)
-{
-    if (port) {
-        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
-    }
-
-    return false;
-}
-
 static void
 port_unref(struct dp_netdev_port *port)
 {
@@ -1313,12 +1319,38 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     if (netdev_is_pmd(port->netdev)) {
         int numa_id = netdev_get_numa_id(port->netdev);
 
+        /* PMD threads can not be on invalid numa node. */
+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
         /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa.  Else, just reloads the queues.  */
+         * for that numa.  Else, deletes the queues from polling lists. */
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
         }
-        dp_netdev_reload_pmds(dp);
+        else {
+            struct dp_netdev_pmd_thread *pmd;
+            struct rxq_poll *poll, *next;
+
+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+                if (pmd->numa_id == numa_id) {
+                    bool found = false;
+
+                    ovs_mutex_lock(&pmd->poll_mutex);
+                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+                        if (poll->port == port) {
+                            found = true;
+                            port_unref(poll->port);
+                            list_remove(&poll->node);
+                            pmd->poll_cnt--;
+                            free(poll);
+                        }
+                    }
+                    ovs_mutex_unlock(&pmd->poll_mutex);
+                    if (found) {
+                        dp_netdev_reload_pmd__(pmd);
+                    }
+                }
+            }
+        }
     }
 
     port_unref(port);
@@ -2583,56 +2615,29 @@ dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
-struct rxq_poll {
-    struct dp_netdev_port *port;
-    struct netdev_rxq *rx;
-};
-
 static int
 pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 struct rxq_poll **ppoll_list, int poll_cnt)
+    OVS_REQUIRES(pmd->poll_mutex)
 {
     struct rxq_poll *poll_list = *ppoll_list;
-    struct dp_netdev_port *port;
-    int n_pmds_on_numa, index, i;
+    struct rxq_poll *poll;
+    int i;
 
-    /* Simple scheduler for netdev rx polling. */
     for (i = 0; i < poll_cnt; i++) {
         port_unref(poll_list[i].port);
     }
 
-    poll_cnt = 0;
-    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
-    index = 0;
-
-    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
-        /* Calls port_try_ref() to prevent the main thread
-         * from deleting the port. */
-        if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)
-                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
-                int i;
+    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
 
-                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % n_pmds_on_numa) == pmd->index) {
-                        poll_list = xrealloc(poll_list,
-                                        sizeof *poll_list * (poll_cnt + 1));
-
-                        port_ref(port);
-                        poll_list[poll_cnt].port = port;
-                        poll_list[poll_cnt].rx = port->rxq[i];
-                        poll_cnt++;
-                    }
-                    index++;
-                }
-            }
-            /* Unrefs the port_try_ref(). */
-            port_unref(port);
-        }
+    i = 0;
+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+        port_ref(poll->port);
+        poll_list[i++] = *poll;
     }
 
     *ppoll_list = poll_list;
-    return poll_cnt;
+    return pmd->poll_cnt;
 }
 
 static void *
@@ -2653,20 +2658,18 @@ pmd_thread_main(void *f_)
     pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
     emc_cache_init(&pmd->flow_cache);
+
+    ovs_mutex_lock(&pmd->poll_mutex);
     poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
+    ovs_mutex_unlock(&pmd->poll_mutex);
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
-       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
+       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
+                 netdev_get_name(poll_list[i].port->netdev));
     }
 
-    /* Signal here to make sure the pmd finishes
-     * reloading the updated configuration. */
-    dp_netdev_pmd_reload_done(pmd);
-
     for (;;) {
-        int i;
-
         for (i = 0; i < poll_cnt; i++) {
             dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
@@ -2695,11 +2698,9 @@ reload:
     }
 
     for (i = 0; i < poll_cnt; i++) {
-         port_unref(poll_list[i].port);
+        port_unref(poll_list[i].port);
     }
 
-    dp_netdev_pmd_reload_done(pmd);
-
     free(poll_list);
     return NULL;
 }
@@ -2734,14 +2735,6 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
     dp_netdev_enable_upcall(dp);
 }
 
-void
-dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
-{
-    ovs_mutex_lock(&pmd->cond_mutex);
-    xpthread_cond_signal(&pmd->cond);
-    ovs_mutex_unlock(&pmd->cond_mutex);
-}
-
 /* Finds and refs the dp_netdev_pmd_thread on core 'core_id'.  Returns
  * the pointer if succeeds, otherwise, NULL.
  *
@@ -2827,15 +2820,16 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->tx_qid = core_id_to_qid(core_id);
     pmd->numa_id = numa_id;
+    pmd->poll_cnt = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
-    xpthread_cond_init(&pmd->cond, NULL);
-    ovs_mutex_init(&pmd->cond_mutex);
     ovs_mutex_init(&pmd->flow_mutex);
+    ovs_mutex_init(&pmd->poll_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
+    list_init(&pmd->poll_list);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -2853,8 +2847,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     cmap_destroy(&pmd->flow_table);
     ovs_mutex_destroy(&pmd->flow_mutex);
     latch_destroy(&pmd->exit_latch);
-    xpthread_cond_destroy(&pmd->cond);
-    ovs_mutex_destroy(&pmd->cond_mutex);
+    ovs_mutex_destroy(&pmd->poll_mutex);
     free(pmd);
 }
 
@@ -2863,6 +2856,8 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 static void
 dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
+    struct rxq_poll *poll;
+
     /* Uninit the 'flow_cache' since there is
      * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
@@ -2873,6 +2868,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
         ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
+
+    /* Unref all ports and free poll_list. */
+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+        port_unref(poll->port);
+        free(poll);
+    }
+
     /* Purges the 'pmd''s flows after stopping the thread, but before
      * destroying the flows, so that the flow stats can be collected. */
     if (dp->dp_purge_cb) {
@@ -2906,6 +2908,42 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     }
 }
 
+/* Returns the PMD thread on numa node 'numa_id' that has the fewest rx
+ * queues to poll, or NULL if there are no PMD threads on that numa node.
+ * Can be called safely only by main thread. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int min_cnt = -1;
+    struct dp_netdev_pmd_thread *pmd, *res = NULL;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id
+            && (min_cnt > pmd->poll_cnt || res == NULL)) {
+            min_cnt = pmd->poll_cnt;
+            res = pmd;
+        }
+    }
+
+    return res;
+}
+
+/* Adds rx queue 'rx' of 'port' to the poll_list of 'pmd' and refs 'port'. */
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx)
+    OVS_REQUIRES(pmd->poll_mutex)
+{
+    struct rxq_poll *poll = xmalloc(sizeof *poll);
+
+    port_ref(port);
+    poll->port = port;
+    poll->rx = rx;
+
+    list_push_back(&pmd->poll_list, &poll->node);
+    pmd->poll_cnt++;
+}
+
 /* Checks the numa node id of 'netdev' and starts pmd threads for
  * the numa node. */
 static void
@@ -2925,8 +2963,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
      * in which 'netdev' is on, do nothing.  Else, creates the
      * pmd threads for the numa node. */
     if (!n_pmds) {
-        int can_have, n_unpinned, i;
+        int can_have, n_unpinned, i, index = 0;
         struct dp_netdev_pmd_thread **pmds;
+        struct dp_netdev_port *port;
 
         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
         if (!n_unpinned) {
@@ -2944,13 +2983,23 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
             pmds[i] = xzalloc(sizeof **pmds);
             dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
         }
-        /* The pmd thread code needs to see all the others configured pmd
-         * threads on the same numa node.  That's why we call
-         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
-         * start them. */
+
+        /* Distributes rx queues of this numa node between new pmd threads. */
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == numa_id) {
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    /* Make thread-safety analyser happy. */
+                    ovs_mutex_lock(&pmds[index]->poll_mutex);
+                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
+                    ovs_mutex_unlock(&pmds[index]->poll_mutex);
+                    index = (index + 1) % can_have;
+                }
+            }
+        }
+
+        /* Actual start of pmd threads. */
         for (i = 0; i < can_have; i++) {
-            /* Each thread will distribute all devices rx-queues among
-             * themselves. */
             pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
         }
         free(pmds);
-- 
2.5.0

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to