Add optional support for inline event processing within pmd dequeue call. For a dequeue callback, events dequeued from the event port were passed them to a callback function if configured, to allow additional processing. e.g. unpack batch of packets from each event on dequeue, before passing back to the application.
Signed-off-by: Sivaprasad Tummala <sivaprasad.tumm...@amd.com> --- lib/eventdev/eventdev_pmd.h | 38 +++++++++++ lib/eventdev/eventdev_private.c | 2 + lib/eventdev/rte_eventdev.c | 107 +++++++++++++++++++++++++++++++ lib/eventdev/rte_eventdev.h | 95 +++++++++++++++++++++++++++ lib/eventdev/rte_eventdev_core.h | 12 +++- lib/eventdev/version.map | 3 + 6 files changed, 256 insertions(+), 1 deletion(-) diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h index a0ee768ce7..ce067b1d5d 100644 --- a/lib/eventdev/eventdev_pmd.h +++ b/lib/eventdev/eventdev_pmd.h @@ -97,6 +97,19 @@ struct rte_eventdev_global { uint8_t nb_devs; /**< Number of devices found */ }; +/** + * @internal + * Structure used to hold information about the callbacks to be called for a + * port on dequeue. + */ +struct rte_event_dequeue_callback { + struct rte_event_dequeue_callback *next; + union{ + rte_dequeue_callback_fn dequeue; + } fn; + void *param; +}; + /** * @internal * The data part, with no function pointers, associated with each device. @@ -171,6 +184,10 @@ struct rte_eventdev { /**< Pointer to PMD dequeue burst function. */ event_maintain_t maintain; /**< Pointer to PMD port maintenance function. */ + struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV]; + /**< User-supplied functions called from dequeue_burst to post-process + * received packets before passing them to the user + */ event_tx_adapter_enqueue_t txa_enqueue_same_dest; /**< Pointer to PMD eth Tx adapter burst enqueue function with * events destined to same Eth port & Tx queue. @@ -245,6 +262,27 @@ rte_event_pmd_is_valid_dev(uint8_t dev_id) return 1; } +/** + * Executes all the user application registered callbacks for the specific + * event device. + * + * @param dev_id + * Event device index. + * @param port_id + * Event port index + * @param ev + * Points to an array of *nb_events* objects of type *rte_event* structure + * for output to be populated with the dequeued event objects. + * @param nb_events + * number of event objects + * + * @return + * The number of event objects + */ +__rte_internal +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id, + uint8_t port_id, struct rte_event ev[], uint16_t nb_events); + /** * Definitions of all functions exported by a driver through the * generic structure of type *event_dev_ops* supplied in the diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c index 017f97ccab..052c526ce0 100644 --- a/lib/eventdev/eventdev_private.c +++ b/lib/eventdev/eventdev_private.c @@ -137,4 +137,6 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op, fp_op->dma_enqueue = dev->dma_enqueue; fp_op->profile_switch = dev->profile_switch; fp_op->data = dev->data->ports; + fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs; + fp_op->ev_port.data = dev->data->ports; } diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c index 5feb4326a2..f2540a6aa8 100644 --- a/lib/eventdev/rte_eventdev.c +++ b/lib/eventdev/rte_eventdev.c @@ -18,6 +18,7 @@ #include <rte_common.h> #include <rte_malloc.h> #include <rte_errno.h> +#include <rte_stdatomic.h> #include <ethdev_driver.h> #include <rte_cryptodev.h> #include <rte_dmadev.h> @@ -39,6 +40,9 @@ static struct rte_eventdev_global eventdev_globals = { /* Public fastpath APIs. */ struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS]; +/* spinlock for add/remove dequeue callbacks */ +static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER; + /* Event dev north bound API implementation */ uint8_t @@ -884,6 +888,109 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id, return 0; } +const struct rte_event_dequeue_callback * +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id, + rte_dequeue_callback_fn fn, void *user_param) +{ + struct rte_eventdev *dev; + struct rte_event_dequeue_callback *cb; + struct rte_event_dequeue_callback *tail; + + /* check input parameters */ + RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL); + dev = &rte_eventdevs[dev_id]; + if (!is_valid_port(dev, port_id)) { + RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id); + return NULL; + } + + cb = rte_zmalloc(NULL, sizeof(*cb), 0); + if (cb == NULL) { + rte_errno = ENOMEM; + return NULL; + } + cb->fn.dequeue = fn; + cb->param = user_param; + + rte_spinlock_lock(&event_dev_dequeue_cb_lock); + /* Add the callbacks in fifo order. */ + tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id]; + if (!tail) { + /* Stores to cb->fn and cb->param should complete before + * cb is visible to data plane. + */ + rte_atomic_store_explicit( + &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id], + cb, __ATOMIC_RELEASE); + } else { + while (tail->next) + tail = tail->next; + /* Stores to cb->fn and cb->param should complete before + * cb is visible to data plane. + */ + rte_atomic_store_explicit(&tail->next, cb, __ATOMIC_RELEASE); + } + rte_spinlock_unlock(&event_dev_dequeue_cb_lock); + + return cb; +} + +int +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id, + const struct rte_event_dequeue_callback *user_cb) +{ + struct rte_eventdev *dev; + struct rte_event_dequeue_callback *cb; + struct rte_event_dequeue_callback **prev_cb; + + /* Check input parameters. */ + RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL); + dev = &rte_eventdevs[dev_id]; + if (user_cb == NULL || !is_valid_port(dev, port_id)) + return -EINVAL; + + rte_spinlock_lock(&event_dev_dequeue_cb_lock); + prev_cb = &dev->post_dequeue_burst_cbs[port_id]; + for (; *prev_cb != NULL; prev_cb = &cb->next) { + cb = *prev_cb; + if (cb == user_cb) { + /* Remove the user cb from the callback list. */ + rte_atomic_store_explicit(prev_cb, cb->next, + __ATOMIC_RELAXED); + break; + } + } + rte_spinlock_unlock(&event_dev_dequeue_cb_lock); + + return 0; +} + +uint16_t rte_eventdev_pmd_dequeue_callback_process(uint8_t dev_id, + uint8_t port_id, struct rte_event ev[], uint16_t nb_events) +{ + struct rte_event_dequeue_callback *cb; + const struct rte_event_fp_ops *fp_ops; + + fp_ops = &rte_event_fp_ops[dev_id]; + + /* __ATOMIC_RELEASE memory order was used when the + * call back was inserted into the list. + * Since there is a clear dependency between loading + * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is + * not required. + */ + cb = rte_atomic_load_explicit((void **)&fp_ops->ev_port.clbk[port_id], + __ATOMIC_RELAXED); + if (unlikely(cb != NULL)) + while (cb != NULL) { + nb_events = cb->fn.dequeue(dev_id, port_id, ev, + nb_events, cb->param); + cb = cb->next; + } + + return nb_events; +} + int rte_event_port_get_monitor_addr(uint8_t dev_id, uint8_t port_id, struct rte_power_monitor_cond *pmc) diff --git a/lib/eventdev/rte_eventdev.h b/lib/eventdev/rte_eventdev.h index 38dbbc2617..c0097c0a23 100644 --- a/lib/eventdev/rte_eventdev.h +++ b/lib/eventdev/rte_eventdev.h @@ -954,6 +954,101 @@ void rte_event_port_quiesce(uint8_t dev_id, uint8_t port_id, rte_eventdev_port_flush_t release_cb, void *args); +struct rte_event_dequeue_callback; + +/** + * Function type used for dequeue event processing callbacks. + * + * The callback function is called on dequeue with a burst of events that have + * been received on the given event port. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param[out] ev + * Points to an array of *nb_events* objects of type *rte_event* structure + * for output to be populated with the dequeued event objects. + * @param nb_events + * The maximum number of event objects to dequeue, typically number of + * rte_event_port_dequeue_depth() available for this port. + * @param opaque + * Opaque pointer of event port callback related data. + * + * @return + * The number of event objects returned to the user. + */ +typedef uint16_t (*rte_dequeue_callback_fn)(uint8_t dev_id, uint8_t port_id, + struct rte_event *ev, uint16_t nb_events, void *user_param); + +/** + * Add a callback to be called on event dequeue on a given event device port. + * + * This API configures a function to be called for each burst of + * events dequeued on a given event device port. The return value is a pointer + * that can be used to later remove the callback using + * rte_event_remove_dequeue_callback(). + * + * Multiple functions are called in the order that they are added. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param fn + * The callback function + * @param user_param + * A generic pointer parameter which will be passed to each invocation of the + * callback function on this event device port. Inter-thread synchronization + * of any user data changes is the responsibility of the user. + * + * @return + * NULL on error. + * On success, a pointer value which can later be used to remove the callback. + */ +__rte_experimental +const struct rte_event_dequeue_callback * +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id, + rte_dequeue_callback_fn fn, void *user_param); + +/** + * Remove a dequeue event callback from a given event device port. + * + * This API is used to removed callbacks that were added to a event device port + * using rte_event_add_dequeue_callback(). + * + * Note: the callback is removed from the callback list but it isn't freed + * since the it may still be in use. The memory for the callback can be + * subsequently freed back by the application by calling rte_free(): + * + * - Immediately - if the device is stopped, or the user knows that no + * callbacks are in flight e.g. if called from the thread doing dequeue + * on that port. + * + * - After a short delay - where the delay is sufficient to allow any + * in-flight callbacks to complete. Alternately, the RCU mechanism can be + * used to detect when data plane threads have ceased referencing the + * callback memory. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param user_cb + * The callback function + * + * @return + * - 0: Success. Callback was removed. + * - -ENODEV: If *dev_id* is invalid. + * - -EINVAL: The port_id is out of range, or the callback + * is NULL. + */ +__rte_experimental +int +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id, + const struct rte_event_dequeue_callback *user_cb); + + /** * The queue depth of the port on the enqueue side */ diff --git a/lib/eventdev/rte_eventdev_core.h b/lib/eventdev/rte_eventdev_core.h index 5b405518d1..5ce93c4b6f 100644 --- a/lib/eventdev/rte_eventdev_core.h +++ b/lib/eventdev/rte_eventdev_core.h @@ -49,6 +49,14 @@ typedef uint16_t (*event_dma_adapter_enqueue_t)(void *port, struct rte_event ev[ typedef int (*event_profile_switch_t)(void *port, uint8_t profile); /**< @internal Switch active link profile on the event port. */ +struct rte_eventdev_port_data { + void **data; + /**< points to array of internal port data pointers */ + void **clbk; + /**< points to array of port callback data pointers */ +}; +/**< @internal Structure used to hold opaque eventdev port data. */ + struct rte_event_fp_ops { void **data; /**< points to array of internal port data pointers */ @@ -76,7 +84,9 @@ struct rte_event_fp_ops { /**< PMD DMA adapter enqueue function. */ event_profile_switch_t profile_switch; /**< PMD Event switch profile function. */ - uintptr_t reserved[4]; + struct rte_eventdev_port_data ev_port; + /**< Eventdev port data. */ + uintptr_t reserved[1]; } __rte_cache_aligned; extern struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS]; diff --git a/lib/eventdev/version.map b/lib/eventdev/version.map index fa9eb069ff..a0c7aa5bbd 100644 --- a/lib/eventdev/version.map +++ b/lib/eventdev/version.map @@ -155,6 +155,8 @@ EXPERIMENTAL { rte_event_port_profile_unlink; rte_event_port_profile_links_get; rte_event_port_get_monitor_addr; + rte_event_add_dequeue_callback; + rte_event_remove_dequeue_callback; __rte_eventdev_trace_port_profile_switch; }; @@ -165,6 +167,7 @@ INTERNAL { event_dev_fp_ops_set; event_dev_probing_finish; rte_event_pmd_allocate; + rte_eventdev_pmd_dequeue_callback_process; rte_event_pmd_get_named_dev; rte_event_pmd_is_valid_dev; rte_event_pmd_pci_probe; -- 2.34.1