On Mon, Sep 15, 2014 at 2:03 PM, Alex Wang <al...@nicira.com> wrote: > This commits adds the multithreading functionality to OVS dpdk > module. Users are able to create multiple pmd threads and set > their cpu affinity via specifying the cpu mask string similar > to the EAL '-c COREMASK' option. > > Also, the number of rx queues for each dpdk interface is made > configurable to help distribution of rx packets among multiple > pmd threads. > > Signed-off-by: Alex Wang <al...@nicira.com> > --- > lib/dpif-linux.c | 1 + > lib/dpif-netdev.c | 122 > +++++++++++++++++++++++++++++++++++++++++--- > lib/dpif-provider.h | 7 +++ > lib/dpif.c | 18 +++++++ > lib/dpif.h | 2 + > ofproto/ofproto-dpif.c | 2 + > ofproto/ofproto-provider.h | 6 +++ > ofproto/ofproto.c | 16 ++++++ > ofproto/ofproto.h | 2 + > vswitchd/bridge.c | 3 ++ > vswitchd/vswitch.xml | 27 ++++++++++ > 11 files changed, 198 insertions(+), 8 deletions(-) > > diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c > index 2c387ed..ed2058c 100644 > --- a/lib/dpif-linux.c > +++ b/lib/dpif-linux.c > @@ -1873,6 +1873,7 @@ const struct dpif_class dpif_linux_class = { > dpif_linux_operate, > dpif_linux_recv_set, > dpif_linux_handlers_set, > + NULL, /* poll_thread_set */ > dpif_linux_queue_to_priority, > dpif_linux_recv, > dpif_linux_recv_wait, > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 3f69219..bfab78a 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -205,6 +205,11 @@ struct dp_netdev { > /* Each pmd thread will store its pointer to > * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */ > ovsthread_key_t per_pmd_key; > + > + /* Number of rx queues for each dpdk interface and the cpu mask > + * for pin of pmd threads. 
*/ > + size_t n_dpdk_rxqs; > + char *pmd_cmask; > }; > > static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev > *dp, > @@ -395,10 +400,12 @@ static void dp_netdev_disable_upcall(struct dp_netdev > *); > static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, > struct dp_netdev *dp, int index, > int core_id, int numa_id); > +static void dp_netdev_set_nonpmd(struct dp_netdev *dp); > static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev > *dp); > static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); > static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id); > static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id); > +static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp); > > static void emc_clear_entry(struct emc_entry *ce); > > @@ -537,7 +544,6 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > OVS_REQUIRES(dp_netdev_mutex) > { > struct dp_netdev *dp; > - struct dp_netdev_pmd_thread *non_pmd; > int error; > > dp = xzalloc(sizeof *dp); > @@ -570,9 +576,7 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */ > ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID); > - non_pmd = xzalloc(sizeof *non_pmd); > - dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, > - OVS_NUMA_UNSPEC); > + dp_netdev_set_nonpmd(dp); > > ovs_mutex_lock(&dp->port_mutex); > error = do_add_port(dp, name, "internal", ODPP_LOCAL); > @@ -776,8 +780,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, > const char *type, > return ENOENT; > } > /* There can only be ovs_numa_get_n_cores() pmd threads, > - * so creates a tx_q for each. */ > - error = netdev_set_multiq(netdev, n_cores, NR_QUEUE); > + * so creates a txq for each. */ > + error = netdev_set_multiq(netdev, n_cores, > + dp->n_dpdk_rxqs ? 
dp->n_dpdk_rxqs > + : NR_QUEUE); We can just initialize 'n_dpdk_rxqs' to NR_QUEUE when the datapath is created, so that this check can be avoided here.
> if (error) { > VLOG_ERR("%s, cannot set multiq", devname); > return errno; > @@ -1842,6 +1848,77 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op > **ops, size_t n_ops) > } > } > > +/* Returns true if the configuration for rx queues or cpu mask > + * changed. */ > +static bool > +pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char > *cmask) > +{ > + if (dp->n_dpdk_rxqs != rxqs) { > + return true; > + } else { > + if (dp->pmd_cmask != NULL && cmask != NULL) { > + return strcmp(dp->pmd_cmask, cmask); > + } else { > + return (dp->pmd_cmask != NULL || cmask != NULL); > + } > + } > +} > + > +/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */ > +static int > +dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char > *cmask) > +{ > + struct dp_netdev *dp = get_dp_netdev(dpif); > + > + if (pmd_config_changed(dp, n_rxqs, cmask)) { > + struct dp_netdev_port *port; > + > + dp_netdev_destroy_all_pmds(dp); > + > + CMAP_FOR_EACH (port, node, &dp->ports) { > + if (netdev_is_pmd(port->netdev)) { > + int i, err; > + > + /* Closes the existing 'rxq's. */ > + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { > + netdev_rxq_close(port->rxq[i]); > + port->rxq[i] = NULL; > + } > + > + /* Sets the new rx queue config. */ > + err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(), > + n_rxqs); > + if (err) { > + VLOG_ERR("Failed to set dpdk interface %s rx_queue to:" > + " %u", netdev_get_name(port->netdev), > + n_rxqs); > + return err; > + } > + > + /* If the set_multiq() above succeeds, reopens the 'rxq's. */ > + port->rxq = xrealloc(port->rxq, sizeof *port->rxq > + * netdev_n_rxq(port->netdev)); > + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { > + netdev_rxq_open(port->netdev, &port->rxq[i], i); > + } > + } > + } > + dp->n_dpdk_rxqs = n_rxqs; > + > + /* Reconfigures the cpu mask. */ > + ovs_numa_set_cpu_mask(cmask); > + free(dp->pmd_cmask); > + dp->pmd_cmask = cmask ? 
xstrdup(cmask) : NULL; > + > + /* Restores the non-pmd. */ > + dp_netdev_set_nonpmd(dp); > + /* Restores all pmd threads. */ > + dp_netdev_reset_pmd_threads(dp); > + } > + > + return 0; > +} > + > static int > dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED, > uint32_t queue_id, uint32_t *priority) > @@ -2093,6 +2170,17 @@ dp_netdev_get_nonpmd(struct dp_netdev *dp) > return pmd; > } > > +/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */ > +static void > +dp_netdev_set_nonpmd(struct dp_netdev *dp) > +{ > + struct dp_netdev_pmd_thread *non_pmd; > + > + non_pmd = xzalloc(sizeof *non_pmd); > + dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, > + OVS_NUMA_UNSPEC); > +} > + > /* Configures the 'pmd' based on the input argument. */ > static void > dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev > *dp, > @@ -2185,8 +2273,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int > numa_id) > return; > } > > - /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */ > - can_have = MIN(n_unpinned, NR_PMD_THREADS); > + /* If cpu mask is specified, uses all unpinned cores, otherwise > + * tries creating NR_PMD_THREADS pmd threads. */ > + can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, > NR_PMD_THREADS); > for (i = 0; i < can_have; i++) { > struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd); > int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id); > @@ -2209,6 +2298,22 @@ dp_netdev_flow_stats_new_cb(void) > return bucket; > } > > +/* Called after pmd threads config change. Restarts pmd threads with > + * new configuration. 
*/ > +static void > +dp_netdev_reset_pmd_threads(struct dp_netdev *dp) > +{ > + struct dp_netdev_port *port; > + > + CMAP_FOR_EACH (port, node, &dp->ports) { > + if (netdev_is_pmd(port->netdev)) { > + int numa_id = netdev_get_numa_id(port->netdev); > + > + dp_netdev_set_pmds_on_numa(dp, numa_id); > + } > + } > +} > + > static void > dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, > int cnt, int size, > @@ -2772,6 +2877,7 @@ const struct dpif_class dpif_netdev_class = { > dpif_netdev_operate, > NULL, /* recv_set */ > NULL, /* handlers_set */ > + dpif_netdev_pmd_set, > dpif_netdev_queue_to_priority, > NULL, /* recv */ > NULL, /* recv_wait */ > diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h > index 89b32dd..e1136e1 100644 > --- a/lib/dpif-provider.h > +++ b/lib/dpif-provider.h > @@ -300,6 +300,13 @@ struct dpif_class { > * */ > int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers); > > + /* If 'dpif' creates its own I/O polling threads, refreshes poll threads > + * configuration. 'n_rxqs' configures the number of rx_queues, which > + * are distributed among threads. 'cmask' configures the cpu mask > + * for setting the polling threads' cpu affinity. */ > + int (*poll_threads_set)(struct dpif *dpif, unsigned int n_rxqs, > + const char *cmask); > + > /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a > * priority value used for setting packet priority. */ > int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id, > diff --git a/lib/dpif.c b/lib/dpif.c > index bf2c5f9..91ccfd8 100644 > --- a/lib/dpif.c > +++ b/lib/dpif.c > @@ -1300,6 +1300,24 @@ dpif_print_packet(struct dpif *dpif, struct > dpif_upcall *upcall) > } > } > > +/* If 'dpif' creates its own I/O polling threads, refreshes poll threads > + * configuration. 
*/ > +int > +dpif_poll_threads_set(struct dpif *dpif, unsigned int n_rxqs, > + const char *cmask) > +{ > + int error = 0; > + > + if (dpif->dpif_class->poll_threads_set) { > + error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask); > + if (error) { > + log_operation(dpif, "poll_threads_set", error); > + } > + } > + > + return error; > +} > + > /* Polls for an upcall from 'dpif' for an upcall handler. Since there > * there can be multiple poll loops, 'handler_id' is needed as index to > * identify the corresponding poll loop. If successful, stores the upcall > diff --git a/lib/dpif.h b/lib/dpif.h > index be1bc4f..c57c8b0 100644 > --- a/lib/dpif.h > +++ b/lib/dpif.h > @@ -769,6 +769,8 @@ void dpif_register_upcall_cb(struct dpif *, > upcall_callback *, void *aux); > > int dpif_recv_set(struct dpif *, bool enable); > int dpif_handlers_set(struct dpif *, uint32_t n_handlers); > +int dpif_poll_threads_set(struct dpif *, unsigned int n_rxqs, > + const char *cmask); > int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *, > struct ofpbuf *); > void dpif_recv_purge(struct dpif *); > diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c > index 6a59098..6cc9789 100644 > --- a/ofproto/ofproto-dpif.c > +++ b/ofproto/ofproto-dpif.c > @@ -532,6 +532,8 @@ type_run(const char *type) > udpif_set_threads(backer->udpif, n_handlers, n_revalidators); > } > > + dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask); > + > if (backer->need_revalidate) { > struct ofproto_dpif *ofproto; > struct simap_node *node; > diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h > index de354ec..158f86e 100644 > --- a/ofproto/ofproto-provider.h > +++ b/ofproto/ofproto-provider.h > @@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle; > * ofproto-dpif implementation. */ > extern size_t n_handlers, n_revalidators; > > +/* Number of rx queues to be created for each dpdk interface. 
*/ > +extern size_t n_dpdk_rxqs; > + > +/* Cpu mask for pmd threads. */ > +extern char *pmd_cpu_mask; > + > static inline struct rule *rule_from_cls_rule(const struct cls_rule *); > > void ofproto_rule_expire(struct rule *rule, uint8_t reason) > diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c > index 7b1d478..818e23f 100644 > --- a/ofproto/ofproto.c > +++ b/ofproto/ofproto.c > @@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT; > unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT; > > size_t n_handlers, n_revalidators; > +size_t n_dpdk_rxqs; > +char *pmd_cpu_mask; > > /* Map from datapath name to struct ofproto, for use by unixctl commands. */ > static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos); > @@ -731,6 +733,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, > void *aux, bool flood) > } > > void > +ofproto_set_n_dpdk_rxqs(int n_rxqs) > +{ > + n_dpdk_rxqs = MAX(n_rxqs, 0); > +} > + > +void > +ofproto_set_cpu_mask(const char *cmask) > +{ > + free(pmd_cpu_mask); > + > + pmd_cpu_mask = cmask ? 
xstrdup(cmask) : NULL; > +} > + > +void > ofproto_set_threads(int n_handlers_, int n_revalidators_) > { > int threads = MAX(count_cpu_cores(), 2); > diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h > index d60b198..40bb3b7 100644 > --- a/ofproto/ofproto.h > +++ b/ofproto/ofproto.h > @@ -299,6 +299,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto, > int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, > bool flood); > void ofproto_set_threads(int n_handlers, int n_revalidators); > +void ofproto_set_n_dpdk_rxqs(int n_rxqs); > +void ofproto_set_cpu_mask(const char *cmask); > void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc); > int ofproto_set_snoops(struct ofproto *, const struct sset *snoops); > int ofproto_set_netflow(struct ofproto *, > diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c > index 8f99d7d..045dd77 100644 > --- a/vswitchd/bridge.c > +++ b/vswitchd/bridge.c > @@ -537,6 +537,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch > *ovs_cfg) > OFPROTO_FLOW_LIMIT_DEFAULT)); > ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle", > OFPROTO_MAX_IDLE_DEFAULT)); > + ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config, > + "n-dpdk-rxqs", 0)); > + ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask")); > > ofproto_set_threads( > smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0), > diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml > index d07d54f..b00f74d 100644 > --- a/vswitchd/vswitch.xml > +++ b/vswitchd/vswitch.xml > @@ -152,6 +152,33 @@ > </p> > </column> > > + <column name="other_config" key="n-dpdk-rxqs" > + type='{"type": "integer", "minInteger": 1}'> > + <p> > + Specifies the number of rx queues to be created for each dpdk > + interface. If not specified or specified to 0, one rx queue will > + be created for each dpdk interface by default. 
> + </p> > + </column> > + > + <column name="other_config" key="pmd-cpu-mask"> > + <p> > + Specifies CPU mask for setting the cpu affinity of PMD (Poll > + Mode Driver) threads. Value should be in the form of hex string, > + similar to the dpdk EAL '-c COREMASK' option input or the 'taskset' > + mask input. > + </p> > + <p> > + The lowest order bit corresponds to the first CPU core. A set bit > + means the corresponding core is available. If the input does not > + cover all cores, those uncovered cores are considered not set. > + </p> > + <p> > + If not specified, one pmd thread will be created for each numa node > + and pinned to any available core on the numa node by default. > + </p> > + </column> > + > <column name="other_config" key="n-handler-threads" > type='{"type": "integer", "minInteger": 1}'> > <p> Otherwise looks good. Acked-by: Pravin B Shelar <pshe...@nicira.com> _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev