Add optional support for inline event processing within the dequeue call. When a dequeue callback is configured, events dequeued from the event port are passed to the callback function to allow additional processing, e.g. unpacking a batch of packets from each event on dequeue before passing them back to the application.
Signed-off-by: Sivaprasad Tummala <sivaprasad.tumm...@amd.com> --- lib/eventdev/eventdev_pmd.h | 17 ++++ lib/eventdev/eventdev_private.c | 17 ++++ lib/eventdev/rte_eventdev.c | 78 +++++++++++++++++ lib/eventdev/rte_eventdev.h | 145 ++++++++++++++++++++++++++++++- lib/eventdev/rte_eventdev_core.h | 12 ++- lib/eventdev/version.map | 6 ++ 6 files changed, 272 insertions(+), 3 deletions(-) diff --git a/lib/eventdev/eventdev_pmd.h b/lib/eventdev/eventdev_pmd.h index 7b12f80f57..c87e06993f 100644 --- a/lib/eventdev/eventdev_pmd.h +++ b/lib/eventdev/eventdev_pmd.h @@ -97,6 +97,19 @@ struct rte_eventdev_global { uint8_t nb_devs; /**< Number of devices found */ }; +/** + * @internal + * Structure used to hold information about the callbacks to be called for a + * port on dequeue. + */ +struct rte_event_dequeue_callback { + struct rte_event_dequeue_callback *next; + union{ + rte_dequeue_callback_fn dequeue; + } fn; + void *param; +}; + /** * @internal * The data part, with no function pointers, associated with each device. @@ -173,6 +186,10 @@ struct rte_eventdev { /**< Pointer to PMD dequeue burst function. */ event_maintain_t maintain; /**< Pointer to PMD port maintenance function. */ + struct rte_event_dequeue_callback *post_dequeue_burst_cbs[RTE_EVENT_MAX_PORTS_PER_DEV]; + /**< User-supplied functions called from dequeue_burst to post-process + * received packets before passing them to the user + */ event_tx_adapter_enqueue_t txa_enqueue_same_dest; /**< Pointer to PMD eth Tx adapter burst enqueue function with * events destined to same Eth port & Tx queue. 
diff --git a/lib/eventdev/eventdev_private.c b/lib/eventdev/eventdev_private.c index 1d3d9d357e..6d1cbdb17d 100644 --- a/lib/eventdev/eventdev_private.c +++ b/lib/eventdev/eventdev_private.c @@ -118,4 +118,21 @@ event_dev_fp_ops_set(struct rte_event_fp_ops *fp_op, fp_op->txa_enqueue_same_dest = dev->txa_enqueue_same_dest; fp_op->ca_enqueue = dev->ca_enqueue; fp_op->data = dev->data->ports; + fp_op->ev_port.clbk = (void **)(uintptr_t)dev->post_dequeue_burst_cbs; + fp_op->ev_port.data = dev->data->ports; +} + +uint16_t +rte_event_dequeue_callbacks(uint8_t dev_id, uint8_t port_id, + struct rte_event *ev, uint16_t nb_events, void *opaque) +{ + static uint16_t nb_rx; + const struct rte_event_dequeue_callback *cb = opaque; + + while (cb != NULL) { + nb_rx = cb->fn.dequeue(dev_id, port_id, ev, + nb_events, cb->param); + cb = cb->next; + } + return nb_rx; } diff --git a/lib/eventdev/rte_eventdev.c b/lib/eventdev/rte_eventdev.c index ff77194783..0d43cb2d0a 100644 --- a/lib/eventdev/rte_eventdev.c +++ b/lib/eventdev/rte_eventdev.c @@ -38,6 +38,9 @@ static struct rte_eventdev_global eventdev_globals = { /* Public fastpath APIs. 
*/ struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS]; +/* spinlock for add/remove dequeue callbacks */ +static rte_spinlock_t event_dev_dequeue_cb_lock = RTE_SPINLOCK_INITIALIZER; + /* Event dev north bound API implementation */ uint8_t @@ -860,6 +863,81 @@ rte_event_port_attr_get(uint8_t dev_id, uint8_t port_id, uint32_t attr_id, return 0; } +const struct rte_event_dequeue_callback * +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id, + rte_dequeue_callback_fn fn, void *user_param) +{ + struct rte_eventdev *dev; + struct rte_event_dequeue_callback *cb; + struct rte_event_dequeue_callback *tail; + + /* check input parameters */ + RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, NULL); + dev = &rte_eventdevs[dev_id]; + if (!is_valid_port(dev, port_id)) { + RTE_EDEV_LOG_ERR("Invalid port_id=%" PRIu8, port_id); + return NULL; + } + + cb = rte_zmalloc(NULL, sizeof(*cb), 0); + if (cb == NULL) { + rte_errno = ENOMEM; + return NULL; + } + cb->fn.dequeue = fn; + cb->param = user_param; + + rte_spinlock_lock(&event_dev_dequeue_cb_lock); + /* Add the callbacks in fifo order. */ + tail = rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id]; + if (!tail) { + /* Stores to cb->fn and cb->param should complete before + * cb is visible to data plane. + */ + __atomic_store_n( + &rte_eventdevs[dev_id].post_dequeue_burst_cbs[port_id], + cb, __ATOMIC_RELEASE); + } else { + while (tail->next) + tail = tail->next; + /* Stores to cb->fn and cb->param should complete before + * cb is visible to data plane. + */ + __atomic_store_n(&tail->next, cb, __ATOMIC_RELEASE); + } + rte_spinlock_unlock(&event_dev_dequeue_cb_lock); + + return cb; +} + +int +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id, + const struct rte_event_dequeue_callback *user_cb) +{ + struct rte_eventdev *dev; + + /* Check input parameters. 
*/ + RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL); + dev = &rte_eventdevs[dev_id]; + if (user_cb == NULL || !is_valid_port(dev, port_id)) + return -EINVAL; + + rte_spinlock_lock(&event_dev_dequeue_cb_lock); + prev_cb = &dev->post_dequeue_burst_cbs[port_id]; + for (; *prev_cb != NULL; prev_cb = &cb->next) { + cb = *prev_cb; + if (cb == user_cb) { + /* Remove the user cb from the callback list. */ + __atomic_store_n(prev_cb, cb->next, __ATOMIC_RELAXED); + ret = 0; + break; + } + } + rte_spinlock_unlock(&event_dev_dequeue_cb_lock); + + return ret; +} + int rte_event_port_get_monitor_addr(uint8_t dev_id, uint8_t port_id, struct rte_power_monitor_cond *pmc) diff --git a/lib/eventdev/rte_eventdev.h b/lib/eventdev/rte_eventdev.h index 841b1fb9b5..9ccd259058 100644 --- a/lib/eventdev/rte_eventdev.h +++ b/lib/eventdev/rte_eventdev.h @@ -948,6 +948,100 @@ void rte_event_port_quiesce(uint8_t dev_id, uint8_t port_id, rte_eventdev_port_flush_t release_cb, void *args); +struct rte_event_dequeue_callback; + +/** + * Function type used for dequeue event processing callbacks. + * + * The callback function is called on dequeue with a burst of events that have + * been received on the given event port. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param[out] ev + * Points to an array of *nb_events* objects of type *rte_event* structure + * for output to be populated with the dequeued event objects. + * @param nb_events + * The maximum number of event objects to dequeue, typically number of + * rte_event_port_dequeue_depth() available for this port. + * @param opaque + * Opaque pointer of event port callback related data. + * + * @return + * The number of event objects returned to the user. 
+ */ +typedef uint16_t (*rte_dequeue_callback_fn)(uint8_t dev_id, uint8_t port_id, + struct rte_event *ev, uint16_t nb_events, void *user_param); + +/** + * Add a callback to be called on event dequeue on a given event device port. + * + * This API configures a function to be called for each burst of + * events dequeued on a given event device port. The return value is a pointer + * that can be used to later remove the callback using + * rte_event_remove_dequeue_callback(). + * + * Multiple functions are called in the order that they are added. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param fn + * The callback function + * @param user_param + * A generic pointer parameter which will be passed to each invocation of the + * callback function on this event device port. Inter-thread synchronization + * of any user data changes is the responsibility of the user. + * + * @return + * NULL on error. + * On success, a pointer value which can later be used to remove the callback. + */ +__rte_experimental +const struct rte_event_dequeue_callback * +rte_event_add_dequeue_callback(uint8_t dev_id, uint8_t port_id, + rte_dequeue_callback_fn fn, void *user_param); + +/** + * Remove a dequeue event callback from a given event device port. + * + * This API is used to remove callbacks that were added to an event device port + * using rte_event_add_dequeue_callback(). + * + * Note: the callback is removed from the callback list but it isn't freed + * since it may still be in use. The memory for the callback can be + * subsequently freed by the application by calling rte_free(): + * + * - Immediately - if the device is stopped, or the user knows that no + * callbacks are in flight e.g. if called from the thread doing dequeue + * on that port. + * + * - After a short delay - where the delay is sufficient to allow any + * in-flight callbacks to complete.
Alternatively, the RCU mechanism can be + * used to detect when data plane threads have ceased referencing the + * callback memory. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param user_cb + * The callback handle returned by rte_event_add_dequeue_callback(). + * + * @return + * - 0: Success. Callback was removed. + * - -ENODEV: If *dev_id* is invalid. + * - -EINVAL: The port_id is out of range, or the callback + * is NULL. + */ +__rte_experimental +int +rte_event_remove_dequeue_callback(uint8_t dev_id, uint8_t port_id, + const struct rte_event_dequeue_callback *user_cb); + /** * The queue depth of the port on the enqueue side */ @@ -2133,6 +2227,34 @@ rte_event_enqueue_forward_burst(uint8_t dev_id, uint8_t port_id, fp_ops->enqueue_forward_burst); } +/** + * @internal + * Helper routine for rte_event_dequeue_burst(). + * Should be called at exit from PMD's rte_event_dequeue() implementation. + * Does necessary post-processing - invokes dequeue callbacks if any, etc. + * + * @param dev_id + * The identifier of the device. + * @param port_id + * The identifier of the event port. + * @param[out] ev + * Points to an array of *nb_events* objects of type *rte_event* structure + * for output to be populated with the dequeued event objects. + * @param nb_events + * The maximum number of event objects to dequeue, typically number of + * rte_event_port_dequeue_depth() available for this port. + * @param opaque + * Opaque pointer of event port callback related data. + * + * @return + * The number of event objects actually dequeued from the port. The return + * value can be less than the value of the *nb_events* parameter when the + * event port's queue is not full.
+ */ +__rte_experimental +uint16_t rte_event_dequeue_callbacks(uint8_t dev_id, uint8_t port_id, + struct rte_event *ev, uint16_t nb_events, void *opaque); + /** * Dequeue a burst of events objects or an event object from the event port * designated by its *event_port_id*, on an event device designated @@ -2205,6 +2327,7 @@ rte_event_dequeue_burst(uint8_t dev_id, uint8_t port_id, struct rte_event ev[], { const struct rte_event_fp_ops *fp_ops; void *port; + uint16_t nb_rx; fp_ops = &rte_event_fp_ops[dev_id]; port = fp_ops->data[port_id]; @@ -2226,10 +2349,28 @@ rte_event_dequeue_burst(uint8_t dev_id, uint8_t port_id, struct rte_event ev[], * requests nb_events as const one */ if (nb_events == 1) - return (fp_ops->dequeue)(port, ev, timeout_ticks); + nb_rx = fp_ops->dequeue(port, ev, timeout_ticks); else - return (fp_ops->dequeue_burst)(port, ev, nb_events, + nb_rx = fp_ops->dequeue_burst(port, ev, nb_events, timeout_ticks); + + { + void *cb; + + /* __ATOMIC_RELEASE memory order was used when the + * call back was inserted into the list. + * Since there is a clear dependency between loading + * cb and cb->fn/cb->next, __ATOMIC_ACQUIRE memory order is + * not required. 
+ */ + cb = __atomic_load_n((void **)&fp_ops->ev_port.clbk[port_id], + __ATOMIC_RELAXED); + if (unlikely(cb != NULL)) + nb_rx = rte_event_dequeue_callbacks(dev_id, port_id, + ev, nb_rx, cb); + } + + return nb_rx; } #define RTE_EVENT_DEV_MAINT_OP_FLUSH (1 << 0) diff --git a/lib/eventdev/rte_eventdev_core.h b/lib/eventdev/rte_eventdev_core.h index c328bdbc82..b364ecc2a5 100644 --- a/lib/eventdev/rte_eventdev_core.h +++ b/lib/eventdev/rte_eventdev_core.h @@ -42,6 +42,14 @@ typedef uint16_t (*event_crypto_adapter_enqueue_t)(void *port, uint16_t nb_events); /**< @internal Enqueue burst of events on crypto adapter */ +struct rte_eventdev_port_data { + void **data; + /**< points to array of internal port data pointers */ + void **clbk; + /**< points to array of port callback data pointers */ +}; +/**< @internal Structure used to hold opaque eventdev port data. */ + struct rte_event_fp_ops { void **data; /**< points to array of internal port data pointers */ @@ -65,7 +73,9 @@ struct rte_event_fp_ops { /**< PMD Tx adapter enqueue same destination function. */ event_crypto_adapter_enqueue_t ca_enqueue; /**< PMD Crypto adapter enqueue function. */ - uintptr_t reserved[6]; + struct rte_eventdev_port_data ev_port; + /**< Eventdev port data. */ + uintptr_t reserved[3]; } __rte_cache_aligned; extern struct rte_event_fp_ops rte_event_fp_ops[RTE_EVENT_MAX_DEVS]; diff --git a/lib/eventdev/version.map b/lib/eventdev/version.map index 89068a5713..8ce54f5017 100644 --- a/lib/eventdev/version.map +++ b/lib/eventdev/version.map @@ -131,6 +131,12 @@ EXPERIMENTAL { rte_event_eth_tx_adapter_runtime_params_init; rte_event_eth_tx_adapter_runtime_params_set; rte_event_timer_remaining_ticks_get; + + # added in 23.07 + rte_event_dequeue_callbacks + rte_event_add_dequeue_callback + rte_event_remove_dequeue_callback + rte_event_port_get_monitor_addr }; INTERNAL { -- 2.34.1