On Wed, Jan 10, 2018 at 06:21:04PM -0600, Erik Gabriel Carrillo wrote: > Define the callback function for the service that corresponds to an > adapter instance, as well as the callback for expired timers that the > service manages. > > Signed-off-by: Erik Gabriel Carrillo <erik.g.carri...@intel.com> > --- > lib/librte_eventdev/rte_event_timer_adapter.c | 198 > +++++++++++++++++++++++++- > lib/librte_eventdev/rte_event_timer_adapter.h | 2 +- > 2 files changed, 198 insertions(+), 2 deletions(-) > > diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c > b/lib/librte_eventdev/rte_event_timer_adapter.c > index 38e52cb..0266ad5 100644 > --- a/lib/librte_eventdev/rte_event_timer_adapter.c > +++ b/lib/librte_eventdev/rte_event_timer_adapter.c > @@ -40,8 +40,10 @@ > #include <rte_malloc.h> > #include <rte_ring.h> > #include <rte_mempool.h> > +#include <rte_common.h> > #include <rte_timer.h> > #include <rte_service_component.h> > +#include <rte_cycles.h> > > #include "rte_eventdev.h" > #include "rte_eventdev_pmd.h" > @@ -460,10 +462,198 @@ struct msg { > struct rte_event_timer *evtim; > }; <snip> > + if (n != 1 && rte_errno == -ENOSPC) { > + /* If we couldn't enqueue because the event port was > + * backpressured, put the timer back in the skiplist with an > + * immediate expiry value so we can process it again on the > + * next iteration. > + */ > + rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(), > + sw_event_timer_cb, evtim); > + } else { > + sw_data->nb_armed_evtims--; > + rte_wmb();
Any reason for using a full write barrier here? IMO rte_smp_wmb() would be more than sufficient; alternatively, use atomics. > + evtim->state = RTE_EVENT_TIMER_NOT_ARMED; > + rte_mempool_put(sw_data->tim_pool, (void **)&tim); > + } > +} > + > +static __rte_always_inline uint64_t > +get_timeout_cycles(struct rte_event_timer *evtim, > + struct rte_event_timer_adapter *adapter) > +{ > + uint64_t timeout_ns; > + > + timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns; > +#define NSECPERSEC 1E9 > + return timeout_ns * rte_get_timer_hz() / NSECPERSEC; > + > +} > + > +/* Check that event timer timeout value is in range */ > +static __rte_always_inline int > +check_timeout(struct rte_event_timer *evtim, > + const struct rte_event_timer_adapter *adapter) > +{ > + uint64_t tmo_nsec = evtim->timeout_ticks * > + adapter->data->conf.timer_tick_ns; > + > + return (tmo_nsec > adapter->data->conf.max_tmo_ns) ? -1 > + : (tmo_nsec < adapter->data->conf.timer_tick_ns) ? -2 > + : 0; Consider simplifying this for readability. 
> +} > + > +/* Check that event timer event queue sched type matches destination event > queue > + * sched type > + */ > +static __rte_always_inline int > +check_destination_event_queue(struct rte_event_timer *evtim, > + const struct rte_event_timer_adapter *adapter) <snip> > + > +#define NB_OBJS 32 > static int > sw_event_timer_adapter_service_func(void *arg) > { > - RTE_SET_USED(arg); > + int i, num_msgs, ret; > + uint64_t cycles; > + uint16_t nb_events; > + struct rte_event_timer_adapter *adapter; > + struct rte_event_timer_adapter_sw_data *sw_data; > + struct rte_event_timer *evtim = NULL; > + struct rte_timer *tim = NULL; > + struct msg *msg, *msgs[NB_OBJS]; > + > + adapter = arg; > + sw_data = adapter->data->adapter_priv; > + > + while (!rte_ring_empty(sw_data->msg_ring)) { > + num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring, > + (void **)msgs, NB_OBJS, NULL); > + > + for (i = 0; i < num_msgs; i++) { > + msg = msgs[i]; > + evtim = msg->evtim; > + > + tim = (struct rte_timer *)evtim->impl_opaque[0]; > + RTE_ASSERT(tim != NULL); > + > + switch (msg->type) { > + case MSG_TYPE_ARM: > + if (validate_event_timer(evtim, adapter) < 0) { > + rte_mempool_put(sw_data->tim_pool, > + (void **)&tim); > + continue; > + } > + > + /* Checks passed; set an rte_timer */ > + cycles = get_timeout_cycles(msg->evtim, > + adapter); > + rte_timer_reset_sync(tim, cycles, SINGLE, > + rte_lcore_id(), > + sw_event_timer_cb, > + msg->evtim); > + > + sw_data->nb_armed_evtims++; > + rte_wmb(); Same as above comment. > + evtim->state = RTE_EVENT_TIMER_ARMED; > + break; > + case MSG_TYPE_CANCEL: > + /* The event timer was either not armed or it > + * fired after this cancel request was queued > + * and before the request was processed. > + */ > + if (evtim->state != RTE_EVENT_TIMER_ARMED) > + continue; > + > + rte_timer_stop_sync(tim); > + rte_mempool_put(sw_data->tim_pool, > + (void **)&tim); > + sw_data->nb_armed_evtims--; > + rte_wmb(); Same as above comment. 
> + msg->evtim->state = RTE_EVENT_TIMER_CANCELED; > + break; > + } > + } > + > + rte_mempool_put_bulk(sw_data->msg_pool, (void **)msgs, > + num_msgs); > + } > + > + rte_timer_manage(); Consider calling rte_timer_manage() before arming the new set of timers. Also, poll it based on the configured timeout interval. > + > + /* Could use for stats */ > + RTE_SET_USED(nb_events); > + RTE_SET_USED(ret); > + > return 0; > } > <snip>