Add a minimum burst size throughout the scheduler pipeline, together with
a flush counter. Replace ring API calls with local single-threaded
implementations where possible.

Signed-off-by: Radu Nicolau <radu.nico...@intel.com>
---
 doc/guides/eventdevs/sw.rst            | 22 +++++++
 doc/guides/rel_notes/release_20_11.rst |  4 ++
 drivers/event/sw/sw_evdev.c            | 82 ++++++++++++++++++++++++-
 drivers/event/sw/sw_evdev.h            | 20 ++++++-
 drivers/event/sw/sw_evdev_scheduler.c  | 83 +++++++++++++++++++++-----
 5 files changed, 191 insertions(+), 20 deletions(-)

diff --git a/doc/guides/eventdevs/sw.rst b/doc/guides/eventdevs/sw.rst
index 04c8b0305..69939ab9d 100644
--- a/doc/guides/eventdevs/sw.rst
+++ b/doc/guides/eventdevs/sw.rst
@@ -87,6 +87,28 @@ verify possible gains.
 
     --vdev="event_sw0,credit_quanta=64"
 
+Scheduler tuning arguments
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The minimum number of events that the scheduler processes at each stage can be
+increased to reduce per-event overhead and increase internal burst sizes,
+which can improve throughput.
+
+* ``min_burst`` specifies the minimum number of inflight events that can be
+  moved to the next stage in the scheduler. Default value is 1.
+
+* ``refill_once`` is a switch that, when set, instructs the scheduler to
+  dequeue the events waiting in the ingress rings only once per call. The
+  default behavior is to dequeue as needed.
+
+* ``deq_burst`` is the burst size used to dequeue from the port rings.
+  Default value is 32, and it should be increased to 64 or 128 when setting
+  ``refill_once=1``.
+
+.. code-block:: console
+
+    --vdev="event_sw0,min_burst=8,deq_burst=64,refill_once=1"
+
 
 Limitations
 -----------
diff --git a/doc/guides/rel_notes/release_20_11.rst b/doc/guides/rel_notes/release_20_11.rst
index f377ab8e8..afc06ca37 100644
--- a/doc/guides/rel_notes/release_20_11.rst
+++ b/doc/guides/rel_notes/release_20_11.rst
@@ -78,6 +78,10 @@ New Features
     ``--portmask=N``
     where N represents the hexadecimal bitmask of ports used.
 
+* **Updated Software Eventdev driver.**
+
+  Added performance tuning arguments to allow tuning the scheduler for
+  better throughput in high core count use cases.
 
 Removed Items
 -------------
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 98dae7164..04d05172c 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -19,6 +19,9 @@
 #define NUMA_NODE_ARG "numa_node"
 #define SCHED_QUANTA_ARG "sched_quanta"
 #define CREDIT_QUANTA_ARG "credit_quanta"
+#define MIN_BURST_SIZE_ARG "min_burst"
+#define DEQ_BURST_SIZE_ARG "deq_burst"
+#define REFIL_ONCE_ARG "refill_once"
 
 static void
 sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info);
@@ -910,6 +913,35 @@ set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque)
        return 0;
 }
 
