> This patch introduces key functions to allow a worker thread to
> enable enqueue and move streams of objects to the next nodes over
> different cores.
>
> Signed-off-by: Haiyue Wang <haiyue.w...@intel.com>
> Signed-off-by: Cunming Liang <cunming.li...@intel.com>
> Signed-off-by: Zhirun Yan <zhirun....@intel.com>
> ---
>  lib/graph/graph_private.h            |  27 +++++
>  lib/graph/meson.build                |   2 +-
>  lib/graph/rte_graph_model_dispatch.c | 145 +++++++++++++++++++++++++++
>  lib/graph/rte_graph_model_dispatch.h |  37 +++++++
>  lib/graph/version.map                |   2 +
>  5 files changed, 212 insertions(+), 1 deletion(-)
>
> diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
> index b66b18ebbc..e1a2a4bfd8 100644
> --- a/lib/graph/graph_private.h
> +++ b/lib/graph/graph_private.h
> @@ -366,4 +366,31 @@ void graph_dump(FILE *f, struct graph *g);
>   */
>  void node_dump(FILE *f, struct node *n);
>
> +/**
> + * @internal
> + *
> + * Create the graph schedule work queue. And all cloned graphs attached to the
> + * parent graph MUST be destroyed together for fast schedule design limitation.
> + *
> + * @param _graph
> + *   The graph object
> + * @param _parent_graph
> + *   The parent graph object which holds the run-queue head.
> + *
> + * @return
> + *   - 0: Success.
> + *   - <0: Graph schedule work queue related error.
> + */
> +int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph);
> +
> +/**
> + * @internal
> + *
> + * Destroy the graph schedule work queue.
> + *
> + * @param _graph
> + *   The graph object
> + */
> +void graph_sched_wq_destroy(struct graph *_graph);
> +
>  #endif /* _RTE_GRAPH_PRIVATE_H_ */
> diff --git a/lib/graph/meson.build b/lib/graph/meson.build
> index c729d984b6..e21affa280 100644
> --- a/lib/graph/meson.build
> +++ b/lib/graph/meson.build
> @@ -20,4 +20,4 @@ sources = files(
>  )
>  headers = files('rte_graph.h', 'rte_graph_worker.h')
>
> -deps += ['eal', 'pcapng']
> +deps += ['eal', 'pcapng', 'mempool', 'ring']
> diff --git a/lib/graph/rte_graph_model_dispatch.c b/lib/graph/rte_graph_model_dispatch.c
> index 4a2f99496d..a300fefb85 100644
> --- a/lib/graph/rte_graph_model_dispatch.c
> +++ b/lib/graph/rte_graph_model_dispatch.c
> @@ -5,6 +5,151 @@
>  #include "graph_private.h"
>  #include "rte_graph_model_dispatch.h"
>
> +int
> +graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph)
> +{
> +        struct rte_graph *parent_graph = _parent_graph->graph;
> +        struct rte_graph *graph = _graph->graph;
> +        unsigned int wq_size;
> +
> +        wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
> +        wq_size = rte_align32pow2(wq_size + 1);

Hi Zhirun,

Should we introduce a new function `rte_graph_configure` that lets the
application control the ring size and mempool size of the work queue? We
could fall back to default values when nothing is configured.

rte_graph_configure should take a struct rte_graph_config along these lines:

struct rte_graph_config {
        struct {
                u64 rsvd[8];
        } rtc;
        struct {
                u16 wq_size;
                ...
        } dispatch;
};

This will help future graph models to have their own configuration. We can
have a rte_graph_config_init() function to initialize the rte_graph_config
structure.

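To make the idea concrete, here is a rough sketch of the interface I have in
mind. None of this exists in the tree today; the mp_size field, the "0 means
default" convention and the rte_graph_configure() prototype below are only
placeholders for discussion:

#include <stdint.h>
#include <string.h>

/* Sketch only: proposed per-model graph configuration. */
struct rte_graph_config {
        struct {
                uint64_t rsvd[8];   /* reserved for the RTC model */
        } rtc;
        struct {
                uint16_t wq_size;   /* dispatch work queue (ring) size, 0 = default */
                uint32_t mp_size;   /* placeholder: WQ mempool size, 0 = default */
        } dispatch;
};

/* Fill the config with defaults so applications only override what they need. */
static inline void
rte_graph_config_init(struct rte_graph_config *cfg)
{
        memset(cfg, 0, sizeof(*cfg)); /* 0 == "use the library default" */
}

/* Proposed entry point: record the configuration for a named graph so that
 * graph_sched_wq_create() can use cfg->dispatch.wq_size (when non-zero)
 * instead of deriving the size from GRAPH_SCHED_WQ_SIZE(nb_nodes).
 */
int rte_graph_configure(const char *name, const struct rte_graph_config *cfg);

This keeps the current sizing as the default while giving applications, and
future graph models, one place to hook their knobs in.
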
> +
> +        graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
> +                                    RING_F_SC_DEQ);
> +        if (graph->wq == NULL)
> +                SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
> +
> +        graph->mp = rte_mempool_create(graph->name, wq_size,
> +                                       sizeof(struct graph_sched_wq_node),
> +                                       0, 0, NULL, NULL, NULL, NULL,
> +                                       graph->socket, MEMPOOL_F_SP_PUT);
> +        if (graph->mp == NULL)
> +                SET_ERR_JMP(EIO, fail_mp,
> +                            "Failed to allocate graph WQ schedule entry");
> +
> +        graph->lcore_id = _graph->lcore_id;
> +
> +        if (parent_graph->rq == NULL) {
> +                parent_graph->rq = &parent_graph->rq_head;
> +                SLIST_INIT(parent_graph->rq);
> +        }
> +
> +        graph->rq = parent_graph->rq;
> +        SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
> +
> +        return 0;
> +
> +fail_mp:
> +        rte_ring_free(graph->wq);
> +        graph->wq = NULL;
> +fail:
> +        return -rte_errno;
> +}
> +
> +void
> +graph_sched_wq_destroy(struct graph *_graph)
> +{
> +        struct rte_graph *graph = _graph->graph;
> +
> +        if (graph == NULL)
> +                return;
> +
> +        rte_ring_free(graph->wq);
> +        graph->wq = NULL;
> +
> +        rte_mempool_free(graph->mp);
> +        graph->mp = NULL;
> +}
> +
> +static __rte_always_inline bool
> +__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph)
> +{
> +        struct graph_sched_wq_node *wq_node;
> +        uint16_t off = 0;
> +        uint16_t size;
> +
> +submit_again:
> +        if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
> +                goto fallback;
> +
> +        size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
> +        wq_node->node_off = node->off;
> +        wq_node->nb_objs = size;
> +        rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *));
> +
> +        while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node,
> +                                             sizeof(wq_node), 1, NULL) == 0)
> +                rte_pause();
> +
> +        off += size;
> +        node->idx -= size;
> +        if (node->idx > 0)
> +                goto submit_again;
> +
> +        return true;
> +
> +fallback:
> +        if (off != 0)
> +                memmove(&node->objs[0], &node->objs[off],
> +                        node->idx * sizeof(void *));
> +
> +        return false;
> +}
> +
> +bool __rte_noinline
> +__rte_graph_sched_node_enqueue(struct rte_node *node,
> +                               struct rte_graph_rq_head *rq)
> +{
> +        const unsigned int lcore_id = node->lcore_id;
> +        struct rte_graph *graph;
> +
> +        SLIST_FOREACH(graph, rq, rq_next)
> +                if (graph->lcore_id == lcore_id)
> +                        break;
> +
> +        return graph != NULL ? __graph_sched_node_enqueue(node, graph) : false;
> +}
> +
> +void
> +__rte_graph_sched_wq_process(struct rte_graph *graph)
> +{
> +        struct graph_sched_wq_node *wq_node;
> +        struct rte_mempool *mp = graph->mp;
> +        struct rte_ring *wq = graph->wq;
> +        uint16_t idx, free_space;
> +        struct rte_node *node;
> +        unsigned int i, n;
> +        struct graph_sched_wq_node *wq_nodes[32];
> +
> +        n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
> +                                           RTE_DIM(wq_nodes), NULL);
> +        if (n == 0)
> +                return;
> +
> +        for (i = 0; i < n; i++) {
> +                wq_node = wq_nodes[i];
> +                node = RTE_PTR_ADD(graph, wq_node->node_off);
> +                RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> +                idx = node->idx;
> +                free_space = node->size - idx;
> +
> +                if (unlikely(free_space < wq_node->nb_objs))
> +                        __rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs);
> +
> +                memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *));
> +                memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *));

The memset should be avoided in the fast path for better performance: we set
wq_node->nb_objs to 0 below anyway, so the stale pointers in wq_node->objs are
never read. (A sketch of the function without the memset is at the end of this
mail.)

> +                node->idx = idx + wq_node->nb_objs;
> +
> +                __rte_node_process(graph, node);
> +
> +                wq_node->nb_objs = 0;
> +                node->idx = 0;
> +        }
> +
> +        rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
> +}
> +
>  int
>  rte_graph_model_dispatch_lcore_affinity_set(const char *name, unsigned int lcore_id)
>  {
> diff --git a/lib/graph/rte_graph_model_dispatch.h b/lib/graph/rte_graph_model_dispatch.h
> index 179624e972..18fa7ce0ab 100644
> --- a/lib/graph/rte_graph_model_dispatch.h
> +++ b/lib/graph/rte_graph_model_dispatch.h
> @@ -14,12 +14,49 @@
>   *
>   * This API allows to set core affinity with the node.
>   */
> +#include <rte_errno.h>
> +#include <rte_mempool.h>
> +#include <rte_memzone.h>
> +#include <rte_ring.h>
> +
>  #include "rte_graph_worker_common.h"
>
>  #ifdef __cplusplus
>  extern "C" {
>  #endif
>
> +#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER  8
> +#define GRAPH_SCHED_WQ_SIZE(nb_nodes)   \
> +        ((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
> +
> +/**
> + * @internal
> + *
> + * Schedule the node to the right graph's work queue.
> + *
> + * @param node
> + *   Pointer to the scheduled node object.
> + * @param rq
> + *   Pointer to the scheduled run-queue for all graphs.
> + *
> + * @return
> + *   True on success, false otherwise.
> + */
> +__rte_experimental
> +bool __rte_noinline __rte_graph_sched_node_enqueue(struct rte_node *node,
> +                                    struct rte_graph_rq_head *rq);
> +
> +/**
> + * @internal
> + *
> + * Process all nodes (streams) in the graph's work queue.
> + *
> + * @param graph
> + *   Pointer to the graph object.
> + */
> +__rte_experimental
> +void __rte_graph_sched_wq_process(struct rte_graph *graph);
> +
>  /**
>   * Set lcore affinity with the node.
>   *
> diff --git a/lib/graph/version.map b/lib/graph/version.map
> index aaa86f66ed..d511133f39 100644
> --- a/lib/graph/version.map
> +++ b/lib/graph/version.map
> @@ -48,6 +48,8 @@ EXPERIMENTAL {
>
>          rte_graph_worker_model_set;
>          rte_graph_worker_model_get;
> +        __rte_graph_sched_wq_process;
> +        __rte_graph_sched_node_enqueue;
>
>          rte_graph_model_dispatch_lcore_affinity_set;
>
> --
> 2.37.2
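
As mentioned above next to the memset, here is a minimal sketch of how
__rte_graph_sched_wq_process() could look with the memset dropped. Everything
else is copied from your patch unchanged; this is only to illustrate the
suggestion, not a tested change:

void
__rte_graph_sched_wq_process(struct rte_graph *graph)
{
        struct graph_sched_wq_node *wq_nodes[32];
        struct graph_sched_wq_node *wq_node;
        struct rte_mempool *mp = graph->mp;
        struct rte_ring *wq = graph->wq;
        uint16_t idx, free_space;
        struct rte_node *node;
        unsigned int i, n;

        n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]),
                                           RTE_DIM(wq_nodes), NULL);
        if (n == 0)
                return;

        for (i = 0; i < n; i++) {
                wq_node = wq_nodes[i];
                node = RTE_PTR_ADD(graph, wq_node->node_off);
                RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
                idx = node->idx;
                free_space = node->size - idx;

                if (unlikely(free_space < wq_node->nb_objs))
                        __rte_node_stream_alloc_size(graph, node,
                                                     node->size + wq_node->nb_objs);

                /* Hand the objects over to the node's stream and run it. */
                memmove(&node->objs[idx], wq_node->objs,
                        wq_node->nb_objs * sizeof(void *));
                node->idx = idx + wq_node->nb_objs;

                __rte_node_process(graph, node);

                /* No memset of wq_node->objs here: clearing nb_objs is enough,
                 * the stale pointers are never read once the entry is back in
                 * the mempool.
                 */
                wq_node->nb_objs = 0;
                node->idx = 0;
        }

        rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
}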