On Mon, Oct 16, 2023 at 01:57:12PM -0700, Sivaprasad Tummala wrote:
> Add optional support for inline event processing within pmd dequeue
> call. For a dequeue callback, events dequeued from the event port
> are passed to a callback function, if configured, to allow
> additional processing, e.g. unpacking a batch of packets from each
> event on dequeue, before passing them back to the application.
>
> Signed-off-by: Sivaprasad Tummala <sivaprasad.tumm...@amd.com>
> ---
>  lib/eventdev/eventdev_pmd.h      |  38 ++++++++++++
>  lib/eventdev/eventdev_private.c  |   2 +
>  lib/eventdev/rte_eventdev.c      | 107 +++++++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev.h      |  95 +++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev_core.h |  12 +++-
>  lib/eventdev/version.map         |   3 +
>  6 files changed, 256 insertions(+), 1 deletion(-)
>
> diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h
> index a0ee768ce7..ce067b1d5d 100644
> --- a/lib/eventdev/eventdev_pmd.h
> +++ b/lib/eventdev/eventdev_pmd.h
> @@ -97,6 +97,19 @@ struct rte_eventdev_global {
>          uint8_t nb_devs;        /**< Number of devices found */
>  };
>
> +/**
> + * @internal
> + * Structure used to hold information about the callbacks to be called for a
> + * port on dequeue.
> + */
> +struct rte_event_dequeue_callback {
> +        struct rte_event_dequeue_callback *next;
> +        union{
> +                rte_dequeue_callback_fn dequeue;
> +        } fn;
> +        void *param;
> +};
> +
>  /**
>   * @internal
>   * The data part, with no function pointers, associated with each device.
> @@ -171,6 +184,10 @@ struct rte_eventdev {
>          /**< Pointer to PMD dequeue burst function. */
>          event_maintain_t maintain;
>          /**< Pointer to PMD port maintenance function. */
> +        struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV];
> +        /**< User-supplied functions called from dequeue_burst to post-process
> +         * received packets before passing them to the user
> +         */
>          event_tx_adapter_enqueue_t txa_enqueue_same_dest;
>          /**< Pointer to PMD eth Tx adapter burst enqueue function with
>           * events destined to same Eth port & Tx queue.
> @@ -245,6 +262,27 @@ rte_event_pmd_is_valid_dev(uint8_t dev_id)
>          return 1;
>  }
>
> +/**
> + * Executes all the user application registered callbacks for the specific
> + * event device.
> + *
> + * @param dev_id
> + *   Event device index.
> + * @param port_id
> + *   Event port index
> + * @param ev
> + *   Points to an array of *nb_events* objects of type *rte_event* structure
> + *   for output to be populated with the dequeued event objects.
> + * @param nb_events
> + *   number of event objects
> + *
> + * @return
> + *   The number of event objects
> + */
> +__rte_internal
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +                uint8_t port_id, struct rte_event ev[], uint16_t nb_events);
> +
>  /**
>   * Definitions of all functions exported by a driver through the
>   * generic structure of type *event_dev_ops* supplied in the
> diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c
> index 017f97ccab..052c526ce0 100644
> --- a/lib/eventdev/eventdev_private.c
> +++ b/lib/eventdev/eventdev_private.c
> @@ -137,4 +137,6 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op,
>          fp_op->dma_enqueue = dev->dma_enqueue;
>          fp_op->profile_switch = dev->profile_switch;
>          fp_op->data = dev->data->ports;
> +        fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs;
> +        fp_op->ev_port.data = dev->data->ports;
>  }
> diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
> index 5feb4326a2..f2540a6aa8 100644
> --- a/lib/eventdev/rte_eventdev.c
> +++ b/lib/eventdev/rte_eventdev.c
> @@ -18,6 +18,7 @@
>  #include <rte_common.h>
>  #include <rte_malloc.h>
>  #include <rte_errno.h>
> +#include <rte_stdatomic.h>
>  #include <ethdev_driver.h>
>  #include <rte_cryptodev.h>
>  #include <rte_dmadev.h>
> @@ -39,6 +40,9 @@ static struct rte_eventdev_global eventdev_globals = {
>  /* Public fastpath APIs. */
>  struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS];
>
> +/* spinlock for add/remove dequeue callbacks */
> +static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
>  /* Event dev north bound API implementation */
>
>  uint8_t
> @@ -884,6 +888,109 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id,
>          return 0;
>  }
>
> +const struct rte_event_dequeue_callback *
> +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +                rte_dequeue_callback_fn fn, void *user_param)
> +{
> +        struct rte_eventdev *dev;
> +        struct rte_event_dequeue_callback *cb;
> +        struct rte_event_dequeue_callback *tail;
> +
> +        /* check input parameters */
> +        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL);
> +        dev = &rte_eventdevs[dev_id];
> +        if (!is_valid_port(dev, port_id)) {
> +                RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id);
> +                return NULL;
> +        }
> +
> +        cb = rte_zmalloc(NULL, sizeof(*cb), 0);
> +        if (cb == NULL) {
> +                rte_errno = ENOMEM;
> +                return NULL;
> +        }
> +        cb->fn.dequeue = fn;
> +        cb->param = user_param;
> +
> +        rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +        /* Add the callbacks in fifo order. */
> +        tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id];
> +        if (!tail) {
> +                /* Stores to cb->fn and cb->param should complete before
> +                 * cb is visible to data plane.
> +                 */
> +                rte_atomic_store_explicit(
> +                        &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
> +                        cb, __ATOMIC_RELEASE);
                                ^^^^^^^^^^^^^^^^ rte_memory_order_release (rte_stdatomic.h)
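
i.e. with the rte_stdatomic.h wrappers the store would look something like
the sketch below - just an illustration, and it also assumes the
post_dequeue_burst_cbs pointer picks up the __rte_atomic qualifier, which
the explicit-atomic wrappers require once the C11 stdatomic implementation
is enabled:

        /* publish cb with release ordering so the data-plane thread
         * observes cb->fn/cb->param before it can observe cb itself
         */
        rte_atomic_store_explicit(
                &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
                cb, rte_memory_order_release);
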
> +        } else {
> +                while (tail->next)
> +                        tail = tail->next;
> +                /* Stores to cb->fn and cb->param should complete before
> +                 * cb is visible to data plane.
> +                 */
> +                rte_atomic_store_explicit(&tail->next, cb, __ATOMIC_RELEASE);
                                                               ^^^^ same here
> +        }
> +        rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +        return cb;
> +}
> +
> +int
> +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +                const struct rte_event_dequeue_callback *user_cb)
> +{
> +        struct rte_eventdev *dev;
> +        struct rte_event_dequeue_callback *cb;
> +        struct rte_event_dequeue_callback **prev_cb;
> +
> +        /* Check input parameters. */
> +        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
> +        dev = &rte_eventdevs[dev_id];
> +        if (user_cb == NULL || !is_valid_port(dev, port_id))
> +                return -EINVAL;
> +
> +        rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +        prev_cb = &dev->post_dequeue_burst_cbs[port_id];
> +        for (; *prev_cb != NULL; prev_cb = &cb->next) {
> +                cb = *prev_cb;
> +                if (cb == user_cb) {
> +                        /* Remove the user cb from the callback list. */
> +                        rte_atomic_store_explicit(prev_cb, cb->next,
> +                                __ATOMIC_RELAXED);
                                   ^^^ and here
> +                        break;
> +                }
> +        }
> +        rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +        return 0;
> +}
> +
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +                uint8_t port_id, struct rte_event ev[], uint16_t nb_events)
> +{
> +        struct rte_event_dequeue_callback *cb;
> +        const struct rte_event_fp_ops *fp_ops;
> +
> +        fp_ops = &rte_event_fp_ops[dev_id];
> +
> +        /* __ATOMIC_RELEASE memory order was used when the
> +         * call back was inserted into the list.
> +         * Since there is a clear dependency between loading
> +         * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> +         * not required.
> +         */
> +        cb = rte_atomic_load_explicit((void **)&fp_ops->ev_port.clbk[port_id],
                                         ^^^^^^^ needs to be __rte_atomic qualified
> +                __ATOMIC_RELAXED);
                   ^^^^ rte_memory_order
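
For that last one, a rough sketch of how the load could end up looking,
assuming ev_port.clbk itself is declared with the qualifier (for example as
"void * __rte_atomic *clbk", a hypothetical declaration) so that no cast is
needed at the call site:

        /* relaxed ordering is enough here: the load of cb carries a
         * dependency into the later cb->fn/cb->next accesses
         */
        cb = rte_atomic_load_explicit(&fp_ops->ev_port.clbk[port_id],
                        rte_memory_order_relaxed);
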