+/*
+ * kvargs handler for the "deq_burst" devarg.
+ *
+ * Parses value as a base-10 integer and, when it lies in
+ * [1, SCHED_DEQUEUE_MAX_BURST_SIZE], stores it in the int that opaque points
+ * to. Zero is rejected: the parsed value ends up as the count handed to the
+ * port ring dequeue when the scheduler refills its per-port buffer, so a
+ * burst of 0 would never pull any event.
+ *
+ * Returns 0 on success, or -1 when value is missing, not a plain decimal
+ * number, or out of range. The destination is left untouched on failure.
+ */
+static int
+set_deq_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
+{
+       int *deq_burst_sz = opaque;
+       char *end = NULL;
+       long burst;
+
+       if (value == NULL)
+               return -1;
+
+       /* Unlike atoi(), strtol() lets us detect empty or non-numeric input.
+        * An overflowing value saturates to LONG_MIN/LONG_MAX, which the
+        * range check below rejects too.
+        */
+       burst = strtol(value, &end, 10);
+       if (end == value || *end != '\0')
+               return -1;
+       if (burst < 1 || burst > SCHED_DEQUEUE_MAX_BURST_SIZE)
+               return -1;
+
+       *deq_burst_sz = (int)burst;
+       return 0;
+}
+
+/*
+ * kvargs handler for the "min_burst" devarg.
+ *
+ * Parses value as a base-10 integer and, when it lies in
+ * [0, SCHED_DEQUEUE_MAX_BURST_SIZE], stores it in the int that opaque points
+ * to.
+ *
+ * Returns 0 on success, or -1 when value is missing, not a plain decimal
+ * number, or out of range. The destination is left untouched on failure.
+ */
+static int
+set_min_burst_sz(const char *key __rte_unused, const char *value, void *opaque)
+{
+       int *min_burst_sz = opaque;
+       char *end = NULL;
+       long burst;
+
+       if (value == NULL)
+               return -1;
+
+       /* Unlike atoi(), strtol() lets us detect empty or non-numeric input.
+        * An overflowing value saturates to LONG_MIN/LONG_MAX, which the
+        * range check below rejects too.
+        */
+       burst = strtol(value, &end, 10);
+       if (end == value || *end != '\0')
+               return -1;
+       if (burst < 0 || burst > SCHED_DEQUEUE_MAX_BURST_SIZE)
+               return -1;
+
+       *min_burst_sz = (int)burst;
+       return 0;
+}
+
+/*
+ * kvargs handler for the "refill_once" devarg.
+ *
+ * Accepts exactly the decimal values 0 and 1 and stores the result in the
+ * int that opaque points to.
+ *
+ * Returns 0 on success, or -1 when value is missing, not a plain decimal
+ * number, or neither 0 nor 1. The destination is left untouched on failure.
+ */
+static int
+set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
+{
+       int *refill_once_per_call = opaque;
+       char *end = NULL;
+       long flag;
+
+       if (value == NULL)
+               return -1;
+
+       /* Unlike atoi(), strtol() lets us detect empty or non-numeric input
+        * instead of silently treating it as 0.
+        */
+       flag = strtol(value, &end, 10);
+       if (end == value || *end != '\0')
+               return -1;
+       if (flag < 0 || flag > 1)
+               return -1;
+
+       *refill_once_per_call = (int)flag;
+       return 0;
+}
 
 static int32_t sw_sched_service_func(void *args)
 {
@@ -957,6 +989,9 @@ sw_probe(struct rte_vdev_device *vdev)
                NUMA_NODE_ARG,
                SCHED_QUANTA_ARG,
                CREDIT_QUANTA_ARG,
+               MIN_BURST_SIZE_ARG,
+               DEQ_BURST_SIZE_ARG,
+               REFIL_ONCE_ARG,
                NULL
        };
        const char *name;
@@ -966,6 +1001,9 @@ sw_probe(struct rte_vdev_device *vdev)
        int socket_id = rte_socket_id();
        int sched_quanta  = SW_DEFAULT_SCHED_QUANTA;
        int credit_quanta = SW_DEFAULT_CREDIT_QUANTA;
+       int min_burst_size = 1;
+       int deq_burst_size = SCHED_DEQUEUE_DEFAULT_BURST_SIZE;
+       int refill_once = 0;
 
        name = rte_vdev_device_name(vdev);
        params = rte_vdev_device_args(vdev);
@@ -1007,13 +1045,46 @@ sw_probe(struct rte_vdev_device *vdev)
                                return ret;
                        }
 
+                       ret = rte_kvargs_process(kvlist, MIN_BURST_SIZE_ARG,
+                                       set_min_burst_sz, &min_burst_size);
+                       if (ret != 0) {
+                               SW_LOG_ERR(
+                                       "%s: Error parsing minimum burst size 
parameter",
+                                       name);
+                               rte_kvargs_free(kvlist);
+                               return ret;
+                       }
+
+                       ret = rte_kvargs_process(kvlist, DEQ_BURST_SIZE_ARG,
+                                       set_deq_burst_sz, &deq_burst_size);
+                       if (ret != 0) {
+                               SW_LOG_ERR(
+                                       "%s: Error parsing dequeue burst size 
parameter",
+                                       name);
+                               rte_kvargs_free(kvlist);
+                               return ret;
+                       }
+
+                       ret = rte_kvargs_process(kvlist, REFIL_ONCE_ARG,
+                                       set_refill_once, &refill_once);
+                       if (ret != 0) {
+                               SW_LOG_ERR(
+                                       "%s: Error parsing refill once per call 
switch",
+                                       name);
+                               rte_kvargs_free(kvlist);
+                               return ret;
+                       }
+
                        rte_kvargs_free(kvlist);
                }
        }
 
        SW_LOG_INFO(
-                       "Creating eventdev sw device %s, numa_node=%d, 
sched_quanta=%d, credit_quanta=%d\n",
-                       name, socket_id, sched_quanta, credit_quanta);
+                       "Creating eventdev sw device %s, numa_node=%d, "
+                       "sched_quanta=%d, credit_quanta=%d "
+                       "min_burst=%d, deq_burst=%d, refill_once=%d\n",
+                       name, socket_id, sched_quanta, credit_quanta,
+                       min_burst_size, deq_burst_size, refill_once);
 
        dev = rte_event_pmd_vdev_init(name,
                        sizeof(struct sw_evdev), socket_id);
