<snip>
> >
> > Add minimum burst throughout the scheduler pipeline and a flush counter.
> > Replace ring API calls with local single threaded implementation where
> > possible.
> >
> > Signed-off-by: Radu Nicolau <radu.nico...@intel.com>
>
> Thanks for the patch, a few comments inline.
>
> > ---
> >  drivers/event/sw/sw_evdev.h           | 11 +++-
> >  drivers/event/sw/sw_evdev_scheduler.c | 83 +++++++++++++++++++++++----
> >  2 files changed, 81 insertions(+), 13 deletions(-)
> >
> > diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> > index 7c77b2495..95e51065f 100644
> > --- a/drivers/event/sw/sw_evdev.h
> > +++ b/drivers/event/sw/sw_evdev.h
> > @@ -29,7 +29,13 @@
> >  /* report dequeue burst sizes in buckets */
> >  #define SW_DEQ_STAT_BUCKET_SHIFT 2
> >  /* how many packets pulled from port by sched */
> > -#define SCHED_DEQUEUE_BURST_SIZE 32
> > +#define SCHED_DEQUEUE_BURST_SIZE 64
> > +
> > +#define SCHED_MIN_BURST_SIZE 8
> > +#define SCHED_NO_ENQ_CYCLE_FLUSH 256
> > +/* set SCHED_DEQUEUE_BURST_SIZE to 64 or 128 when setting this to 1*/
> > +#define SCHED_REFILL_ONCE_PER_CALL 1
>
> Is it possible to make the above #define a runtime option?
> Eg, --vdev event_sw,refill_iter=1
>
> That would allow packaged versions of DPDK to be usable in both modes.
>
> > +
> >
> >  #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
> >  #define NUM_SAMPLES 64 /* how many data points use for average stats */
> > @@ -214,6 +220,9 @@ struct sw_evdev {
> >      uint32_t xstats_count_mode_port;
> >      uint32_t xstats_count_mode_queue;
> >
> > +    uint16_t sched_flush_count;
> > +    uint16_t sched_min_burst;
> > +
> >      /* Contains all ports - load balanced and directed */
> >      struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
> >
> > diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> > index cff747da8..ca6d1caff 100644
> > --- a/drivers/event/sw/sw_evdev_scheduler.c
> > +++ b/drivers/event/sw/sw_evdev_scheduler.c
> > @@ -26,6 +26,29 @@
> >  /* use cheap bit mixing, we only need to lose a few bits */
> >  #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
> >
> > +
> > +/* single object enq and deq for non MT ring */
> > +static __rte_always_inline void
> > +sw_nonmt_ring_dequeue(struct rte_ring *r, void **obj)
> > +{
> > +    if ((r->prod.tail - r->cons.tail) < 1)
> > +        return;
> > +    void **ring = (void **)&r[1];
> > +    *obj = ring[r->cons.tail & r->mask];
> > +    r->cons.tail++;
> > +}
> > +static __rte_always_inline int
> > +sw_nonmt_ring_enqueue(struct rte_ring *r, void *obj)
> > +{
> > +    if ((r->capacity + r->cons.tail - r->prod.tail) < 1)
> > +        return 0;
> > +    void **ring = (void **)&r[1];
> > +    ring[r->prod.tail & r->mask] = obj;
> > +    r->prod.tail++;
> > +    return 1;
> > +}

Why not make these APIs part of the rte_ring library? You could further
optimize them by keeping the indices on the same cacheline.
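
For example, something along these lines (an illustrative sketch only, not a
proposal for the existing rte_ring layout; the struct and function names are
made up):

#include <stdint.h>
#include <rte_common.h>

/* Single-threaded ring: producer and consumer indices share one cacheline,
 * so an enqueue or dequeue touches a single header line plus the slot array.
 */
struct st_ring {
    uint32_t prod_tail;
    uint32_t cons_tail;
    uint32_t mask;      /* ring size - 1, size is a power of two */
    uint32_t capacity;
    void *slots[];      /* storage follows the header */
} __rte_cache_aligned;

static inline int
st_ring_enqueue(struct st_ring *r, void *obj)
{
    if ((r->capacity + r->cons_tail - r->prod_tail) < 1)
        return 0; /* full */
    r->slots[r->prod_tail & r->mask] = obj;
    r->prod_tail++;
    return 1;
}

static inline int
st_ring_dequeue(struct st_ring *r, void **obj)
{
    if ((r->prod_tail - r->cons_tail) < 1)
        return 0; /* empty */
    *obj = r->slots[r->cons_tail & r->mask];
    r->cons_tail++;
    return 1;
}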
> > +
> > +
> >  static inline uint32_t
> >  sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >          uint32_t iq_num, unsigned int count)
> > @@ -146,9 +169,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >                  cq_idx = 0;
> >              cq = qid->cq_map[cq_idx++];
> >
> > -        } while (rte_event_ring_free_count(
> > -                sw->ports[cq].cq_worker_ring) == 0 ||
> > -                sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> > +        } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
> > +                rte_event_ring_free_count(
> > +                    sw->ports[cq].cq_worker_ring) == 0);
> >
> >          struct sw_port *p = &sw->ports[cq];
> >          if (sw->cq_ring_space[cq] == 0 ||
> > @@ -164,7 +187,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >          p->hist_list[head].qid = qid_id;
> >
> >          if (keep_order)
> > -            rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> > +            sw_nonmt_ring_dequeue(qid->reorder_buffer_freelist,
> >                  (void *)&p->hist_list[head].rob_entry);
> >
> >          sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> > @@ -229,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
> >          uint32_t pkts_done = 0;
> >          uint32_t count = iq_count(&qid->iq[iq_num]);
> >
> > -        if (count > 0) {
> > +        if (count >= sw->sched_min_burst) {
> >              if (type == SW_SCHED_TYPE_DIRECT)
> >                  pkts_done += sw_schedule_dir_to_cq(sw, qid,
> >                          iq_num, count);
> > @@ -267,7 +290,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >
> >      for (; qid_start < qid_end; qid_start++) {
> >          struct sw_qid *qid = &sw->qids[qid_start];
> > -        int i, num_entries_in_use;
> > +        unsigned int i, num_entries_in_use;
> >
> >          if (qid->type != RTE_SCHED_TYPE_ORDERED)
> >              continue;
> > @@ -275,6 +298,9 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >          num_entries_in_use = rte_ring_free_count(
> >                      qid->reorder_buffer_freelist);
> >
> > +        if (num_entries_in_use < sw->sched_min_burst)
> > +            num_entries_in_use = 0;
> > +
> >          for (i = 0; i < num_entries_in_use; i++) {
> >              struct reorder_buffer_entry *entry;
> >              int j;
> > @@ -320,7 +346,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >              if (!entry->ready) {
> >                  entry->fragment_index = 0;
> >
> > -                rte_ring_sp_enqueue(
> > +                sw_nonmt_ring_enqueue(
> >                          qid->reorder_buffer_freelist,
> >                          entry);
> >
> > @@ -349,9 +375,11 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
> >      uint32_t pkts_iter = 0;
> >      struct sw_port *port = &sw->ports[port_id];
> >
> > +#if !SCHED_REFILL_ONCE_PER_CALL
> >      /* If shadow ring has 0 pkts, pull from worker ring */
> >      if (port->pp_buf_count == 0)
> >          sw_refill_pp_buf(sw, port);
> > +#endif
>
> As per above comment, this #if would become a runtime check.
> Similar for the below #if comments.
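
For reference, the devargs parsing for that could look roughly like the sketch
below (untested; the "refill_iter" key follows the suggestion above, and the
helper and flag names are placeholders). In the driver the key would be added
to the existing valid-args list handled in sw_probe(), next to
sched_quanta/credit_quanta, and the flag stored in struct sw_evdev.

#include <stdlib.h>
#include <rte_common.h>
#include <rte_kvargs.h>

#define REFILL_ITER_ARG "refill_iter"

/* kvargs handler: store "refill_iter=0|1" into an int flag */
static int
set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
{
    *(int *)opaque = atoi(value) != 0;
    return 0;
}

/* Parse the vdev parameter string; leaves *refill_once untouched by default */
static int
parse_refill_once(const char *params, int *refill_once)
{
    static const char * const valid[] = { REFILL_ITER_ARG, NULL };
    struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid);

    if (kvlist == NULL)
        return -1;
    rte_kvargs_process(kvlist, REFILL_ITER_ARG, set_refill_once, refill_once);
    rte_kvargs_free(kvlist);
    return 0;
}

The scheduler would then test the flag at runtime, e.g. if (!sw->refill_once),
where the #if !SCHED_REFILL_ONCE_PER_CALL blocks are today, and the inverse
check in sw_event_schedule().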
> >
> >      while (port->pp_buf_count) {
> >          const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> > @@ -467,9 +495,11 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
> >      uint32_t pkts_iter = 0;
> >      struct sw_port *port = &sw->ports[port_id];
> >
> > +#if !SCHED_REFILL_ONCE_PER_CALL
> >      /* If shadow ring has 0 pkts, pull from worker ring */
> >      if (port->pp_buf_count == 0)
> >          sw_refill_pp_buf(sw, port);
> > +#endif
> >
> >      while (port->pp_buf_count) {
> >          const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> > @@ -557,12 +587,41 @@ sw_event_schedule(struct rte_eventdev *dev)
> >      /* push all the internal buffered QEs in port->cq_ring to the
> >       * worker cores: aka, do the ring transfers batched.
> >       */
> > +    int no_enq = 1;
> >      for (i = 0; i < sw->port_count; i++) {
> > -        struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
> > -        rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> > -                sw->ports[i].cq_buf_count,
> > -                &sw->cq_ring_space[i]);
> > -        sw->ports[i].cq_buf_count = 0;
> > +        struct sw_port *port = &sw->ports[i];
> > +        struct rte_event_ring *worker = port->cq_worker_ring;
> > +
> > +#if SCHED_REFILL_ONCE_PER_CALL
> > +        /* If shadow ring has 0 pkts, pull from worker ring */
> > +        if (port->pp_buf_count == 0)
> > +            sw_refill_pp_buf(sw, port);
> > +#endif
> > +
> > +        if (port->cq_buf_count >= sw->sched_min_burst) {
> > +            rte_event_ring_enqueue_burst(worker,
> > +                    port->cq_buf,
> > +                    port->cq_buf_count,
> > +                    &sw->cq_ring_space[i]);
> > +            port->cq_buf_count = 0;
> > +            no_enq = 0;
> > +        } else {
> > +            sw->cq_ring_space[i] =
> > +                    rte_event_ring_free_count(worker) -
> > +                    port->cq_buf_count;
> > +        }
> > +    }
> > +
> > +    if (no_enq) {
> > +        if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
> > +            sw->sched_min_burst = 1;
> > +        else
> > +            sw->sched_flush_count++;
> > +    } else {
> > +        if (sw->sched_flush_count)
> > +            sw->sched_flush_count--;
> > +        else
> > +            sw->sched_min_burst = SCHED_MIN_BURST_SIZE;
> >      }
> >
> >  }
> > --
> > 2.17.1
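
For anyone else reading the flush logic at the end, this is how I read it,
pulled out into a stand-alone helper (sketch only, no such helper exists in
the patch; it relies on the driver's own definitions):

/* After more than SCHED_NO_ENQ_CYCLE_FLUSH consecutive sw_event_schedule()
 * calls with no CQ enqueue, drop the threshold to 1 so small buffered bursts
 * still get flushed; once enqueues resume, count back down before restoring
 * SCHED_MIN_BURST_SIZE.
 */
static inline void
sw_update_min_burst(struct sw_evdev *sw, int no_enq)
{
    if (no_enq) {
        if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
            sw->sched_min_burst = 1;
        else
            sw->sched_flush_count++;
    } else {
        if (sw->sched_flush_count)
            sw->sched_flush_count--;
        else
            sw->sched_min_burst = SCHED_MIN_BURST_SIZE;
    }
}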