This patch introduces the key functions that allow a worker thread to enqueue streams of objects and move them to the next nodes running on different cores.
Signed-off-by: Haiyue Wang <haiyue.w...@intel.com> Signed-off-by: Cunming Liang <cunming.li...@intel.com> Signed-off-by: Zhirun Yan <zhirun....@intel.com> --- lib/graph/graph_private.h | 27 +++++ lib/graph/meson.build | 2 +- lib/graph/rte_graph_model_dispatch.c | 145 +++++++++++++++++++++++++++ lib/graph/rte_graph_model_dispatch.h | 37 +++++++ lib/graph/version.map | 2 + 5 files changed, 212 insertions(+), 1 deletion(-) diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h index b66b18ebbc..e1a2a4bfd8 100644 --- a/lib/graph/graph_private.h +++ b/lib/graph/graph_private.h @@ -366,4 +366,31 @@ void graph_dump(FILE *f, struct graph *g); */ void node_dump(FILE *f, struct node *n); +/** + * @internal + * + * Create the graph schedule work queue. And all cloned graphs attached to the + * parent graph MUST be destroyed together for fast schedule design limitation. + * + * @param _graph + * The graph object + * @param _parent_graph + * The parent graph object which holds the run-queue head. + * + * @return + * - 0: Success. + * - <0: Graph schedule work queue related error. + */ +int graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph); + +/** + * @internal + * + * Destroy the graph schedule work queue. 
+ * + * @param _graph + * The graph object + */ +void graph_sched_wq_destroy(struct graph *_graph); + #endif /* _RTE_GRAPH_PRIVATE_H_ */ diff --git a/lib/graph/meson.build b/lib/graph/meson.build index c729d984b6..e21affa280 100644 --- a/lib/graph/meson.build +++ b/lib/graph/meson.build @@ -20,4 +20,4 @@ sources = files( ) headers = files('rte_graph.h', 'rte_graph_worker.h') -deps += ['eal', 'pcapng'] +deps += ['eal', 'pcapng', 'mempool', 'ring'] diff --git a/lib/graph/rte_graph_model_dispatch.c b/lib/graph/rte_graph_model_dispatch.c index 4a2f99496d..a300fefb85 100644 --- a/lib/graph/rte_graph_model_dispatch.c +++ b/lib/graph/rte_graph_model_dispatch.c @@ -5,6 +5,151 @@ #include "graph_private.h" #include "rte_graph_model_dispatch.h" +int +graph_sched_wq_create(struct graph *_graph, struct graph *_parent_graph) +{ + struct rte_graph *parent_graph = _parent_graph->graph; + struct rte_graph *graph = _graph->graph; + unsigned int wq_size; + + wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes); + wq_size = rte_align32pow2(wq_size + 1); + + graph->wq = rte_ring_create(graph->name, wq_size, graph->socket, + RING_F_SC_DEQ); + if (graph->wq == NULL) + SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ"); + + graph->mp = rte_mempool_create(graph->name, wq_size, + sizeof(struct graph_sched_wq_node), + 0, 0, NULL, NULL, NULL, NULL, + graph->socket, MEMPOOL_F_SP_PUT); + if (graph->mp == NULL) + SET_ERR_JMP(EIO, fail_mp, + "Failed to allocate graph WQ schedule entry"); + + graph->lcore_id = _graph->lcore_id; + + if (parent_graph->rq == NULL) { + parent_graph->rq = &parent_graph->rq_head; + SLIST_INIT(parent_graph->rq); + } + + graph->rq = parent_graph->rq; + SLIST_INSERT_HEAD(graph->rq, graph, rq_next); + + return 0; + +fail_mp: + rte_ring_free(graph->wq); + graph->wq = NULL; +fail: + return -rte_errno; +} + +void +graph_sched_wq_destroy(struct graph *_graph) +{ + struct rte_graph *graph = _graph->graph; + + if (graph == NULL) + return; + + rte_ring_free(graph->wq); + graph->wq = 
NULL; + + rte_mempool_free(graph->mp); + graph->mp = NULL; +} + +static __rte_always_inline bool +__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph *graph) +{ + struct graph_sched_wq_node *wq_node; + uint16_t off = 0; + uint16_t size; + +submit_again: + if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0) + goto fallback; + + size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs)); + wq_node->node_off = node->off; + wq_node->nb_objs = size; + rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void *)); + + while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void *)&wq_node, + sizeof(wq_node), 1, NULL) == 0) + rte_pause(); + + off += size; + node->idx -= size; + if (node->idx > 0) + goto submit_again; + + return true; + +fallback: + if (off != 0) + memmove(&node->objs[0], &node->objs[off], + node->idx * sizeof(void *)); + + return false; +} + +bool __rte_noinline +__rte_graph_sched_node_enqueue(struct rte_node *node, + struct rte_graph_rq_head *rq) +{ + const unsigned int lcore_id = node->lcore_id; + struct rte_graph *graph; + + SLIST_FOREACH(graph, rq, rq_next) + if (graph->lcore_id == lcore_id) + break; + + return graph != NULL ? 
__graph_sched_node_enqueue(node, graph) : false; +} + +void +__rte_graph_sched_wq_process(struct rte_graph *graph) +{ + struct graph_sched_wq_node *wq_node; + struct rte_mempool *mp = graph->mp; + struct rte_ring *wq = graph->wq; + uint16_t idx, free_space; + struct rte_node *node; + unsigned int i, n; + struct graph_sched_wq_node *wq_nodes[32]; + + n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes, sizeof(wq_nodes[0]), + RTE_DIM(wq_nodes), NULL); + if (n == 0) + return; + + for (i = 0; i < n; i++) { + wq_node = wq_nodes[i]; + node = RTE_PTR_ADD(graph, wq_node->node_off); + RTE_ASSERT(node->fence == RTE_GRAPH_FENCE); + idx = node->idx; + free_space = node->size - idx; + + if (unlikely(free_space < wq_node->nb_objs)) + __rte_node_stream_alloc_size(graph, node, node->size + wq_node->nb_objs); + + memmove(&node->objs[idx], wq_node->objs, wq_node->nb_objs * sizeof(void *)); + memset(wq_node->objs, 0, wq_node->nb_objs * sizeof(void *)); + node->idx = idx + wq_node->nb_objs; + + __rte_node_process(graph, node); + + wq_node->nb_objs = 0; + node->idx = 0; + } + + rte_mempool_put_bulk(mp, (void **)wq_nodes, n); +} + int rte_graph_model_dispatch_lcore_affinity_set(const char *name, unsigned int lcore_id) { diff --git a/lib/graph/rte_graph_model_dispatch.h b/lib/graph/rte_graph_model_dispatch.h index 179624e972..18fa7ce0ab 100644 --- a/lib/graph/rte_graph_model_dispatch.h +++ b/lib/graph/rte_graph_model_dispatch.h @@ -14,12 +14,49 @@ * * This API allows to set core affinity with the node. */ +#include <rte_errno.h> +#include <rte_mempool.h> +#include <rte_memzone.h> +#include <rte_ring.h> + #include "rte_graph_worker_common.h" #ifdef __cplusplus extern "C" { #endif +#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER 8 +#define GRAPH_SCHED_WQ_SIZE(nb_nodes) \ + ((typeof(nb_nodes))((nb_nodes) * GRAPH_SCHED_WQ_SIZE_MULTIPLIER)) + +/** + * @internal + * + * Schedule the node to the right graph's work queue. + * + * @param node + * Pointer to the scheduled node object. 
+ * @param rq + * Pointer to the scheduled run-queue for all graphs. + * + * @return + * True on success, false otherwise. + */ +__rte_experimental +bool __rte_noinline __rte_graph_sched_node_enqueue(struct rte_node *node, + struct rte_graph_rq_head *rq); + +/** + * @internal + * + * Process all nodes (streams) in the graph's work queue. + * + * @param graph + * Pointer to the graph object. + */ +__rte_experimental +void __rte_graph_sched_wq_process(struct rte_graph *graph); + /** * Set lcore affinity with the node. * diff --git a/lib/graph/version.map b/lib/graph/version.map index aaa86f66ed..d511133f39 100644 --- a/lib/graph/version.map +++ b/lib/graph/version.map @@ -48,6 +48,8 @@ EXPERIMENTAL { rte_graph_worker_model_set; rte_graph_worker_model_get; + __rte_graph_sched_wq_process; + __rte_graph_sched_node_enqueue; rte_graph_model_dispatch_lcore_affinity_set; -- 2.37.2