@@ -1038,6 +1109,9 @@ sw_probe(struct rte_vdev_device *vdev)
        /* copy values passed from vdev command line to instance */
        sw->credit_update_quanta = credit_quanta;
        sw->sched_quanta = sched_quanta;
+       sw->sched_min_burst_size = min_burst_size;
+       sw->sched_deq_burst_size = deq_burst_size;
+       sw->refill_once_per_iter = refill_once;
 
        /* register service with EAL */
        struct rte_service_spec service;
@@ -1082,5 +1156,7 @@ static struct rte_vdev_driver evdev_sw_pmd_drv = {
 
 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv);
 RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> "
-               SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>");
+               SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>"
+               MIN_BURST_SIZE_ARG "=<int>" DEQ_BURST_SIZE_ARG "=<int>"
+               REFIL_ONCE_ARG "=<int>");
 RTE_LOG_REGISTER(eventdev_sw_log_level, pmd.event.sw, NOTICE);
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 7c77b2495..2f8d5a5b2 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -29,7 +29,13 @@
 /* report dequeue burst sizes in buckets */
 #define SW_DEQ_STAT_BUCKET_SHIFT 2
 /* how many packets pulled from port by sched */
-#define SCHED_DEQUEUE_BURST_SIZE 32
+#define SCHED_DEQUEUE_DEFAULT_BURST_SIZE 32
+/* max buffer size */
+#define SCHED_DEQUEUE_MAX_BURST_SIZE 256
+
+/* No-enqueue cycle count above which the min burst is dropped to 1 (flush) */
+#define SCHED_NO_ENQ_CYCLE_FLUSH 256
+
 
 #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
 #define NUM_SAMPLES 64 /* how many data points use for average stats */
@@ -197,7 +203,7 @@ struct sw_port {
        uint32_t pp_buf_start;
        uint32_t pp_buf_count;
        uint16_t cq_buf_count;
-       struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE];
+       struct rte_event pp_buf[SCHED_DEQUEUE_MAX_BURST_SIZE];
        struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH];
 
        uint8_t num_qids_mapped;
@@ -214,6 +220,16 @@ struct sw_evdev {
        uint32_t xstats_count_mode_port;
        uint32_t xstats_count_mode_queue;
 
+       /* Minimum burst size*/
+       uint32_t sched_min_burst_size __rte_cache_aligned;
+       /* Port dequeue burst size*/
+       uint32_t sched_deq_burst_size;
+       /* Refill pp buffers only once per scheduler call*/
+       uint32_t refill_once_per_iter;
+       /* Current values */
+       uint32_t sched_flush_count;
+       uint32_t sched_min_burst;
+
        /* Contains all ports - load balanced and directed */
        struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
 
diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
index cff747da8..116f85bc9 100644
--- a/drivers/event/sw/sw_evdev_scheduler.c
+++ b/drivers/event/sw/sw_evdev_scheduler.c
@@ -26,6 +26,29 @@
 /* use cheap bit mixing, we only need to lose a few bits */
 #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
 
+
+/* single object enq and deq for non MT ring */
+/*
+ * Take one object pointer off ring r by manipulating the ring fields
+ * directly instead of going through the rte_ring API; no atomics or barriers
+ * are issued here.
+ *
+ * The object table is addressed as the memory that immediately follows the
+ * rte_ring header. When prod.tail - cons.tail is below 1 the ring is treated
+ * as empty and *obj is left unmodified; otherwise the entry at cons.tail is
+ * copied to *obj and cons.tail is advanced by one.
+ */
+static __rte_always_inline void
+sw_nonmt_ring_dequeue(struct rte_ring *r, void **obj)
+{
+       if ((r->prod.tail - r->cons.tail) >= 1) {
+               void **slots = (void **)(r + 1);
+
+               *obj = slots[r->cons.tail & r->mask];
+               r->cons.tail++;
+       }
+}
+/*
+ * Put one object pointer on ring r by manipulating the ring fields directly
+ * instead of going through the rte_ring API; no atomics or barriers are
+ * issued here.
+ *
+ * The object table is addressed as the memory that immediately follows the
+ * rte_ring header. Returns 1 when obj was stored at prod.tail (which is then
+ * advanced by one), or 0 when capacity + cons.tail - prod.tail is below 1,
+ * i.e. there is no free slot and the ring is left unchanged.
+ */
+static __rte_always_inline int
+sw_nonmt_ring_enqueue(struct rte_ring *r, void *obj)
+{
+       void **slots;
+
+       if ((r->capacity + r->cons.tail - r->prod.tail) >= 1) {
+               slots = (void **)(r + 1);
+               slots[r->prod.tail & r->mask] = obj;
+               r->prod.tail++;
+               return 1;
+       }
+
+       return 0;
+}
+
+
 static inline uint32_t
 sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                uint32_t iq_num, unsigned int count)
