On Mon, Oct 16, 2023 at 01:57:12PM -0700, Sivaprasad Tummala wrote:
> Add optional support for inline event processing within the PMD dequeue
> call. If a dequeue callback is configured, events dequeued from the
> event port are passed to that callback to allow additional processing,
> e.g. unpacking a batch of packets from each event on dequeue before
> passing them back to the application.
> 
> Signed-off-by: Sivaprasad Tummala <sivaprasad.tumm...@amd.com>
> ---
>  lib/eventdev/eventdev_pmd.h      |  38 +++++++++++
>  lib/eventdev/eventdev_private.c  |   2 +
>  lib/eventdev/rte_eventdev.c      | 107 +++++++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev.h      |  95 +++++++++++++++++++++++++++
>  lib/eventdev/rte_eventdev_core.h |  12 +++-
>  lib/eventdev/version.map         |   3 +
>  6 files changed, 256 insertions(+), 1 deletion(-)
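
As context for other reviewers, here is a minimal sketch of how an
application would use the proposed API. The rte_dequeue_callback_fn
typedef sits in the rte_eventdev.h hunk that is not quoted below, so the
callback prototype used here is my assumption, and unpack_events() itself
is purely hypothetical:

#include <rte_eventdev.h>

/* Hypothetical post-dequeue callback (prototype assumed, not taken
 * from the patch): post-process the dequeued events in place, e.g.
 * unpack the batch of packets carried in each event.
 */
static uint16_t
unpack_events(uint8_t dev_id, uint8_t port_id, struct rte_event ev[],
		uint16_t nb_events, void *user_param)
{
	RTE_SET_USED(dev_id);
	RTE_SET_USED(port_id);
	RTE_SET_USED(user_param);
	/* ... rewrite ev[0 .. nb_events - 1] as needed ... */
	return nb_events;
}

/* Register the callback on one port; the returned handle is what
 * rte_event_remove_dequeue_callback() expects later.
 */
static const struct rte_event_dequeue_callback *
setup_dequeue_cb(uint8_t dev_id, uint8_t port_id)
{
	return rte_event_add_dequeue_callback(dev_id, port_id,
			unpack_events, NULL);
}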
> 
> diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h
> index a0ee768ce7..ce067b1d5d 100644
> --- a/lib/eventdev/eventdev_pmd.h
> +++ b/lib/eventdev/eventdev_pmd.h
> @@ -97,6 +97,19 @@ struct rte_eventdev_global {
>       uint8_t nb_devs;        /**< Number of devices found */
>  };
>  
> +/**
> + * @internal
> + * Structure used to hold information about the callbacks to be called for a
> + * port on dequeue.
> + */
> +struct rte_event_dequeue_callback {
> +     struct rte_event_dequeue_callback *next;
> +     union{
> +             rte_dequeue_callback_fn dequeue;
> +     } fn;
> +     void *param;
> +};
> +
>  /**
>   * @internal
>   * The data part, with no function pointers, associated with each device.
> @@ -171,6 +184,10 @@ struct rte_eventdev {
>       /**< Pointer to PMD dequeue burst function. */
>       event_maintain_t maintain;
>       /**< Pointer to PMD port maintenance function. */
> +     struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV];
> +     /**<  User-supplied functions called from dequeue_burst to post-process
> +      * received packets before passing them to the user
> +      */
>       event_tx_adapter_enqueue_t txa_enqueue_same_dest;
>       /**< Pointer to PMD eth Tx adapter burst enqueue function with
>        * events destined to same Eth port & Tx queue.
> @@ -245,6 +262,27 @@ rte_event_pmd_is_valid_dev(uint8_t dev_id)
>               return 1;
>  }
>  
> +/**
> + * Executes all the user application registered callbacks for the specific
> + * event device.
> + *
> + * @param dev_id
> + *   Event device index.
> + * @param port_id
> + *   Event port index
> + * @param ev
> + *   Points to an array of *nb_events* objects of type *rte_event* structure
> + *   for output to be populated with the dequeued event objects.
> + * @param nb_events
> + *   number of event objects
> + *
> + * @return
> + * The number of event objects
> + */
> +__rte_internal
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +             uint8_t port_id, struct rte_event ev[], uint16_t nb_events);
> +
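
From this declaration I assume the helper is meant to be invoked from a
driver's dequeue path once the events have been fetched from hardware; if
so, it would help to say that in the doxygen comment. A rough, hypothetical
sketch of what I have in mind (the my_*() names and port layout are
invented for illustration only):

#include <rte_eventdev.h>
#include <eventdev_pmd.h>

/* Invented driver-private port representation. */
struct my_port {
	uint8_t dev_id;
	uint8_t port_id;
	/* ... hardware rings etc. ... */
};

/* Invented: fetch up to nb_events events from hardware. */
uint16_t my_pmd_hw_dequeue(struct my_port *p, struct rte_event ev[],
		uint16_t nb_events, uint64_t timeout_ticks);

static uint16_t
my_pmd_dequeue_burst(void *port, struct rte_event ev[], uint16_t nb_events,
		uint64_t timeout_ticks)
{
	struct my_port *p = port;
	uint16_t nb_deq;

	nb_deq = my_pmd_hw_dequeue(p, ev, nb_events, timeout_ticks);

	/* Run any registered dequeue callbacks on the fetched events. */
	if (nb_deq > 0)
		nb_deq = rte_eventdev_pmd_dequeue_callback_process(
				p->dev_id, p->port_id, ev, nb_deq);

	return nb_deq;
}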
>  /**
>   * Definitions of all functions exported by a driver through the
>   * generic structure of type *event_dev_ops* supplied in the
> diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c
> index 017f97ccab..052c526ce0 100644
> --- a/lib/eventdev/eventdev_private.c
> +++ b/lib/eventdev/eventdev_private.c
> @@ -137,4 +137,6 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op,
>       fp_op->dma_enqueue = dev->dma_enqueue;
>       fp_op->profile_switch = dev->profile_switch;
>       fp_op->data = dev->data->ports;
> +     fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs;
> +     fp_op->ev_port.data = dev->data->ports;
>  }
> diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c
> index 5feb4326a2..f2540a6aa8 100644
> --- a/lib/eventdev/rte_eventdev.c
> +++ b/lib/eventdev/rte_eventdev.c
> @@ -18,6 +18,7 @@
>  #include <rte_common.h>
>  #include <rte_malloc.h>
>  #include <rte_errno.h>
> +#include <rte_stdatomic.h>
>  #include <ethdev_driver.h>
>  #include <rte_cryptodev.h>
>  #include <rte_dmadev.h>
> @@ -39,6 +40,9 @@ static struct rte_eventdev_global eventdev_globals = {
>  /* Public fastpath APIs. */
>  struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS];
>  
> +/* spinlock for add/remove dequeue callbacks */
> +static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
>  /* Event dev north bound API implementation */
>  
>  uint8_t
> @@ -884,6 +888,109 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id,
>       return 0;
>  }
>  
> +const struct rte_event_dequeue_callback *
> +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +             rte_dequeue_callback_fn fn, void *user_param)
> +{
> +     struct rte_eventdev *dev;
> +     struct rte_event_dequeue_callback *cb;
> +     struct rte_event_dequeue_callback *tail;
> +
> +     /* check input parameters */
> +     RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL);
> +     dev = &rte_eventdevs[dev_id];
> +     if (!is_valid_port(dev, port_id)) {
> +             RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id);
> +             return NULL;
> +     }
> +
> +     cb = rte_zmalloc(NULL, sizeof(*cb), 0);
> +     if (cb == NULL) {
> +             rte_errno = ENOMEM;
> +             return NULL;
> +     }
> +     cb->fn.dequeue = fn;
> +     cb->param = user_param;
> +
> +     rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +     /* Add the callbacks in fifo order. */
> +     tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id];
> +     if (!tail) {
> +             /* Stores to cb->fn and cb->param should complete before
> +              * cb is visible to data plane.
> +              */
> +             rte_atomic_store_explicit(
> +                     &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
> +                     cb, __ATOMIC_RELEASE);
                            ^^^^^^^^^^^^^^^^ rte_memory_order_release (rte_stdatomic.h)
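
That is, with the rte_stdatomic.h wrappers the store would read roughly:

	/* publish cb->fn/cb->param before cb becomes visible */
	rte_atomic_store_explicit(
		&rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id],
		cb, rte_memory_order_release);

I think the stored-to locations (post_dequeue_burst_cbs[] and tail->next
below) will also need the RTE_ATOMIC()/__rte_atomic qualifier for this to
build with RTE_ENABLE_STDATOMIC, same as the clbk pointer flagged further
down.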

> +     } else {
> +             while (tail->next)
> +                     tail = tail->next;
> +             /* Stores to cb->fn and cb->param should complete before
> +              * cb is visible to data plane.
> +              */
> +             rte_atomic_store_explicit(&tail->next, cb, __ATOMIC_RELEASE);
                                                           ^^^^ same here

> +     }
> +     rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +     return cb;
> +}
> +
> +int
> +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id,
> +             const struct rte_event_dequeue_callback *user_cb)
> +{
> +     struct rte_eventdev *dev;
> +     struct rte_event_dequeue_callback *cb;
> +     struct rte_event_dequeue_callback **prev_cb;
> +
> +     /* Check input parameters. */
> +     RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
> +     dev = &rte_eventdevs[dev_id];
> +     if (user_cb == NULL || !is_valid_port(dev, port_id))
> +             return -EINVAL;
> +
> +     rte_spinlock_lock(&event_dev_dequeue_cb_lock);
> +     prev_cb = &dev->post_dequeue_burst_cbs[port_id];
> +     for (; *prev_cb != NULL; prev_cb = &cb->next) {
> +             cb = *prev_cb;
> +             if (cb == user_cb) {
> +                     /* Remove the user cb from the callback list. */
> +                     rte_atomic_store_explicit(prev_cb, cb->next,
> +                                             __ATOMIC_RELAXED);
                                                ^^^ and here

> +                     break;
> +             }
> +     }
> +     rte_spinlock_unlock(&event_dev_dequeue_cb_lock);
> +
> +     return 0;
> +}
> +
> +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id,
> +             uint8_t port_id, struct rte_event ev[], uint16_t nb_events)
> +{
> +     struct rte_event_dequeue_callback *cb;
> +     const struct rte_event_fp_ops *fp_ops;
> +
> +     fp_ops = &rte_event_fp_ops[dev_id];
> +
> +     /* __ATOMIC_RELEASE memory order was used when the
> +      * call back was inserted into the list.
> +      * Since there is a clear dependency between loading
> +      * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is
> +      * not required.
> +      */
> +     cb = rte_atomic_load_explicit((void **)&fp_ops->ev_port.clbk[port_id],
                                       ^^^^^^^ needs to be __rte_atomic qualified

> +                                     __ATOMIC_RELAXED);
                                        ^^^^ rte_memory_order
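
Putting the two comments above together, and guessing at the ev_port
declaration in rte_eventdev_core.h (that hunk is not quoted here), the
shape I have in mind is roughly:

	/* rte_eventdev_core.h (sketch): qualify the callback array so the
	 * explicit atomic load/store is accepted in stdatomic builds.
	 */
	struct {
		RTE_ATOMIC(void *) *clbk;
		/**< Opaque per-port dequeue callback list heads. */
		void **data;
		/**< Ports' private data. */
	} ev_port;

	/* rte_eventdev.c: use the rte_memory_order_* enum on the load */
	cb = (struct rte_event_dequeue_callback *)rte_atomic_load_explicit(
			&fp_ops->ev_port.clbk[port_id], rte_memory_order_relaxed);

The cast in event_dev_fp_ops_set() would then need adjusting to match.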
                                
