I don't think 'dynamic_txqs' should be atomic, since we only change it while the pmd threads are stopped.
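A standalone sketch of that reasoning, in case it helps (this is not OVS code; 'fake_port' and 'pmd_main' are made-up names): the pthread_join()/pthread_create() pair around the write already gives every reader a happens-before edge, so a plain bool is enough:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_port {
    bool dynamic_txqs;      /* Plain bool: only written while no pmd runs. */
};

static struct fake_port port;

static void *
pmd_main(void *arg)
{
    struct fake_port *p = arg;

    /* A pmd started after the write is guaranteed to see it, because the
     * write happens-before the pthread_create() of this thread. */
    printf("pmd sees dynamic_txqs=%d\n", p->dynamic_txqs);
    return NULL;
}

int
main(void)
{
    pthread_t pmd;

    pthread_create(&pmd, NULL, pmd_main, &port);
    pthread_join(&pmd, NULL);       /* ~ dp_netdev_destroy_all_pmds() */

    port.dynamic_txqs = true;       /* Safe: every reader is stopped. */

    pthread_create(&pmd, NULL, pmd_main, &port);   /* ~ restart the pmds */
    pthread_join(&pmd, NULL);

    return 0;
}

In other words, the stop/restart of the threads around the write already provides the ordering, so the atomic ops don't buy us anything here.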
Also, in port_create() we should check for 'netdev_n_txq(netdev) < n_cores + 1' after we reconfigure the device. Other than that this looks good to me, so I applied the following incremental and pushed this to master. Thanks, Daniele ---8<--- diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index d1ba6f3..d45aba0 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -258,7 +258,7 @@ struct dp_netdev_port { struct netdev_saved_flags *sf; unsigned n_rxq; /* Number of elements in 'rxq' */ struct netdev_rxq **rxq; - atomic_bool dynamic_txqs; /* If true XPS will be used. */ + bool dynamic_txqs; /* If true XPS will be used. */ unsigned *txq_used; /* Number of threads that uses each tx queue. */ struct ovs_mutex txq_used_mutex; char *type; /* Port type as requested by user. */ @@ -1151,6 +1151,7 @@ port_create(const char *devname, const char *open_type, const char *type, enum netdev_flags flags; struct netdev *netdev; int n_open_rxqs = 0; + int n_cores = 0; int i, error; bool dynamic_txqs = false; @@ -1171,7 +1172,7 @@ port_create(const char *devname, const char *open_type, const char *type, } if (netdev_is_pmd(netdev)) { - int n_cores = ovs_numa_get_n_cores(); + n_cores = ovs_numa_get_n_cores(); if (n_cores == OVS_CORE_UNSPEC) { VLOG_ERR("%s, cannot get cpu core info", devname); @@ -1186,9 +1187,6 @@ port_create(const char *devname, const char *open_type, const char *type, VLOG_ERR("%s, cannot set multiq", devname); goto out; } - if (netdev_n_txq(netdev) < n_cores + 1) { - dynamic_txqs = true; - } } if (netdev_is_reconf_required(netdev)) { @@ -1198,6 +1196,12 @@ port_create(const char *devname, const char *open_type, const char *type, } } + if (netdev_is_pmd(netdev)) { + if (netdev_n_txq(netdev) < n_cores + 1) { + dynamic_txqs = true; + } + } + port = xzalloc(sizeof *port); port->port_no = port_no; port->netdev = netdev; @@ -1206,7 +1210,7 @@ port_create(const char *devname, const char *open_type, const char *type, port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); port->type = xstrdup(type); ovs_mutex_init(&port->txq_used_mutex); - atomic_init(&port->dynamic_txqs, dynamic_txqs); + port->dynamic_txqs = dynamic_txqs; for (i = 0; i < port->n_rxq; i++) { error = netdev_rxq_open(netdev, &port->rxq[i], i); @@ -2718,8 +2722,7 @@ reconfigure_pmd_threads(struct dp_netdev *dp) seq_change(dp->port_seq); port_destroy(port); } else { - atomic_init(&port->dynamic_txqs, - netdev_n_txq(port->netdev) < n_cores + 1); + port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1; } } /* Restores the non-pmd. */ @@ -4015,11 +4018,9 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, struct tx_port *tx; struct dp_netdev_port *port; long long interval; - bool dynamic_txqs; HMAP_FOR_EACH (tx, node, &pmd->port_cache) { - atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs); - if (dynamic_txqs) { + if (tx->port->dynamic_txqs) { continue; } interval = now - tx->last_used; @@ -4156,7 +4157,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_, int tx_qid; bool dynamic_txqs; - atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs); + dynamic_txqs = p->port->dynamic_txqs; if (dynamic_txqs) { tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now); } else { ---8<--- On 27/07/2016 07:44, "Ilya Maximets" <i.maxim...@samsung.com> wrote: >If CPU number in pmd-cpu-mask is not divisible by the number of queues and >in a few more complex situations there may be unfair distribution of TX >queue-ids between PMD threads. 
> >For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask >such distribution is possible: ><------------------------------------------------------------------------> >pmd thread numa_id 0 core_id 13: > port: vhost-user1 queue-id: 1 > port: dpdk0 queue-id: 3 >pmd thread numa_id 0 core_id 14: > port: vhost-user1 queue-id: 2 >pmd thread numa_id 0 core_id 16: > port: dpdk0 queue-id: 0 >pmd thread numa_id 0 core_id 17: > port: dpdk0 queue-id: 1 >pmd thread numa_id 0 core_id 12: > port: vhost-user1 queue-id: 0 > port: dpdk0 queue-id: 2 >pmd thread numa_id 0 core_id 15: > port: vhost-user1 queue-id: 3 ><------------------------------------------------------------------------> > >As we can see above dpdk0 port polled by threads on cores: > 12, 13, 16 and 17. > >By design of dpif-netdev, there is only one TX queue-id assigned to each >pmd thread. This queue-id's are sequential similar to core-id's. And >thread will send packets to queue with exact this queue-id regardless >of port. > >In previous example: > > pmd thread on core 12 will send packets to tx queue 0 > pmd thread on core 13 will send packets to tx queue 1 > ... > pmd thread on core 17 will send packets to tx queue 5 > >So, for dpdk0 port after truncating in netdev-dpdk: > > core 12 --> TX queue-id 0 % 4 == 0 > core 13 --> TX queue-id 1 % 4 == 1 > core 16 --> TX queue-id 4 % 4 == 0 > core 17 --> TX queue-id 5 % 4 == 1 > >As a result only 2 of 4 queues used. > >To fix this issue some kind of XPS implemented in following way: > > * TX queue-ids are allocated dynamically. > * When PMD thread first time tries to send packets to new port > it allocates less used TX queue for this port. > * PMD threads periodically performes revalidation of > allocated TX queue-ids. If queue wasn't used in last > XPS_TIMEOUT_MS milliseconds it will be freed while revalidation. > * XPS is not working if we have enough TX queues. > >Reported-by: Zhihong Wang <zhihong.w...@intel.com> >Signed-off-by: Ilya Maximets <i.maxim...@samsung.com> >--- > lib/dpif-netdev.c | 204 ++++++++++++++++++++++++++++++++++++++++---------- > lib/netdev-bsd.c | 3 +- > lib/netdev-dpdk.c | 32 +++----- > lib/netdev-dummy.c | 3 +- > lib/netdev-linux.c | 3 +- > lib/netdev-provider.h | 11 +-- > lib/netdev.c | 13 ++-- > lib/netdev.h | 2 +- > 8 files changed, 198 insertions(+), 73 deletions(-) > >diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >index f05ca4e..d1ba6f3 100644 >--- a/lib/dpif-netdev.c >+++ b/lib/dpif-netdev.c >@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type { > PMD_N_CYCLES > }; > >+#define XPS_TIMEOUT_MS 500LL >+ > /* A port in a netdev-based datapath. */ > struct dp_netdev_port { > odp_port_t port_no; >@@ -256,6 +258,9 @@ struct dp_netdev_port { > struct netdev_saved_flags *sf; > unsigned n_rxq; /* Number of elements in 'rxq' */ > struct netdev_rxq **rxq; >+ atomic_bool dynamic_txqs; /* If true XPS will be used. */ >+ unsigned *txq_used; /* Number of threads that uses each tx queue. >*/ >+ struct ovs_mutex txq_used_mutex; > char *type; /* Port type as requested by user. */ > }; > >@@ -384,8 +389,9 @@ struct rxq_poll { > > /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */ > struct tx_port { >- odp_port_t port_no; >- struct netdev *netdev; >+ struct dp_netdev_port *port; >+ int qid; >+ long long last_used; > struct hmap_node node; > }; > >@@ -443,9 +449,10 @@ struct dp_netdev_pmd_thread { > unsigned core_id; /* CPU core id of this pmd thread. */ > int numa_id; /* numa node id of this pmd thread. 
*/ > >- /* Queue id used by this pmd thread to send packets on all netdevs. >- * All tx_qid's are unique and less than 'ovs_numa_get_n_cores() + 1'. */ >- atomic_int tx_qid; >+ /* Queue id used by this pmd thread to send packets on all netdevs if >+ * XPS disabled for this netdev. All static_tx_qid's are unique and less >+ * than 'ovs_numa_get_n_cores() + 1'. */ >+ atomic_int static_tx_qid; > > struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. > */ > /* List of rx queues to poll. */ >@@ -498,7 +505,8 @@ static void dp_netdev_execute_actions(struct >dp_netdev_pmd_thread *pmd, > struct dp_packet_batch *, > bool may_steal, > const struct nlattr *actions, >- size_t actions_len); >+ size_t actions_len, >+ long long now); > static void dp_netdev_input(struct dp_netdev_pmd_thread *, > struct dp_packet_batch *, odp_port_t port_no); > static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *, >@@ -541,6 +549,12 @@ static void dp_netdev_pmd_flow_flush(struct >dp_netdev_pmd_thread *pmd); > static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) > OVS_REQUIRES(pmd->port_mutex); > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, >+ long long now, bool purge); >+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx, long long now); >+ > static inline bool emc_entry_alive(struct emc_entry *ce); > static void emc_clear_entry(struct emc_entry *ce); > >@@ -1138,6 +1152,7 @@ port_create(const char *devname, const char *open_type, >const char *type, > struct netdev *netdev; > int n_open_rxqs = 0; > int i, error; >+ bool dynamic_txqs = false; > > *portp = NULL; > >@@ -1171,6 +1186,9 @@ port_create(const char *devname, const char *open_type, >const char *type, > VLOG_ERR("%s, cannot set multiq", devname); > goto out; > } >+ if (netdev_n_txq(netdev) < n_cores + 1) { >+ dynamic_txqs = true; >+ } > } > > if (netdev_is_reconf_required(netdev)) { >@@ -1185,7 +1203,10 @@ port_create(const char *devname, const char *open_type, >const char *type, > port->netdev = netdev; > port->n_rxq = netdev_n_rxq(netdev); > port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); > port->type = xstrdup(type); >+ ovs_mutex_init(&port->txq_used_mutex); >+ atomic_init(&port->dynamic_txqs, dynamic_txqs); > > for (i = 0; i < port->n_rxq; i++) { > error = netdev_rxq_open(netdev, &port->rxq[i], i); >@@ -1211,7 +1232,9 @@ out_rxq_close: > for (i = 0; i < n_open_rxqs; i++) { > netdev_rxq_close(port->rxq[i]); > } >+ ovs_mutex_destroy(&port->txq_used_mutex); > free(port->type); >+ free(port->txq_used); > free(port->rxq); > free(port); > >@@ -1351,7 +1374,8 @@ port_destroy(struct dp_netdev_port *port) > for (unsigned i = 0; i < port->n_rxq; i++) { > netdev_rxq_close(port->rxq[i]); > } >- >+ ovs_mutex_destroy(&port->txq_used_mutex); >+ free(port->txq_used); > free(port->rxq); > free(port->type); > free(port); >@@ -2476,7 +2500,7 @@ dpif_netdev_execute(struct dpif *dpif, struct >dpif_execute *execute) > > packet_batch_init_packet(&pp, execute->packet); > dp_netdev_execute_actions(pmd, &pp, false, execute->actions, >- execute->actions_len); >+ execute->actions_len, time_msec()); > > if (pmd->core_id == NON_PMD_CORE_ID) { > ovs_mutex_unlock(&dp->non_pmd_mutex); >@@ -2650,6 +2674,10 @@ port_reconfigure(struct dp_netdev_port *port) > } > /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. 
*/ > port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev)); >+ /* Realloc 'used' counters for tx queues. */ >+ free(port->txq_used); >+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used); >+ > for (i = 0; i < netdev_n_rxq(netdev); i++) { > err = netdev_rxq_open(netdev, &port->rxq[i], i); > if (err) { >@@ -2666,9 +2694,21 @@ reconfigure_pmd_threads(struct dp_netdev *dp) > OVS_REQUIRES(dp->port_mutex) > { > struct dp_netdev_port *port, *next; >+ int n_cores; > > dp_netdev_destroy_all_pmds(dp); > >+ /* Reconfigures the cpu mask. */ >+ ovs_numa_set_cpu_mask(dp->requested_pmd_cmask); >+ free(dp->pmd_cmask); >+ dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask); >+ >+ n_cores = ovs_numa_get_n_cores(); >+ if (n_cores == OVS_CORE_UNSPEC) { >+ VLOG_ERR("Cannot get cpu core info"); >+ return; >+ } >+ > HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) { > int err; > >@@ -2677,13 +2717,11 @@ reconfigure_pmd_threads(struct dp_netdev *dp) > hmap_remove(&dp->ports, &port->node); > seq_change(dp->port_seq); > port_destroy(port); >+ } else { >+ atomic_init(&port->dynamic_txqs, >+ netdev_n_txq(port->netdev) < n_cores + 1); > } > } >- /* Reconfigures the cpu mask. */ >- ovs_numa_set_cpu_mask(dp->requested_pmd_cmask); >- free(dp->pmd_cmask); >- dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask); >- > /* Restores the non-pmd. */ > dp_netdev_set_nonpmd(dp); > /* Restores all pmd threads. */ >@@ -2727,6 +2765,7 @@ dpif_netdev_run(struct dpif *dpif) > } > } > } >+ dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false); > ovs_mutex_unlock(&dp->non_pmd_mutex); > > dp_netdev_pmd_unref(non_pmd); >@@ -2776,6 +2815,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd) > { > struct tx_port *tx_port_cached; > >+ /* Free all used tx queue ids. */ >+ dpif_netdev_xps_revalidate_pmd(pmd, 0, true); >+ > HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) { > free(tx_port_cached); > } >@@ -2795,7 +2837,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) > HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) { > tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached); > hmap_insert(&pmd->port_cache, &tx_port_cached->node, >- hash_port_no(tx_port_cached->port_no)); >+ hash_port_no(tx_port_cached->port->port_no)); > } > } > >@@ -3011,7 +3053,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread >*pmd, struct dp_netdev *dp, > pmd->numa_id = numa_id; > pmd->poll_cnt = 0; > >- atomic_init(&pmd->tx_qid, >+ atomic_init(&pmd->static_tx_qid, > (core_id == NON_PMD_CORE_ID) > ? ovs_numa_get_n_cores() > : get_n_pmd_threads(dp)); >@@ -3107,7 +3149,7 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp) > } > > /* Deletes all pmd threads on numa node 'numa_id' and >- * fixes tx_qids of other threads to keep them sequential. */ >+ * fixes static_tx_qids of other threads to keep them sequential. */ > static void > dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) > { >@@ -3125,7 +3167,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > * 'dp->poll_threads' (while we're iterating it) and it > * might quiesce. 
*/ > if (pmd->numa_id == numa_id) { >- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]); >+ atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]); > pmd_list[k] = pmd; > ovs_assert(k < n_pmds_on_numa); > k++; >@@ -3140,12 +3182,12 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int >numa_id) > CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > int old_tx_qid; > >- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid); >+ atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid); > > if (old_tx_qid >= n_pmds) { > int new_tx_qid = free_idx[--k]; > >- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid); >+ atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid); > } > } > >@@ -3178,7 +3220,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t >port_no) > struct tx_port *tx; > > HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) { >- if (tx->port_no == port_no) { >+ if (tx->port->port_no == port_no) { > return tx; > } > } >@@ -3303,11 +3345,11 @@ dp_netdev_add_port_tx_to_pmd(struct >dp_netdev_pmd_thread *pmd, > { > struct tx_port *tx = xzalloc(sizeof *tx); > >- tx->netdev = port->netdev; >- tx->port_no = port->port_no; >+ tx->port = port; >+ tx->qid = -1; > > ovs_mutex_lock(&pmd->port_mutex); >- hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no)); >+ hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); > ovs_mutex_unlock(&pmd->port_mutex); > } > >@@ -3648,7 +3690,7 @@ packet_batch_per_flow_execute(struct >packet_batch_per_flow *batch, > actions = dp_netdev_flow_get_actions(flow); > > dp_netdev_execute_actions(pmd, &batch->array, true, >- actions->actions, actions->size); >+ actions->actions, actions->size, now); > } > > static inline void >@@ -3736,7 +3778,7 @@ static inline void > handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet > *packet, > const struct netdev_flow_key *key, > struct ofpbuf *actions, struct ofpbuf *put_actions, >- int *lost_cnt) >+ int *lost_cnt, long long now) > { > struct ofpbuf *add_actions; > struct dp_packet_batch b; >@@ -3775,7 +3817,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, >struct dp_packet *packet, > * we'll send the packet up twice. */ > packet_batch_init_packet(&b, packet); > dp_netdev_execute_actions(pmd, &b, true, >- actions->data, actions->size); >+ actions->data, actions->size, now); > > add_actions = put_actions->size ? 
put_actions : actions; > if (OVS_LIKELY(error != ENOSPC)) { >@@ -3804,7 +3846,8 @@ static inline void > fast_path_processing(struct dp_netdev_pmd_thread *pmd, > struct dp_packet_batch *packets_, > struct netdev_flow_key *keys, >- struct packet_batch_per_flow batches[], size_t >*n_batches) >+ struct packet_batch_per_flow batches[], size_t >*n_batches, >+ long long now) > { > int cnt = packets_->count; > #if !defined(__CHECKER__) && !defined(_WIN32) >@@ -3850,8 +3893,8 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, > } > > miss_cnt++; >- handle_packet_upcall(pmd, packets[i], &keys[i], &actions, >&put_actions, >- &lost_cnt); >+ handle_packet_upcall(pmd, packets[i], &keys[i], &actions, >+ &put_actions, &lost_cnt, now); > } > > ofpbuf_uninit(&actions); >@@ -3915,7 +3958,7 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd, > md_is_valid, port_no); > if (OVS_UNLIKELY(newcnt)) { > packets->count = newcnt; >- fast_path_processing(pmd, packets, keys, batches, &n_batches); >+ fast_path_processing(pmd, packets, keys, batches, &n_batches, now); > } > > for (i = 0; i < n_batches; i++) { >@@ -3944,6 +3987,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd, > > struct dp_netdev_execute_aux { > struct dp_netdev_pmd_thread *pmd; >+ long long now; > }; > > static void >@@ -3964,6 +4008,79 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, >upcall_callback *cb, > dp->upcall_cb = cb; > } > >+static void >+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, >+ long long now, bool purge) >+{ >+ struct tx_port *tx; >+ struct dp_netdev_port *port; >+ long long interval; >+ bool dynamic_txqs; >+ >+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) { >+ atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs); >+ if (dynamic_txqs) { >+ continue; >+ } >+ interval = now - tx->last_used; >+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) { >+ port = tx->port; >+ ovs_mutex_lock(&port->txq_used_mutex); >+ port->txq_used[tx->qid]--; >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ tx->qid = -1; >+ } >+ } >+} >+ >+static int >+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, >+ struct tx_port *tx, long long now) >+{ >+ struct dp_netdev_port *port; >+ long long interval; >+ int i, min_cnt, min_qid; >+ >+ if (OVS_UNLIKELY(!now)) { >+ now = time_msec(); >+ } >+ >+ interval = now - tx->last_used; >+ tx->last_used = now; >+ >+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) { >+ return tx->qid; >+ } >+ >+ port = tx->port; >+ >+ ovs_mutex_lock(&port->txq_used_mutex); >+ if (tx->qid >= 0) { >+ port->txq_used[tx->qid]--; >+ tx->qid = -1; >+ } >+ >+ min_cnt = -1; >+ min_qid = 0; >+ for (i = 0; i < netdev_n_txq(port->netdev); i++) { >+ if (port->txq_used[i] < min_cnt || min_cnt == -1) { >+ min_cnt = port->txq_used[i]; >+ min_qid = i; >+ } >+ } >+ >+ port->txq_used[min_qid]++; >+ tx->qid = min_qid; >+ >+ ovs_mutex_unlock(&port->txq_used_mutex); >+ >+ dpif_netdev_xps_revalidate_pmd(pmd, now, false); >+ >+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", >+ pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); >+ return min_qid; >+} >+ > static struct tx_port * > pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd, > odp_port_t port_no) >@@ -3987,7 +4104,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd, > err = -EINVAL; > goto error; > } >- err = netdev_push_header(tun_port->netdev, batch, data); >+ err = netdev_push_header(tun_port->port->netdev, batch, data); > if (!err) { > return 0; > } >@@ -4001,7 +4118,7 @@ 
dp_execute_userspace_action(struct dp_netdev_pmd_thread >*pmd, > struct dp_packet *packet, bool may_steal, > struct flow *flow, ovs_u128 *ufid, > struct ofpbuf *actions, >- const struct nlattr *userdata) >+ const struct nlattr *userdata, long long now) > { > struct dp_packet_batch b; > int error; >@@ -4014,7 +4131,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread >*pmd, > if (!error || error == ENOSPC) { > packet_batch_init_packet(&b, packet); > dp_netdev_execute_actions(pmd, &b, may_steal, >- actions->data, actions->size); >+ actions->data, actions->size, now); > } else if (may_steal) { > dp_packet_delete(packet); > } >@@ -4029,6 +4146,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > struct dp_netdev_pmd_thread *pmd = aux->pmd; > struct dp_netdev *dp = pmd->dp; > int type = nl_attr_type(a); >+ long long now = aux->now; > struct tx_port *p; > > switch ((enum ovs_action_attr)type) { >@@ -4036,10 +4154,17 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a))); > if (OVS_LIKELY(p)) { > int tx_qid; >+ bool dynamic_txqs; > >- atomic_read_relaxed(&pmd->tx_qid, &tx_qid); >+ atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs); >+ if (dynamic_txqs) { >+ tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now); >+ } else { >+ atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid); >+ } > >- netdev_send(p->netdev, tx_qid, packets_, may_steal); >+ netdev_send(p->port->netdev, tx_qid, packets_, may_steal, >+ dynamic_txqs); > return; > } > break; >@@ -4086,7 +4211,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > > dp_packet_batch_apply_cutlen(packets_); > >- netdev_pop_header(p->netdev, packets_); >+ netdev_pop_header(p->port->netdev, packets_); > if (!packets_->count) { > return; > } >@@ -4134,7 +4259,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >*packets_, > flow_extract(packets[i], &flow); > dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid); > dp_execute_userspace_action(pmd, packets[i], may_steal, &flow, >- &ufid, &actions, userdata); >+ &ufid, &actions, userdata, now); > } > > if (clone) { >@@ -4200,9 +4325,10 @@ static void > dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, > struct dp_packet_batch *packets, > bool may_steal, >- const struct nlattr *actions, size_t actions_len) >+ const struct nlattr *actions, size_t actions_len, >+ long long now) > { >- struct dp_netdev_execute_aux aux = { pmd }; >+ struct dp_netdev_execute_aux aux = { pmd, now }; > > odp_execute_actions(&aux, packets, may_steal, actions, > actions_len, dp_execute_cb); >diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c >index f963c6e..2f57c1a 100644 >--- a/lib/netdev-bsd.c >+++ b/lib/netdev-bsd.c >@@ -680,7 +680,8 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_) > */ > static int > netdev_bsd_send(struct netdev *netdev_, int qid OVS_UNUSED, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq OVS_UNUSED) > { > struct netdev_bsd *dev = netdev_bsd_cast(netdev_); > const char *name = netdev_get_name(netdev_); >diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c >index af87f18..c208f32 100644 >--- a/lib/netdev-dpdk.c >+++ b/lib/netdev-dpdk.c >@@ -298,7 +298,7 @@ struct dpdk_tx_queue { > rte_spinlock_t tx_lock; /* Protects the members and the NIC queue > * from concurrent access. It is used only > * if the queue is shared among different >- * pmd threads (see 'txq_needs_locking'). >*/ >+ * pmd threads (see 'concurrent_txq'). 
*/ > int map; /* Mapping of configured vhost-user queues > * to enabled by guest. */ > }; >@@ -349,13 +349,6 @@ struct netdev_dpdk { > struct rte_eth_link link; > int link_reset_cnt; > >- /* Caller of netdev_send() might want to use more txqs than the device >has. >- * For physical NICs, if the 'requested_n_txq' less or equal to >'up.n_txq', >- * 'txq_needs_locking' is false, otherwise it is true and we will take a >- * spinlock on transmission. For vhost devices, 'requested_n_txq' is >- * always true. */ >- bool txq_needs_locking; >- > /* virtio-net structure for vhost device */ > OVSRCU_TYPE(struct virtio_net *) virtio_dev; > >@@ -778,10 +771,8 @@ netdev_dpdk_init(struct netdev *netdev, unsigned int >port_no, > goto unlock; > } > netdev_dpdk_alloc_txq(dev, netdev->n_txq); >- dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq; > } else { > netdev_dpdk_alloc_txq(dev, OVS_VHOST_MAX_QUEUE_NUM); >- dev->txq_needs_locking = true; > /* Enable DPDK_DEV_VHOST device and set promiscuous mode flag. */ > dev->flags = NETDEV_UP | NETDEV_PROMISC; > } >@@ -1468,7 +1459,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct >dp_packet_batch *batch) > static int > netdev_dpdk_vhost_send(struct netdev *netdev, int qid, > struct dp_packet_batch *batch, >- bool may_steal) >+ bool may_steal, bool concurrent_txq OVS_UNUSED) > { > > if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) { >@@ -1484,9 +1475,10 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid, > > static inline void > netdev_dpdk_send__(struct netdev_dpdk *dev, int qid, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq) > { >- if (OVS_UNLIKELY(dev->txq_needs_locking)) { >+ if (OVS_UNLIKELY(concurrent_txq)) { > qid = qid % dev->up.n_txq; > rte_spinlock_lock(&dev->tx_q[qid].tx_lock); > } >@@ -1551,18 +1543,19 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid, > } > } > >- if (OVS_UNLIKELY(dev->txq_needs_locking)) { >+ if (OVS_UNLIKELY(concurrent_txq)) { > rte_spinlock_unlock(&dev->tx_q[qid].tx_lock); > } > } > > static int > netdev_dpdk_eth_send(struct netdev *netdev, int qid, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq) > { > struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); > >- netdev_dpdk_send__(dev, qid, batch, may_steal); >+ netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq); > return 0; > } > >@@ -2533,7 +2526,8 @@ dpdk_ring_open(const char dev_name[], unsigned int >*eth_port_id) > > static int > netdev_dpdk_ring_send(struct netdev *netdev, int qid, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq) > { > struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); > unsigned i; >@@ -2546,7 +2540,7 @@ netdev_dpdk_ring_send(struct netdev *netdev, int qid, > dp_packet_rss_invalidate(batch->packets[i]); > } > >- netdev_dpdk_send__(dev, qid, batch, may_steal); >+ netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq); > return 0; > } > >@@ -2823,8 +2817,6 @@ netdev_dpdk_reconfigure(struct netdev *netdev) > err = dpdk_eth_dev_init(dev); > netdev_dpdk_alloc_txq(dev, netdev->n_txq); > >- dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq; >- > out: > > ovs_mutex_unlock(&dev->mutex); >diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c >index a95f7bb..813ce69 100644 >--- a/lib/netdev-dummy.c >+++ b/lib/netdev-dummy.c >@@ -1034,7 +1034,8 @@ netdev_dummy_rxq_drain(struct 
netdev_rxq *rxq_) > > static int > netdev_dummy_send(struct netdev *netdev, int qid OVS_UNUSED, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq OVS_UNUSED) > { > struct netdev_dummy *dev = netdev_dummy_cast(netdev); > int error = 0; >diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c >index c71a3df..2da8c18 100644 >--- a/lib/netdev-linux.c >+++ b/lib/netdev-linux.c >@@ -1161,7 +1161,8 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_) > * expected to do additional queuing of packets. */ > static int > netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED, >- struct dp_packet_batch *batch, bool may_steal) >+ struct dp_packet_batch *batch, bool may_steal, >+ bool concurrent_txq OVS_UNUSED) > { > int i; > int error = 0; >diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h >index 915a5a5..41fa9e7 100644 >--- a/lib/netdev-provider.h >+++ b/lib/netdev-provider.h >@@ -303,10 +303,6 @@ struct netdev_class { > * otherwise a positive errno value. > * > * 'n_txq' specifies the exact number of transmission queues to create. >- * The caller will call netdev_send() concurrently from 'n_txq' different >- * threads (with different qid). The netdev provider is responsible for >- * making sure that these concurrent calls do not create a race condition >- * by using multiple hw queues or locking. > * > * The caller will call netdev_reconfigure() (if necessary) before using > * netdev_send() on any of the newly configured queues, giving the > provider >@@ -328,6 +324,11 @@ struct netdev_class { > * packets. If 'may_steal' is true, the caller transfers ownership of all > * the packets to the network device, regardless of success. > * >+ * If 'concurrent_txq' is true, the caller may perform concurrent calls >+ * to netdev_send() with the same 'qid'. The netdev provider is >responsible >+ * for making sure that these concurrent calls do not create a race >+ * condition by using locking or other synchronization if required. >+ * > * The network device is expected to maintain one or more packet > * transmission queues, so that the caller does not ordinarily have to > * do additional queuing of packets. 'qid' specifies the queue to use >@@ -341,7 +342,7 @@ struct netdev_class { > * datapath". It will also prevent the OVS implementation of bonding from > * working properly over 'netdev'.) */ > int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch, >- bool may_steal); >+ bool may_steal, bool concurrent_txq); > > /* Registers with the poll loop to wake up from the next call to > * poll_block() when the packet transmission queue for 'netdev' has >diff --git a/lib/netdev.c b/lib/netdev.c >index 31a6a46..a792eb6 100644 >--- a/lib/netdev.c >+++ b/lib/netdev.c >@@ -655,9 +655,6 @@ netdev_rxq_drain(struct netdev_rxq *rx) > * otherwise a positive errno value. > * > * 'n_txq' specifies the exact number of transmission queues to create. >- * If this function returns successfully, the caller can make 'n_txq' >- * concurrent calls to netdev_send() (each one with a different 'qid' in the >- * range [0..'n_txq'-1]). > * > * The change might not effective immediately. The caller must check if a > * reconfiguration is required with netdev_is_reconf_required() and eventually >@@ -694,6 +691,11 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int >n_txq) > * If 'may_steal' is true, the caller transfers ownership of all the packets > * to the network device, regardless of success. 
> * >+ * If 'concurrent_txq' is true, the caller may perform concurrent calls >+ * to netdev_send() with the same 'qid'. The netdev provider is responsible >+ * for making sure that these concurrent calls do not create a race condition >+ * by using locking or other synchronization if required. >+ * > * The network device is expected to maintain one or more packet > * transmission queues, so that the caller does not ordinarily have to > * do additional queuing of packets. 'qid' specifies the queue to use >@@ -704,14 +706,15 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int >n_txq) > * cases this function will always return EOPNOTSUPP. */ > int > netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch, >- bool may_steal) >+ bool may_steal, bool concurrent_txq) > { > if (!netdev->netdev_class->send) { > dp_packet_delete_batch(batch, may_steal); > return EOPNOTSUPP; > } > >- int error = netdev->netdev_class->send(netdev, qid, batch, may_steal); >+ int error = netdev->netdev_class->send(netdev, qid, batch, may_steal, >+ concurrent_txq); > if (!error) { > COVERAGE_INC(netdev_sent); > if (!may_steal) { >diff --git a/lib/netdev.h b/lib/netdev.h >index 591d861..dc7ede8 100644 >--- a/lib/netdev.h >+++ b/lib/netdev.h >@@ -149,7 +149,7 @@ int netdev_rxq_drain(struct netdev_rxq *); > > /* Packet transmission. */ > int netdev_send(struct netdev *, int qid, struct dp_packet_batch *, >- bool may_steal); >+ bool may_steal, bool concurrent_txq); > void netdev_send_wait(struct netdev *, int qid); > > /* native tunnel APIs */ >-- >2.7.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev