Buffer timer expiry events generated while walking a "run list"
in rte_timer_manage, and burst enqueue them to an event device
to the extent possible.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carri...@intel.com>
---
 lib/librte_eventdev/rte_event_timer_adapter.c | 118 +++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 10 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c 
b/lib/librte_eventdev/rte_event_timer_adapter.c
index 8bd9ebc..176cc5b 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -447,7 +447,93 @@ rte_event_timer_cancel_burst(const struct 
rte_event_timer_adapter *adapter,
 }
 
 /*
- * Software event timer adapter ops definitions
+ * Software event timer adapter buffer helper functions
+ */
+
+#define EVENT_BUFFER_SZ 1024
+#if EVENT_BUFFER_SZ < 1 || EVENT_BUFFER_SZ > 65536
+#error "EVENT_BUFFER_SZ must be between 1 and 65536"
+#endif
+
+#define EVENT_BUFFER_BATCHSZ 32
+
+struct event_buffer {
+       uint16_t head;
+       uint16_t tail;
+       uint16_t count;
+       struct rte_event events[EVENT_BUFFER_SZ];
+} __rte_cache_aligned;
+
+static inline bool
+event_buffer_full(struct event_buffer *bufp)
+{
+       return (bufp->tail + 1) % EVENT_BUFFER_SZ == bufp->head;
+}
+
+static inline bool
+event_buffer_batch_ready(struct event_buffer *bufp)
+{
+       return bufp->count >= EVENT_BUFFER_BATCHSZ;
+}
+
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+       bufp->head = bufp->tail = bufp->count = 0;
+       memset(&bufp->events, 0, sizeof(struct rte_event) * EVENT_BUFFER_SZ);
+}
+
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+       uint16_t *tailp = &bufp->tail;
+       struct rte_event *buf_eventp;
+
+       if (event_buffer_full(bufp))
+               return -1;
+
+       buf_eventp = &bufp->events[*tailp];
+       rte_memcpy(buf_eventp, eventp, sizeof(struct rte_event));
+
+       *tailp = (*tailp + 1) % EVENT_BUFFER_SZ;
+       bufp->count++;
+
+       return 0;
+}
+
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+                  uint16_t *nb_events_flushed)
+{
+       uint16_t n = 0;
+       uint16_t *headp = &bufp->head;
+       uint16_t *tailp = &bufp->tail;
+       struct rte_event *events = bufp->events;
+
+       if (*tailp > *headp)
+               n = *tailp - *headp;
+       else if (*tailp < *headp)
+               n = EVENT_BUFFER_SZ - *headp;
+       else {  /* buffer empty */
+               *nb_events_flushed = 0;
+               return;
+       }
+
+       *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
+                                                    &events[*headp], n);
+       if (*nb_events_flushed != n && rte_errno == -EINVAL) {
+               /* We must have hit a bad event...skip it to ensure we don't
+                * hang on it.
+                */
+               (*nb_events_flushed)++;
+       }
+
+       *headp = (*headp + *nb_events_flushed) % EVENT_BUFFER_SZ;
+       bufp->count -= *nb_events_flushed;
+}
+
+/*
+ * Software event timer adapter implementation
  */
 
 struct rte_event_timer_adapter_sw_data {
@@ -461,6 +547,8 @@ struct rte_event_timer_adapter_sw_data {
        struct rte_mempool *msg_pool;
        /* Mempool containing timer objects */
        struct rte_mempool *tim_pool;
+       /* Buffered timer expiry events to be enqueued to an event device. */
+       struct event_buffer buffer;
 };
 
 enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
@@ -473,7 +561,8 @@ struct msg {
 static void
 sw_event_timer_cb(struct rte_timer *tim, void *arg)
 {
-       uint16_t n;
+       uint16_t nb_evs_flushed;
+       int ret;
        struct rte_event_timer *evtim;
        struct rte_event_timer_adapter *adapter;
        struct rte_event_timer_adapter_sw_data *sw_data;
@@ -482,14 +571,10 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
        adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
        sw_data = adapter->data->adapter_priv;
 
-       n = rte_event_enqueue_burst(adapter->data->event_dev_id,
-                                   adapter->data->event_port_id,
-                                   &evtim->ev,
-                                   1);
-       if (n != 1 && rte_errno == -ENOSPC) {
-               /* If we couldn't enqueue because the event port was
-                * backpressured, put the timer back in the skiplist with an
-                * immediate expiry value so we can process it again on the
+       ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+       if (ret < 0) {
+               /* If event buffer is full, put timer back in list with
+                * immediate expiry value, so that we process it again on the
                 * next iteration.
                 */
                rte_timer_reset_sync(tim, SINGLE, 0, rte_lcore_id(),
@@ -500,6 +585,13 @@ sw_event_timer_cb(struct rte_timer *tim, void *arg)
                evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
                rte_mempool_put(sw_data->tim_pool, (void **)&tim);
        }
+
+       if (event_buffer_batch_ready(&sw_data->buffer)) {
+               event_buffer_flush(&sw_data->buffer,
+                                  adapter->data->event_dev_id,
+                                  adapter->data->event_port_id,
+                                  &nb_evs_flushed);
+       }
 }
 
 static __rte_always_inline uint64_t
@@ -658,10 +750,14 @@ sw_event_timer_adapter_service_func(void *arg)
 
        rte_timer_manage();
 
+       event_buffer_flush(&sw_data->buffer, adapter->data->event_dev_id,
+                          adapter->data->event_port_id, &nb_events);
+
        /* Could use for stats */
        RTE_SET_USED(nb_events);
        RTE_SET_USED(ret);
 
+
        return 0;
 }
 
@@ -726,6 +822,8 @@ sw_event_timer_adapter_init(struct rte_event_timer_adapter 
*adapter)
                return -1;
        }
 
+       event_buffer_init(&sw_data->buffer);
+
        /* Register a service component */
        memset(&service, 0, sizeof(service));
        snprintf(service.name, RTE_SERVICE_NAME_MAX,
-- 
2.6.4

Reply via email to