This patch enhances the task scheduler mechanism to enable dispatching
tasks to other worker cores. Currently, there is only a local work
queue for one graph to walk. We introduce a scheduler work queue in
each worker core for dispatching tasks. The walk will process the
scheduler work queue first, then handle the local work queue.
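
For illustration, a minimal sketch of the intended walk order, assuming
the per-core scheduler work queue (graph->wq) is an rte_ring of node
pointers; the actual __rte_graph_sched_wq_process() is introduced by a
separate patch in this series and may differ:

    /* Sketch only: drain the nodes other cores dispatched to us,
     * then fall through to the normal local walk. The ring-backed
     * wq layout is an assumption, not the series' real definition.
     * Assumes <rte_ring.h> and rte_graph_worker.h are included.
     */
    static inline void
    sched_wq_process_sketch(struct rte_graph *graph)
    {
            struct rte_node *node;

            while (rte_ring_dequeue((struct rte_ring *)graph->wq,
                                    (void **)&node) == 0)
                    node->process(graph, node, node->objs, node->idx);
    }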

Signed-off-by: Haiyue Wang <haiyue.w...@intel.com>
Signed-off-by: Cunming Liang <cunming.li...@intel.com>
Signed-off-by: Zhirun Yan <zhirun....@intel.com>
---
 lib/graph/graph.c            |  6 ++++++
 lib/graph/rte_graph_worker.h | 11 +++++++++++
 2 files changed, 17 insertions(+)
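
The dispatch check added to rte_graph_walk() below fires only for
non-source nodes ((int32_t)head > 0) that are bound to a different
lcore. A minimal sketch of the enqueue side, assuming graph->rq is
also ring-backed (the actual __rte_graph_sched_node_enqueue() is
provided by a separate patch in this series and may differ):

    /* Sketch only: hand the node over to its bound worker core.
     * Returning true lets the caller skip local processing via the
     * new "next:" label; on failure the node is processed locally,
     * so the walk keeps making progress when the remote queue fills.
     * Assumes <stdbool.h> and <rte_ring.h> are included.
     */
    static inline bool
    sched_node_enqueue_sketch(struct rte_node *node, struct rte_ring *rq)
    {
            return rte_ring_enqueue(rq, node) == 0;
    }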

diff --git a/lib/graph/graph.c b/lib/graph/graph.c
index b4eb18175a..49ea2b3fbb 100644
--- a/lib/graph/graph.c
+++ b/lib/graph/graph.c
@@ -368,6 +368,8 @@ rte_graph_destroy(rte_graph_t id)
        while (graph != NULL) {
                tmp = STAILQ_NEXT(graph, next);
                if (graph->id == id) {
+                       /* Destroy the scheduler work queue, if any */
+                       graph_sched_wq_destroy(graph);
                        /* Call fini() of the all the nodes in the graph */
                        graph_node_fini(graph);
                        /* Destroy graph fast path memory */
@@ -470,6 +472,10 @@ graph_clone(struct graph *parent_graph, const char *name,
        if (graph_node_init(graph))
                goto graph_mem_destroy;
 
+       /* Create the graph scheduler work queue */
+       if (graph_sched_wq_create(graph, parent_graph))
+               goto graph_mem_destroy;
+
        /* All good, Lets add the graph to the list */
        graph_id++;
        STAILQ_INSERT_TAIL(&graph_list, graph, next);
diff --git a/lib/graph/rte_graph_worker.h b/lib/graph/rte_graph_worker.h
index faf3f31ddc..e98697d880 100644
--- a/lib/graph/rte_graph_worker.h
+++ b/lib/graph/rte_graph_worker.h
@@ -177,6 +177,7 @@ static inline void
 rte_graph_walk(struct rte_graph *graph)
 {
        const rte_graph_off_t *cir_start = graph->cir_start;
+       const unsigned int lcore_id = graph->lcore_id;
        const rte_node_t mask = graph->cir_mask;
        uint32_t head = graph->head;
        struct rte_node *node;
@@ -184,6 +185,9 @@ rte_graph_walk(struct rte_graph *graph)
        uint16_t rc;
        void **objs;
 
+       if (graph->wq != NULL)
+               __rte_graph_sched_wq_process(graph);
+
        /*
         * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
         * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
@@ -205,6 +209,12 @@ rte_graph_walk(struct rte_graph *graph)
                objs = node->objs;
                rte_prefetch0(objs);
 
+               /* Dispatch the node to its bound core until all tasks/objs are done */
+               if (node->lcore_id != RTE_MAX_LCORE && (int32_t)head > 0 &&
+                   lcore_id != node->lcore_id && graph->rq != NULL &&
+                   __rte_graph_sched_node_enqueue(node, graph->rq))
+                       goto next;
+
                if (rte_graph_has_stats_feature()) {
                        start = rte_rdtsc();
                        rc = node->process(graph, node, objs, node->idx);
@@ -215,6 +225,7 @@ rte_graph_walk(struct rte_graph *graph)
                        node->process(graph, node, objs, node->idx);
                }
                node->idx = 0;
+       next:
                head = likely((int32_t)head > 0) ? head & mask : head;
        }
        graph->tail = 0;
-- 
2.25.1
