> This patch introduces key functions to allow a worker thread to
> enable enqueue and move streams of objects to the next nodes over
> different cores.
> 
> Signed-off-by: Haiyue Wang <haiyue.w...@intel.com>
> Signed-off-by: Cunming Liang <cunming.li...@intel.com>
> Signed-off-by: Zhirun Yan <zhirun....@intel.com>
> ---
>  lib/graph/graph_private.h            |  27 +++++
>  lib/graph/meson.build                |   2 +-
>  lib/graph/rte_graph_model_dispatch.c | 145
> +++++++++++++++++++++++++++
>  lib/graph/rte_graph_model_dispatch.h |  37 +++++++
>  lib/graph/version.map                |   2 +
>  5 files changed, 212 insertions(+), 1 deletion(-)
> 
> diff --git a/lib/graph/graph_private.h b/lib/graph/graph_private.h
> index b66b18ebbc..e1a2a4bfd8 100644
> --- a/lib/graph/graph_private.h
> +++ b/lib/graph/graph_private.h
> @@ -366,4 +366,31 @@ void graph_dump(FILE *f, struct graph *g);
>   */
>  void node_dump(FILE *f, struct node *n);
> 
> +/**
> + * @internal
> + *
> + * Create the graph schedule work queue. And all cloned graphs attached to
> the
> + * parent graph MUST be destroyed together for fast schedule design
> limitation.
> + *
> + * @param _graph
> + *   The graph object
> + * @param _parent_graph
> + *   The parent graph object which holds the run-queue head.
> + *
> + * @return
> + *   - 0: Success.
> + *   - <0: Graph schedule work queue related error.
> + */
> +int graph_sched_wq_create(struct graph *_graph, struct graph
> *_parent_graph);
> +
> +/**
> + * @internal
> + *
> + * Destroy the graph schedule work queue.
> + *
> + * @param _graph
> + *   The graph object
> + */
> +void graph_sched_wq_destroy(struct graph *_graph);
> +
>  #endif /* _RTE_GRAPH_PRIVATE_H_ */
> diff --git a/lib/graph/meson.build b/lib/graph/meson.build
> index c729d984b6..e21affa280 100644
> --- a/lib/graph/meson.build
> +++ b/lib/graph/meson.build
> @@ -20,4 +20,4 @@ sources = files(
>  )
>  headers = files('rte_graph.h', 'rte_graph_worker.h')
> 
> -deps += ['eal', 'pcapng']
> +deps += ['eal', 'pcapng', 'mempool', 'ring']
> diff --git a/lib/graph/rte_graph_model_dispatch.c
> b/lib/graph/rte_graph_model_dispatch.c
> index 4a2f99496d..a300fefb85 100644
> --- a/lib/graph/rte_graph_model_dispatch.c
> +++ b/lib/graph/rte_graph_model_dispatch.c
> @@ -5,6 +5,151 @@
>  #include "graph_private.h"
>  #include "rte_graph_model_dispatch.h"
> 
> +int
> +graph_sched_wq_create(struct graph *_graph, struct graph
> *_parent_graph)
> +{
> +     struct rte_graph *parent_graph = _parent_graph->graph;
> +     struct rte_graph *graph = _graph->graph;
> +     unsigned int wq_size;
> +
> +     wq_size = GRAPH_SCHED_WQ_SIZE(graph->nb_nodes);
> +     wq_size = rte_align32pow2(wq_size + 1);

Hi Zhirun,

Should we introduce a new function, `rte_graph_configure`, that lets the
application control the ring size and mempool size of the work queue?
We could fall back to default values if nothing is configured.

`rte_graph_configure` should take a configuration structure along these lines:
struct rte_graph_config {
        struct {
                u64 rsvd[8];
        } rtc;
        struct {
                u16 wq_size;
                ...
        } dispatch;
};

This will help future graph models to have their own configuration.

We can also provide an rte_graph_config_init() function to initialize the
rte_graph_config structure.


> +
> +     graph->wq = rte_ring_create(graph->name, wq_size, graph->socket,
> +                                 RING_F_SC_DEQ);
> +     if (graph->wq == NULL)
> +             SET_ERR_JMP(EIO, fail, "Failed to allocate graph WQ");
> +
> +     graph->mp = rte_mempool_create(graph->name, wq_size,
> +                                    sizeof(struct graph_sched_wq_node),
> +                                    0, 0, NULL, NULL, NULL, NULL,
> +                                    graph->socket, MEMPOOL_F_SP_PUT);
> +     if (graph->mp == NULL)
> +             SET_ERR_JMP(EIO, fail_mp,
> +                         "Failed to allocate graph WQ schedule entry");
> +
> +     graph->lcore_id = _graph->lcore_id;
> +
> +     if (parent_graph->rq == NULL) {
> +             parent_graph->rq = &parent_graph->rq_head;
> +             SLIST_INIT(parent_graph->rq);
> +     }
> +
> +     graph->rq = parent_graph->rq;
> +     SLIST_INSERT_HEAD(graph->rq, graph, rq_next);
> +
> +     return 0;
> +
> +fail_mp:
> +     rte_ring_free(graph->wq);
> +     graph->wq = NULL;
> +fail:
> +     return -rte_errno;
> +}
> +
> +void
> +graph_sched_wq_destroy(struct graph *_graph)
> +{
> +     struct rte_graph *graph = _graph->graph;
> +
> +     if (graph == NULL)
> +             return;
> +
> +     rte_ring_free(graph->wq);
> +     graph->wq = NULL;
> +
> +     rte_mempool_free(graph->mp);
> +     graph->mp = NULL;
> +}
> +
> +static __rte_always_inline bool
> +__graph_sched_node_enqueue(struct rte_node *node, struct rte_graph
> *graph)
> +{
> +     struct graph_sched_wq_node *wq_node;
> +     uint16_t off = 0;
> +     uint16_t size;
> +
> +submit_again:
> +     if (rte_mempool_get(graph->mp, (void **)&wq_node) < 0)
> +             goto fallback;
> +
> +     size = RTE_MIN(node->idx, RTE_DIM(wq_node->objs));
> +     wq_node->node_off = node->off;
> +     wq_node->nb_objs = size;
> +     rte_memcpy(wq_node->objs, &node->objs[off], size * sizeof(void
> *));
> +
> +     while (rte_ring_mp_enqueue_bulk_elem(graph->wq, (void
> *)&wq_node,
> +                                       sizeof(wq_node), 1, NULL) == 0)
> +             rte_pause();
> +
> +     off += size;
> +     node->idx -= size;
> +     if (node->idx > 0)
> +             goto submit_again;
> +
> +     return true;
> +
> +fallback:
> +     if (off != 0)
> +             memmove(&node->objs[0], &node->objs[off],
> +                     node->idx * sizeof(void *));
> +
> +     return false;
> +}
> +
> +bool __rte_noinline
> +__rte_graph_sched_node_enqueue(struct rte_node *node,
> +                            struct rte_graph_rq_head *rq)
> +{
> +     const unsigned int lcore_id = node->lcore_id;
> +     struct rte_graph *graph;
> +
> +     SLIST_FOREACH(graph, rq, rq_next)
> +             if (graph->lcore_id == lcore_id)
> +                     break;
> +
> +     return graph != NULL ? __graph_sched_node_enqueue(node,
> graph) : false;
> +}
> +
> +void
> +__rte_graph_sched_wq_process(struct rte_graph *graph)
> +{
> +     struct graph_sched_wq_node *wq_node;
> +     struct rte_mempool *mp = graph->mp;
> +     struct rte_ring *wq = graph->wq;
> +     uint16_t idx, free_space;
> +     struct rte_node *node;
> +     unsigned int i, n;
> +     struct graph_sched_wq_node *wq_nodes[32];
> +
> +     n = rte_ring_sc_dequeue_burst_elem(wq, wq_nodes,
> sizeof(wq_nodes[0]),
> +                                        RTE_DIM(wq_nodes), NULL);
> +     if (n == 0)
> +             return;
> +
> +     for (i = 0; i < n; i++) {
> +             wq_node = wq_nodes[i];
> +             node = RTE_PTR_ADD(graph, wq_node->node_off);
> +             RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
> +             idx = node->idx;
> +             free_space = node->size - idx;
> +
> +             if (unlikely(free_space < wq_node->nb_objs))
> +                     __rte_node_stream_alloc_size(graph, node, node-
> >size + wq_node->nb_objs);
> +
> +             memmove(&node->objs[idx], wq_node->objs, wq_node-
> >nb_objs * sizeof(void *));
> +             memset(wq_node->objs, 0, wq_node->nb_objs *
> sizeof(void *));

The memset should be avoided in the fast path for better performance, since
we set wq_node->nb_objs to 0 anyway.

> +             node->idx = idx + wq_node->nb_objs;
> +
> +             __rte_node_process(graph, node);
> +
> +             wq_node->nb_objs = 0;
> +             node->idx = 0;
> +     }
> +
> +     rte_mempool_put_bulk(mp, (void **)wq_nodes, n);
> +}
> +
>  int
>  rte_graph_model_dispatch_lcore_affinity_set(const char *name, unsigned
> int lcore_id)
>  {
> diff --git a/lib/graph/rte_graph_model_dispatch.h
> b/lib/graph/rte_graph_model_dispatch.h
> index 179624e972..18fa7ce0ab 100644
> --- a/lib/graph/rte_graph_model_dispatch.h
> +++ b/lib/graph/rte_graph_model_dispatch.h
> @@ -14,12 +14,49 @@
>   *
>   * This API allows to set core affinity with the node.
>   */
> +#include <rte_errno.h>
> +#include <rte_mempool.h>
> +#include <rte_memzone.h>
> +#include <rte_ring.h>
> +
>  #include "rte_graph_worker_common.h"
> 
>  #ifdef __cplusplus
>  extern "C" {
>  #endif
> 
> +#define GRAPH_SCHED_WQ_SIZE_MULTIPLIER  8
> +#define GRAPH_SCHED_WQ_SIZE(nb_nodes)   \
> +     ((typeof(nb_nodes))((nb_nodes) *
> GRAPH_SCHED_WQ_SIZE_MULTIPLIER))
> +
> +/**
> + * @internal
> + *
> + * Schedule the node to the right graph's work queue.
> + *
> + * @param node
> + *   Pointer to the scheduled node object.
> + * @param rq
> + *   Pointer to the scheduled run-queue for all graphs.
> + *
> + * @return
> + *   True on success, false otherwise.
> + */
> +__rte_experimental
> +bool __rte_noinline __rte_graph_sched_node_enqueue(struct rte_node
> *node,
> +                                 struct rte_graph_rq_head *rq);
> +
> +/**
> + * @internal
> + *
> + * Process all nodes (streams) in the graph's work queue.
> + *
> + * @param graph
> + *   Pointer to the graph object.
> + */
> +__rte_experimental
> +void __rte_graph_sched_wq_process(struct rte_graph *graph);
> +
>  /**
>   * Set lcore affinity with the node.
>   *
> diff --git a/lib/graph/version.map b/lib/graph/version.map
> index aaa86f66ed..d511133f39 100644
> --- a/lib/graph/version.map
> +++ b/lib/graph/version.map
> @@ -48,6 +48,8 @@ EXPERIMENTAL {
> 
>       rte_graph_worker_model_set;
>       rte_graph_worker_model_get;
> +     __rte_graph_sched_wq_process;
> +     __rte_graph_sched_node_enqueue;
> 
>       rte_graph_model_dispatch_lcore_affinity_set;
> 
> --
> 2.37.2

Reply via email to