Thanks for all your work!

I applied this to master and branch-2.5

On 25/01/2016 22:12, "Ilya Maximets" <i.maxim...@samsung.com> wrote:

>The current rx queue management model is buggy and will not work properly
>without additional barriers and other synchronization between PMD
>threads and the main thread.
>
>Known BUGS of current model:
>       * While reloading, two PMD threads, one already reloaded and
>         one not yet reloaded, can poll same queue of the same port.
>         This behavior may lead to dpdk driver failure, because dpdk
>         drivers are not thread-safe.
>       * Same bug as fixed in commit e4e74c3a2b
>         ("dpif-netdev: Purge all ukeys when reconfigure pmd.") but
>         reproduced when pmd threads are only reconfigured without
>         being restarted, because adding a port may change the sequence
>         of other ports, which matters at the time of reconfiguration.
>
>Introduce a new model, where the distribution of queues is made by the
>main thread with minimal synchronization and without data races between
>pmd threads. Also, this model should work faster, because only the
>needed threads will be interrupted for reconfiguration and the total
>computational complexity of reconfiguration is lower.
>
>Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
>---
> lib/dpif-netdev.c | 223
>++++++++++++++++++++++++++++++++++++------------------
> 1 file changed, 148 insertions(+), 75 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>index cd72e62..8c87c05 100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
>     atomic_ullong n[PMD_N_CYCLES];
> };
> 
>+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
>+struct rxq_poll {
>+    struct dp_netdev_port *port;
>+    struct netdev_rxq *rx;
>+    struct ovs_list node;
>+};
>+
> /* PMD: Poll modes drivers.  PMD accesses devices via polling to
>eliminate
>  * the performance overhead of interrupt processing.  Therefore netdev
>can
>  * not implement rx-wait for these devices.  dpif-netdev needs to poll
>@@ -430,6 +437,11 @@ struct dp_netdev_pmd_thread {
>     int tx_qid;                     /* Queue id used by this pmd thread
>to
>                                      * send packets on all netdevs */
> 
>+    struct ovs_mutex poll_mutex;    /* Mutex for poll_list. */
>+    /* List of rx queues to poll. */
>+    struct ovs_list poll_list OVS_GUARDED;
>+    int poll_cnt;                   /* Number of elemints in poll_list.
>*/
>+
>     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>      * values and subtracts them from 'stats' and 'cycles' before
>@@ -469,7 +481,7 @@ static void dp_netdev_input(struct
>dp_netdev_pmd_thread *,
>                             struct dp_packet **, int cnt);
> 
> static void dp_netdev_disable_upcall(struct dp_netdev *);
>-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>+static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                     struct dp_netdev *dp, int index,
>                                     unsigned core_id, int numa_id);
>@@ -482,6 +494,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct
>cmap_position *pos);
> static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
> static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int
>numa_id);
> static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int
>numa_id);
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq
>*rx);
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
> static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
>@@ -1025,18 +1042,6 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread
>*pmd)
>     ovs_mutex_unlock(&pmd->cond_mutex);
> }
> 
>-/* Causes all pmd threads to reload its tx/rx devices.
>- * Must be called after adding/removing ports. */
>-static void
>-dp_netdev_reload_pmds(struct dp_netdev *dp)
>-{
>-    struct dp_netdev_pmd_thread *pmd;
>-
>-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>-        dp_netdev_reload_pmd__(pmd);
>-    }
>-}
>-
> static uint32_t
> hash_port_no(odp_port_t port_no)
> {
>@@ -1128,8 +1133,26 @@ do_add_port(struct dp_netdev *dp, const char
>*devname, const char *type,
>     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
> 
>     if (netdev_is_pmd(netdev)) {
>-        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>-        dp_netdev_reload_pmds(dp);
>+        int numa_id = netdev_get_numa_id(netdev);
>+        struct dp_netdev_pmd_thread *pmd;
>+
>+        /* Cannot create pmd threads for invalid numa node. */
>+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
>+
>+        for (i = 0; i < netdev_n_rxq(netdev); i++) {
>+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>+            if (!pmd) {
>+                /* There is no pmd threads on this numa node. */
>+                dp_netdev_set_pmds_on_numa(dp, numa_id);
>+                /* Assigning of rx queues done. */
>+                break;
>+            }
>+
>+            ovs_mutex_lock(&pmd->poll_mutex);
>+            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
>+            ovs_mutex_unlock(&pmd->poll_mutex);
>+            dp_netdev_reload_pmd__(pmd);
>+        }
>     }
>     seq_change(dp->port_seq);
> 
>@@ -1226,16 +1249,6 @@ port_ref(struct dp_netdev_port *port)
>     }
> }
> 
>-static bool
>-port_try_ref(struct dp_netdev_port *port)
>-{
>-    if (port) {
>-        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
>-    }
>-
>-    return false;
>-}
>-
> static void
> port_unref(struct dp_netdev_port *port)
> {
>@@ -1313,12 +1326,37 @@ do_del_port(struct dp_netdev *dp, struct
>dp_netdev_port *port)
>     if (netdev_is_pmd(port->netdev)) {
>         int numa_id = netdev_get_numa_id(port->netdev);
> 
>+        /* PMD threads can not be on invalid numa node. */
>+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
>         /* If there is no netdev on the numa node, deletes the pmd
>threads
>-         * for that numa.  Else, just reloads the queues.  */
>+         * for that numa.  Else, deletes the queues from polling lists.
>*/
>         if (!has_pmd_port_for_numa(dp, numa_id)) {
>             dp_netdev_del_pmds_on_numa(dp, numa_id);
>+        } else {
>+            struct dp_netdev_pmd_thread *pmd;
>+            struct rxq_poll *poll, *next;
>+
>+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+                if (pmd->numa_id == numa_id) {
>+                    bool found = false;
>+
>+                    ovs_mutex_lock(&pmd->poll_mutex);
>+                    LIST_FOR_EACH_SAFE (poll, next, node,
>&pmd->poll_list) {
>+                        if (poll->port == port) {
>+                            found = true;
>+                            port_unref(poll->port);
>+                            list_remove(&poll->node);
>+                            pmd->poll_cnt--;
>+                            free(poll);
>+                        }
>+                    }
>+                    ovs_mutex_unlock(&pmd->poll_mutex);
>+                    if (found) {
>+                        dp_netdev_reload_pmd__(pmd);
>+                    }
>+                }
>+            }
>         }
>-        dp_netdev_reload_pmds(dp);
>     }
> 
>     port_unref(port);
>@@ -2583,56 +2621,29 @@ dpif_netdev_wait(struct dpif *dpif)
>     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
> }
> 
>-struct rxq_poll {
>-    struct dp_netdev_port *port;
>-    struct netdev_rxq *rx;
>-};
>-
> static int
> pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>                 struct rxq_poll **ppoll_list, int poll_cnt)
>+    OVS_REQUIRES(pmd->poll_mutex)
> {
>     struct rxq_poll *poll_list = *ppoll_list;
>-    struct dp_netdev_port *port;
>-    int n_pmds_on_numa, index, i;
>+    struct rxq_poll *poll;
>+    int i;
> 
>-    /* Simple scheduler for netdev rx polling. */
>     for (i = 0; i < poll_cnt; i++) {
>         port_unref(poll_list[i].port);
>     }
> 
>-    poll_cnt = 0;
>-    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>-    index = 0;
>-
>-    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>-        /* Calls port_try_ref() to prevent the main thread
>-         * from deleting the port. */
>-        if (port_try_ref(port)) {
>-            if (netdev_is_pmd(port->netdev)
>-                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
>-                int i;
>+    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
> 
>-                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>-                    if ((index % n_pmds_on_numa) == pmd->index) {
>-                        poll_list = xrealloc(poll_list,
>-                                        sizeof *poll_list * (poll_cnt +
>1));
>-
>-                        port_ref(port);
>-                        poll_list[poll_cnt].port = port;
>-                        poll_list[poll_cnt].rx = port->rxq[i];
>-                        poll_cnt++;
>-                    }
>-                    index++;
>-                }
>-            }
>-            /* Unrefs the port_try_ref(). */
>-            port_unref(port);
>-        }
>+    i = 0;
>+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
>+        port_ref(poll->port);
>+        poll_list[i++] = *poll;
>     }
> 
>     *ppoll_list = poll_list;
>-    return poll_cnt;
>+    return pmd->poll_cnt;
> }
> 
> static void *
>@@ -2653,11 +2664,15 @@ pmd_thread_main(void *f_)
>     pmd_thread_setaffinity_cpu(pmd->core_id);
> reload:
>     emc_cache_init(&pmd->flow_cache);
>+
>+    ovs_mutex_lock(&pmd->poll_mutex);
>     poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
>+    ovs_mutex_unlock(&pmd->poll_mutex);
> 
>     /* List port/core affinity */
>     for (i = 0; i < poll_cnt; i++) {
>-       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
>netdev_get_name(poll_list[i].port->netdev));
>+       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
>+                 netdev_get_name(poll_list[i].port->netdev));
>     }
> 
>     /* Signal here to make sure the pmd finishes
>@@ -2665,8 +2680,6 @@ reload:
>     dp_netdev_pmd_reload_done(pmd);
> 
>     for (;;) {
>-        int i;
>-
>         for (i = 0; i < poll_cnt; i++) {
>             dp_netdev_process_rxq_port(pmd, poll_list[i].port,
>poll_list[i].rx);
>         }
>@@ -2695,7 +2708,7 @@ reload:
>     }
> 
>     for (i = 0; i < poll_cnt; i++) {
>-         port_unref(poll_list[i].port);
>+        port_unref(poll_list[i].port);
>     }
> 
>     dp_netdev_pmd_reload_done(pmd);
>@@ -2734,7 +2747,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
>     dp_netdev_enable_upcall(dp);
> }
> 
>-void
>+static void
> dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
> {
>     ovs_mutex_lock(&pmd->cond_mutex);
>@@ -2827,6 +2840,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread
>*pmd, struct dp_netdev *dp,
>     pmd->core_id = core_id;
>     pmd->tx_qid = core_id_to_qid(core_id);
>     pmd->numa_id = numa_id;
>+    pmd->poll_cnt = 0;
> 
>     ovs_refcount_init(&pmd->ref_cnt);
>     latch_init(&pmd->exit_latch);
>@@ -2834,8 +2848,10 @@ dp_netdev_configure_pmd(struct
>dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>     xpthread_cond_init(&pmd->cond, NULL);
>     ovs_mutex_init(&pmd->cond_mutex);
>     ovs_mutex_init(&pmd->flow_mutex);
>+    ovs_mutex_init(&pmd->poll_mutex);
>     dpcls_init(&pmd->cls);
>     cmap_init(&pmd->flow_table);
>+    list_init(&pmd->poll_list);
>     /* init the 'flow_cache' since there is no
>      * actual thread created for NON_PMD_CORE_ID. */
>     if (core_id == NON_PMD_CORE_ID) {
>@@ -2855,6 +2871,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>*pmd)
>     latch_destroy(&pmd->exit_latch);
>     xpthread_cond_destroy(&pmd->cond);
>     ovs_mutex_destroy(&pmd->cond_mutex);
>+    ovs_mutex_destroy(&pmd->poll_mutex);
>     free(pmd);
> }
> 
>@@ -2863,6 +2880,8 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread
>*pmd)
> static void
> dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
> {
>+    struct rxq_poll *poll;
>+
>     /* Uninit the 'flow_cache' since there is
>      * no actual thread uninit it for NON_PMD_CORE_ID. */
>     if (pmd->core_id == NON_PMD_CORE_ID) {
>@@ -2873,6 +2892,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct
>dp_netdev_pmd_thread *pmd)
>         ovs_numa_unpin_core(pmd->core_id);
>         xpthread_join(pmd->thread, NULL);
>     }
>+
>+    /* Unref all ports and free poll_list. */
>+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
>+        port_unref(poll->port);
>+        free(poll);
>+    }
>+
>     /* Purges the 'pmd''s flows after stopping the thread, but before
>      * destroying the flows, so that the flow stats can be collected. */
>     if (dp->dp_purge_cb) {
>@@ -2906,6 +2932,42 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp,
>int numa_id)
>     }
> }
> 
>+/* Returns PMD thread from this numa node with fewer rx queues to poll.
>+ * Returns NULL if there is no PMD threads on this numa node.
>+ * Can be called safely only by main thread. */
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
>+{
>+    int min_cnt = -1;
>+    struct dp_netdev_pmd_thread *pmd, *res = NULL;
>+
>+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+        if (pmd->numa_id == numa_id
>+            && (min_cnt > pmd->poll_cnt || res == NULL)) {
>+            min_cnt = pmd->poll_cnt;
>+            res = pmd;
>+        }
>+    }
>+
>+    return res;
>+}
>+
>+/* Adds rx queue to poll_list of PMD thread. */
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq
>*rx)
>+    OVS_REQUIRES(pmd->poll_mutex)
>+{
>+    struct rxq_poll *poll = xmalloc(sizeof *poll);
>+
>+    port_ref(port);
>+    poll->port = port;
>+    poll->rx = rx;
>+
>+    list_push_back(&pmd->poll_list, &poll->node);
>+    pmd->poll_cnt++;
>+}
>+
> /* Checks the numa node id of 'netdev' and starts pmd threads for
>  * the numa node. */
> static void
>@@ -2925,8 +2987,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp,
>int numa_id)
>      * in which 'netdev' is on, do nothing.  Else, creates the
>      * pmd threads for the numa node. */
>     if (!n_pmds) {
>-        int can_have, n_unpinned, i;
>+        int can_have, n_unpinned, i, index = 0;
>         struct dp_netdev_pmd_thread **pmds;
>+        struct dp_netdev_port *port;
> 
>         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>         if (!n_unpinned) {
>@@ -2944,13 +3007,23 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp,
>int numa_id)
>             pmds[i] = xzalloc(sizeof **pmds);
>             dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
>         }
>-        /* The pmd thread code needs to see all the others configured pmd
>-         * threads on the same numa node.  That's why we call
>-         * 'dp_netdev_configure_pmd()' on all the threads and then we
>actually
>-         * start them. */
>+
>+        /* Distributes rx queues of this numa node between new pmd
>threads. */
>+        CMAP_FOR_EACH (port, node, &dp->ports) {
>+            if (netdev_is_pmd(port->netdev)
>+                && netdev_get_numa_id(port->netdev) == numa_id) {
>+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>+                    /* Make thread-safety analyser happy. */
>+                    ovs_mutex_lock(&pmds[index]->poll_mutex);
>+                    dp_netdev_add_rxq_to_pmd(pmds[index], port,
>port->rxq[i]);
>+                    ovs_mutex_unlock(&pmds[index]->poll_mutex);
>+                    index = (index + 1) % can_have;
>+                }
>+            }
>+        }
>+
>+        /* Actual start of pmd threads. */
>         for (i = 0; i < can_have; i++) {
>-            /* Each thread will distribute all devices rx-queues among
>-             * themselves. */
>             pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main,
>pmds[i]);
>         }
>         free(pmds);
>-- 
>2.5.0
>

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to