@@ -146,9 +169,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                                cq_idx = 0;
                        cq = qid->cq_map[cq_idx++];
 
-               } while (rte_event_ring_free_count(
-                               sw->ports[cq].cq_worker_ring) == 0 ||
-                               sw->ports[cq].inflights == SW_PORT_HIST_LIST);
+               } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
+                               rte_event_ring_free_count(
+                                       sw->ports[cq].cq_worker_ring) == 0);
 
                struct sw_port *p = &sw->ports[cq];
                if (sw->cq_ring_space[cq] == 0 ||
@@ -164,7 +187,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
                p->hist_list[head].qid = qid_id;
 
                if (keep_order)
-                       rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
+                       sw_nonmt_ring_dequeue(qid->reorder_buffer_freelist,
                                        (void *)&p->hist_list[head].rob_entry);
 
                sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
@@ -229,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
                uint32_t pkts_done = 0;
                uint32_t count = iq_count(&qid->iq[iq_num]);
 
-               if (count > 0) {
+               if (count >= sw->sched_min_burst) {
                        if (type == SW_SCHED_TYPE_DIRECT)
                                pkts_done += sw_schedule_dir_to_cq(sw, qid,
                                                iq_num, count);
@@ -267,7 +290,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
 
        for (; qid_start < qid_end; qid_start++) {
                struct sw_qid *qid = &sw->qids[qid_start];
-               int i, num_entries_in_use;
+               unsigned int i, num_entries_in_use;
 
                if (qid->type != RTE_SCHED_TYPE_ORDERED)
                        continue;
@@ -275,6 +298,9 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
                num_entries_in_use = rte_ring_free_count(
                                        qid->reorder_buffer_freelist);
 
+               if (num_entries_in_use < sw->sched_min_burst)
+                       num_entries_in_use = 0;
+
                for (i = 0; i < num_entries_in_use; i++) {
                        struct reorder_buffer_entry *entry;
                        int j;
@@ -320,7 +346,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
                        if (!entry->ready) {
                                entry->fragment_index = 0;
 
-                               rte_ring_sp_enqueue(
+                               sw_nonmt_ring_enqueue(
                                                qid->reorder_buffer_freelist,
                                                entry);
 
@@ -339,7 +365,7 @@ sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
        struct rte_event_ring *worker = port->rx_worker_ring;
        port->pp_buf_start = 0;
        port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
-                       RTE_DIM(port->pp_buf), NULL);
+                       sw->sched_deq_burst_size, NULL);
 }
 
 static __rte_always_inline uint32_t
@@ -350,7 +376,7 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
        struct sw_port *port = &sw->ports[port_id];
 
        /* If shadow ring has 0 pkts, pull from worker ring */
-       if (port->pp_buf_count == 0)
+       if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);
 
        while (port->pp_buf_count) {
@@ -468,7 +494,7 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
        struct sw_port *port = &sw->ports[port_id];
 
        /* If shadow ring has 0 pkts, pull from worker ring */
-       if (port->pp_buf_count == 0)
+       if (!sw->refill_once_per_iter && port->pp_buf_count == 0)
                sw_refill_pp_buf(sw, port);
 
        while (port->pp_buf_count) {
@@ -557,12 +583,39 @@ sw_event_schedule(struct rte_eventdev *dev)
        /* push all the internal buffered QEs in port->cq_ring to the
         * worker cores: aka, do the ring transfers batched.
         */
+       int no_enq = 1;
        for (i = 0; i < sw->port_count; i++) {
-               struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
-               rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
-                               sw->ports[i].cq_buf_count,
-                               &sw->cq_ring_space[i]);
-               sw->ports[i].cq_buf_count = 0;
+               struct sw_port *port = &sw->ports[i];
+               struct rte_event_ring *worker = port->cq_worker_ring;
+
+               /* If shadow ring has 0 pkts, pull from worker ring */
+               if (sw->refill_once_per_iter && port->pp_buf_count == 0)
+                       sw_refill_pp_buf(sw, port);
+
+               if (port->cq_buf_count >= sw->sched_min_burst) {
+                       rte_event_ring_enqueue_burst(worker,
+                                       port->cq_buf,
+                                       port->cq_buf_count,
+                                       &sw->cq_ring_space[i]);
+                       port->cq_buf_count = 0;
+                       no_enq = 0;
+               } else {
+                       sw->cq_ring_space[i] =
+                                       rte_event_ring_free_count(worker) -
+                                       port->cq_buf_count;
+               }
+       }
+
+       if (no_enq) {
+               if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
+                       sw->sched_min_burst = 1;
+               else
+                       sw->sched_flush_count++;
+       } else {
+               if (sw->sched_flush_count)
+                       sw->sched_flush_count--;
+               else
+                       sw->sched_min_burst = sw->sched_min_burst_size;
        }
 
 }
-- 
2.17.1

Reply via email to