Thanks for the patch. This is not a complete review, but I have some preliminary comments.
If I understand correctly, 'port_mutex' is converted to an rwlock because we want the pmd threads in dpif_netdev_xps_get_tx_qid() to be able to grab it concurrently. I think that we can add a pointer from 'struct tx_port' to 'struct dp_netdev_port' and access that without locking: as long as a port is in a pmd thread's tx_port cache it cannot be deleted from the datapath. This way we can avoid the rwlock.

'last_cycles' is only used to monitor the performance of the pmd threads, and it is always 0 if we compile without DPDK. Perhaps we can add a 'now' parameter to dp_netdev_execute_actions(), pass it from packet_batch_per_flow_execute() and use that instead.

Maybe we can improve this in the future, but with this patch dpif-netdev calls netdev_send() taking into account n_txq, which is the real number of queues. Perhaps txq_needs_locking for phy devices should be stored in dpif-netdev and passed to every invocation of netdev_send()?

Finally, have you thought about avoiding txq_used_mutex and using some form of atomic_compare_exchange() on the number of users, perhaps? I'm not sure it's better than the mutex, I just wanted to throw it out here, in case someone comes up with a good idea.

(Rough, untested sketches of these three ideas are appended below, after the quoted patch.)

Thanks,
Daniele

On 11/07/2016 08:15, "Ilya Maximets" <i.maxim...@samsung.com> wrote:
>If the CPU number in pmd-cpu-mask is not divisible by the number of queues,
>and in a few more complex situations, there may be an unfair distribution of
>TX queue-ids between PMD threads.
>
>For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask,
>such a distribution is possible:
><------------------------------------------------------------------------>
>pmd thread numa_id 0 core_id 13:
>        port: vhost-user1       queue-id: 1
>        port: dpdk0             queue-id: 3
>pmd thread numa_id 0 core_id 14:
>        port: vhost-user1       queue-id: 2
>pmd thread numa_id 0 core_id 16:
>        port: dpdk0             queue-id: 0
>pmd thread numa_id 0 core_id 17:
>        port: dpdk0             queue-id: 1
>pmd thread numa_id 0 core_id 12:
>        port: vhost-user1       queue-id: 0
>        port: dpdk0             queue-id: 2
>pmd thread numa_id 0 core_id 15:
>        port: vhost-user1       queue-id: 3
><------------------------------------------------------------------------>
>
>As we can see above, the dpdk0 port is polled by threads on cores
>12, 13, 16 and 17.
>
>By design of dpif-netdev, there is only one TX queue-id assigned to each
>pmd thread. These queue-ids are sequential, similar to core-ids, and a
>thread will send packets to the queue with exactly this queue-id regardless
>of the port.
>
>In the previous example:
>
>        pmd thread on core 12 will send packets to tx queue 0
>        pmd thread on core 13 will send packets to tx queue 1
>        ...
>        pmd thread on core 17 will send packets to tx queue 5
>
>So, for the dpdk0 port, after truncation in netdev-dpdk:
>
>        core 12 --> TX queue-id 0 % 4 == 0
>        core 13 --> TX queue-id 1 % 4 == 1
>        core 16 --> TX queue-id 4 % 4 == 0
>        core 17 --> TX queue-id 5 % 4 == 1
>
>As a result only 2 of the 4 queues are used.
>
>To fix this issue, a form of XPS is implemented in the following way:
>
>        * TX queue-ids are allocated dynamically.
>        * When a PMD thread first tries to send packets to a new port,
>          it allocates the least used TX queue for this port.
>        * PMD threads periodically perform revalidation of allocated
>          TX queue-ids. If a queue wasn't used in the last XPS_CYCLES,
>          it is freed during revalidation.
> >Reported-by: Zhihong Wang <zhihong.w...@intel.com> >Signed-off-by: Ilya Maximets <i.maxim...@samsung.com> >--- > lib/dpif-netdev.c | 130 +++++++++++++++++++++++++++++++++++++++--------------- > 1 file changed, 94 insertions(+), 36 deletions(-) > >diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >index 3fb1942..5eed50c 100644 >--- a/lib/dpif-netdev.c >+++ b/lib/dpif-netdev.c >@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type { > PMD_N_CYCLES > }; > >+#define XPS_CYCLES 1000000000ULL >+ > /* A port in a netdev-based datapath. */ > struct dp_netdev_port { > odp_port_t port_no; >@@ -256,6 +258,8 @@ struct dp_netdev_port { > struct netdev_saved_flags *sf; > unsigned n_rxq; /* Number of elements in 'rxq' */ > struct netdev_rxq **rxq; >+ unsigned *txq_used; /* Number of threads that uses each tx queue. >*/ >+ struct ovs_mutex txq_used_mutex; > char *type; /* Port type as requested by user. */ > }; > >@@ -385,6 +389,8 @@ struct rxq_poll { > /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */ > struct tx_port { > odp_port_t port_no; >+ int qid; >+ unsigned long long last_cycles; > struct netdev *netdev; > struct hmap_node node; > }; >@@ -541,6 +547,11 @@ static void dp_netdev_pmd_flow_flush(struct >dp_netdev_pmd_thread *pmd); > static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) > OVS_REQUIRES(pmd->port_mutex); > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd); >+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx); >+ > static inline bool emc_entry_alive(struct emc_entry *ce); > static void emc_clear_entry(struct emc_entry *ce); > >@@ -1185,7 +1196,9 @@ port_create(const char *devname, const char *open_type, >const char *type, > port->netdev = netdev; > port->n_rxq = netdev_n_rxq(netdev); > port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); > port->type = xstrdup(type); >+ ovs_mutex_init(&port->txq_used_mutex); > > for (i = 0; i < port->n_rxq; i++) { > error = netdev_rxq_open(netdev, &port->rxq[i], i); >@@ -1211,7 +1224,9 @@ out_rxq_close: > for (i = 0; i < n_open_rxqs; i++) { > netdev_rxq_close(port->rxq[i]); > } >+ ovs_mutex_destroy(&port->txq_used_mutex); > free(port->type); >+ free(port->txq_used); > free(port->rxq); > free(port); > >@@ -1353,7 +1368,8 @@ port_destroy(struct dp_netdev_port *port) > for (unsigned i = 0; i < port->n_rxq; i++) { > netdev_rxq_close(port->rxq[i]); > } >- >+ ovs_mutex_destroy(&port->txq_used_mutex); >+ free(port->txq_used); > free(port->rxq); > free(port->type); > free(port); >@@ -1376,13 +1392,6 @@ get_port_by_name(struct dp_netdev *dp, > } > > static int >-get_n_pmd_threads(struct dp_netdev *dp) >-{ >- /* There is one non pmd thread in dp->poll_threads */ >- return cmap_count(&dp->poll_threads) - 1; >-} >- >-static int > get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id) > { > struct dp_netdev_pmd_thread *pmd; >@@ -2664,6 +2673,10 @@ port_reconfigure(struct dp_netdev_port *port) > } > /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */ > port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev)); >+ /* Realloc 'used' counters for tx queues. 
*/ >+ free(port->txq_used); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); >+ > for (i = 0; i < netdev_n_rxq(netdev); i++) { > err = netdev_rxq_open(netdev, &port->rxq[i], i); > if (err) { >@@ -2743,6 +2756,7 @@ dpif_netdev_run(struct dpif *dpif) > } > } > } >+ dpif_netdev_xps_revalidate_pmd(non_pmd); > ovs_mutex_unlock(&dp->non_pmd_mutex); > > dp_netdev_pmd_unref(non_pmd); >@@ -3027,11 +3041,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread >*pmd, struct dp_netdev *dp, > pmd->numa_id = numa_id; > pmd->poll_cnt = 0; > >- atomic_init(&pmd->tx_qid, >- (core_id == NON_PMD_CORE_ID) >- ? ovs_numa_get_n_cores() >- : get_n_pmd_threads(dp)); >- > ovs_refcount_init(&pmd->ref_cnt); > latch_init(&pmd->exit_latch); > atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ); >@@ -3122,18 +3131,16 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp) > free(pmd_list); > } > >-/* Deletes all pmd threads on numa node 'numa_id' and >- * fixes tx_qids of other threads to keep them sequential. */ >+/* Deletes all pmd threads on numa node 'numa_id'. */ > static void > dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) > { > struct dp_netdev_pmd_thread *pmd; >- int n_pmds_on_numa, n_pmds; >- int *free_idx, k = 0; >+ int n_pmds_on_numa; >+ int k = 0; > struct dp_netdev_pmd_thread **pmd_list; > > n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id); >- free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx); > pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list); > > CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { >@@ -3141,7 +3148,6 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > * 'dp->poll_threads' (while we're iterating it) and it > * might quiesce. */ > if (pmd->numa_id == numa_id) { >- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]); > pmd_list[k] = pmd; > ovs_assert(k < n_pmds_on_numa); > k++; >@@ -3152,21 +3158,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > dp_netdev_del_pmd(dp, pmd_list[i]); > } > >- n_pmds = get_n_pmd_threads(dp); >- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { >- int old_tx_qid; >- >- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid); >- >- if (old_tx_qid >= n_pmds) { >- int new_tx_qid = free_idx[--k]; >- >- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid); >- } >- } >- > free(pmd_list); >- free(free_idx); > } > > /* Deletes all rx queues from pmd->poll_list and all the ports from >@@ -3321,6 +3313,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread >*pmd, > > tx->netdev = port->netdev; > tx->port_no = port->port_no; >+ tx->qid = -1; > > ovs_mutex_lock(&pmd->port_mutex); > hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no)); >@@ -3980,6 +3973,73 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, >upcall_callback *cb, > dp->upcall_cb = cb; > } > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd) >+{ >+ struct tx_port *tx; >+ struct dp_netdev_port *port; >+ struct dp_netdev *dp = pmd->dp; >+ unsigned long long interval; >+ >+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) { >+ interval = pmd->last_cycles - tx->last_cycles; >+ if (tx->qid >= 0 && interval >= XPS_CYCLES) { >+ fat_rwlock_rdlock(&dp->port_rwlock); >+ port = dp_netdev_lookup_port(dp, tx->port_no); >+ ovs_mutex_lock(&port->txq_used_mutex); >+ port->txq_used[tx->qid]--; >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ fat_rwlock_unlock(&dp->port_rwlock); >+ tx->qid = -1; >+ } >+ } >+} >+ >+static int >+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx) >+{ >+ 
struct dp_netdev_port *port; >+ struct dp_netdev *dp = pmd->dp; >+ unsigned long long interval = pmd->last_cycles - tx->last_cycles; >+ int i, min_cnt, min_qid; >+ >+ tx->last_cycles = pmd->last_cycles; >+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_CYCLES)) { >+ return tx->qid; >+ } >+ >+ fat_rwlock_rdlock(&dp->port_rwlock); >+ port = dp_netdev_lookup_port(dp, tx->port_no); >+ >+ ovs_mutex_lock(&port->txq_used_mutex); >+ if (tx->qid >= 0) { >+ port->txq_used[tx->qid]--; >+ tx->qid = -1; >+ } >+ >+ min_cnt = -1; >+ min_qid = 0; >+ for (i = 0; i < netdev_n_txq(port->netdev); i++) { >+ if (port->txq_used[i] < min_cnt || min_cnt == -1) { >+ min_cnt = port->txq_used[i]; >+ min_qid = i; >+ } >+ } >+ >+ port->txq_used[min_qid]++; >+ tx->qid = min_qid; >+ >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ fat_rwlock_unlock(&dp->port_rwlock); >+ >+ dpif_netdev_xps_revalidate_pmd(pmd); >+ >+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.\n", >+ pmd->core_id, tx->qid, netdev_get_name(tx->netdev)); >+ return min_qid; >+} >+ > static struct tx_port * > pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd, > odp_port_t port_no) >@@ -4051,9 +4111,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > case OVS_ACTION_ATTR_OUTPUT: > p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a))); > if (OVS_LIKELY(p)) { >- int tx_qid; >- >- atomic_read_relaxed(&pmd->tx_qid, &tx_qid); >+ int tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p); > > netdev_send(p->netdev, tx_qid, packets_, may_steal); > return; >-- >2.7.4 > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev
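For reference, here are the rough, untested sketches of the three ideas above. All names and details are only illustrative, not actual code.

1. Keeping a back-pointer to the port in 'struct tx_port', so that dpif_netdev_xps_get_tx_qid() doesn't need dp->port_rwlock at all (relying on the fact that a port can't be removed from the datapath while a pmd thread still caches it):

/* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'.
 * 'port' stays valid for as long as this entry is in the cache, so it can
 * be dereferenced without taking any datapath-wide lock. */
struct tx_port {
    struct dp_netdev_port *port;      /* Back-pointer, replaces the lookup. */
    odp_port_t port_no;
    int qid;
    unsigned long long last_cycles;
    struct netdev *netdev;
    struct hmap_node node;
};

static int
dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
                           struct tx_port *tx)
{
    struct dp_netdev_port *port = tx->port;   /* No rwlock taken here. */

    /* ... the queue selection below stays as in the patch ... */
}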
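2. The 'now' parameter is just plumbing a timestamp down the call chain so the XPS interval check doesn't have to rely on 'last_cycles' (which is always 0 without DPDK). Schematically, with made-up names rather than the real signatures:

/* Hypothetical shape only: the caller samples the clock once per batch and
 * hands it down, instead of each callee reading pmd->last_cycles. */
static void
execute_actions(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *b,
                long long now)
{
    /* ... use 'now' for the XPS interval / revalidation checks ... */
}

static void
per_flow_execute(struct dp_netdev_pmd_thread *pmd, struct dp_packet_batch *b)
{
    long long now = time_msec();    /* sampled once per batch */

    execute_actions(pmd, b, now);
}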
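3. The txq_used_mutex alternative: a standalone C11 sketch (not OVS code, with made-up names) of maintaining the per-queue user counters with compare-exchange instead of a mutex. Whether the retry loop is actually better than the mutex is exactly the open question above:

#include <limits.h>
#include <stdatomic.h>

#define MAX_TXQ 64

static atomic_uint txq_used[MAX_TXQ];    /* Users per tx queue. */

/* Picks the least used queue and claims it.  The compare-exchange only
 * succeeds if the counter is still the value the choice was based on;
 * otherwise another pmd thread raced with us and we simply retry. */
static int
claim_least_used_txq(int n_txq)
{
    for (;;) {
        unsigned int min_cnt = UINT_MAX;
        int min_qid = 0;

        for (int i = 0; i < n_txq; i++) {
            unsigned int cnt = atomic_load_explicit(&txq_used[i],
                                                    memory_order_relaxed);
            if (cnt < min_cnt) {
                min_cnt = cnt;
                min_qid = i;
            }
        }

        unsigned int expected = min_cnt;
        if (atomic_compare_exchange_weak_explicit(&txq_used[min_qid],
                                                  &expected, min_cnt + 1,
                                                  memory_order_relaxed,
                                                  memory_order_relaxed)) {
            return min_qid;
        }
    }
}

/* Releasing a queue is just a relaxed decrement. */
static void
release_txq(int qid)
{
    atomic_fetch_sub_explicit(&txq_used[qid], 1, memory_order_relaxed);
}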