> -----Original Message-----
> From: Nicolau, Radu <radu.nico...@intel.com>
> Sent: Tuesday, September 8, 2020 11:52 AM
> To: dev@dpdk.org
> Cc: jer...@marvell.com; Van Haaren, Harry <harry.van.haa...@intel.com>;
> Nicolau, Radu <radu.nico...@intel.com>
> Subject: [PATCH v1] event/sw: performance improvements
>
> Add minimum burst throughout the scheduler pipeline and a flush counter.
> Replace ring API calls with local single threaded implementation
> where possible.
>
> Signed-off-by: Radu Nicolau <radu.nico...@intel.com>

Thanks for the patch, a few comments inline.

> ---
>  drivers/event/sw/sw_evdev.h           | 11 +++-
>  drivers/event/sw/sw_evdev_scheduler.c | 83 +++++++++++++++++++++++----
>  2 files changed, 81 insertions(+), 13 deletions(-)
>
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index 7c77b2495..95e51065f 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -29,7 +29,13 @@
>  /* report dequeue burst sizes in buckets */
>  #define SW_DEQ_STAT_BUCKET_SHIFT 2
>  /* how many packets pulled from port by sched */
> -#define SCHED_DEQUEUE_BURST_SIZE 32
> +#define SCHED_DEQUEUE_BURST_SIZE 64
> +
> +#define SCHED_MIN_BURST_SIZE 8
> +#define SCHED_NO_ENQ_CYCLE_FLUSH 256
> +/* set SCHED_DEQUEUE_BURST_SIZE to 64 or 128 when setting this to 1*/
> +#define SCHED_REFILL_ONCE_PER_CALL 1

Is it possible to make the above #define a runtime option?
E.g. --vdev event_sw,refill_iter=1

That would allow packaged versions of DPDK to be usable in both modes.
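
Roughly what I have in mind, as an untested sketch only (the "refill_iter"
key, the helper names and the suggested sw_evdev field are just placeholders),
reusing the rte_kvargs handling that sw_probe() already does for its other
devargs:

#include <stdlib.h>
#include <rte_common.h>
#include <rte_kvargs.h>

#define REFILL_ITER_ARG "refill_iter"

static int
set_refill_once(const char *key __rte_unused, const char *value, void *opaque)
{
	int *refill_once = opaque;

	*refill_once = atoi(value);
	/* only 0 and 1 make sense for an on/off flag */
	return (*refill_once == 0 || *refill_once == 1) ? 0 : -1;
}

/* parse "refill_iter=N" out of the vdev argument string; in sw_probe() the
 * key would simply be added to the existing valid-args list instead.
 */
static int
sw_parse_refill_iter(const char *params, int *refill_once)
{
	struct rte_kvargs *kvlist = rte_kvargs_parse(params, NULL);
	int ret = 0;

	if (kvlist == NULL)
		return 0; /* nothing to parse, keep the default */

	if (rte_kvargs_count(kvlist, REFILL_ITER_ARG))
		ret = rte_kvargs_process(kvlist, REFILL_ITER_ARG,
				set_refill_once, refill_once);

	rte_kvargs_free(kvlist);
	return ret;
}

The parsed value would then be stored next to sched_min_burst in struct
sw_evdev (say a uint8_t refill_once_per_iter) so the scheduler can branch
on it at runtime.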

> +
>
>  #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
>  #define NUM_SAMPLES 64 /* how many data points use for average stats */
> @@ -214,6 +220,9 @@ struct sw_evdev {
>  	uint32_t xstats_count_mode_port;
>  	uint32_t xstats_count_mode_queue;
>
> +	uint16_t sched_flush_count;
> +	uint16_t sched_min_burst;
> +
>  	/* Contains all ports - load balanced and directed */
>  	struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
>
> diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> index cff747da8..ca6d1caff 100644
> --- a/drivers/event/sw/sw_evdev_scheduler.c
> +++ b/drivers/event/sw/sw_evdev_scheduler.c
> @@ -26,6 +26,29 @@
>  /* use cheap bit mixing, we only need to lose a few bits */
>  #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
>
> +
> +/* single object enq and deq for non MT ring */
> +static __rte_always_inline void
> +sw_nonmt_ring_dequeue(struct rte_ring *r, void **obj)
> +{
> +	if ((r->prod.tail - r->cons.tail) < 1)
> +		return;
> +	void **ring = (void **)&r[1];
> +	*obj = ring[r->cons.tail & r->mask];
> +	r->cons.tail++;
> +}
> +static __rte_always_inline int
> +sw_nonmt_ring_enqueue(struct rte_ring *r, void *obj)
> +{
> +	if ((r->capacity + r->cons.tail - r->prod.tail) < 1)
> +		return 0;
> +	void **ring = (void **)&r[1];
> +	ring[r->prod.tail & r->mask] = obj;
> +	r->prod.tail++;
> +	return 1;
> +}
> +
> +
>  static inline uint32_t
>  sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>  		uint32_t iq_num, unsigned int count)
> @@ -146,9 +169,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>  				cq_idx = 0;
>  			cq = qid->cq_map[cq_idx++];
>
> -		} while (rte_event_ring_free_count(
> -				sw->ports[cq].cq_worker_ring) == 0 ||
> -				sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> +		} while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
> +				rte_event_ring_free_count(
> +					sw->ports[cq].cq_worker_ring) == 0);
>
>  		struct sw_port *p = &sw->ports[cq];
>  		if (sw->cq_ring_space[cq] == 0 ||
> @@ -164,7 +187,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>  		p->hist_list[head].qid = qid_id;
>
>  		if (keep_order)
> -			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> +			sw_nonmt_ring_dequeue(qid->reorder_buffer_freelist,
>  					(void *)&p->hist_list[head].rob_entry);
>
>  		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> @@ -229,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
>  		uint32_t pkts_done = 0;
>  		uint32_t count = iq_count(&qid->iq[iq_num]);
>
> -		if (count > 0) {
> +		if (count >= sw->sched_min_burst) {
>  			if (type == SW_SCHED_TYPE_DIRECT)
>  				pkts_done += sw_schedule_dir_to_cq(sw, qid,
>  						iq_num, count);
> @@ -267,7 +290,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
>
>  	for (; qid_start < qid_end; qid_start++) {
>  		struct sw_qid *qid = &sw->qids[qid_start];
> -		int i, num_entries_in_use;
> +		unsigned int i, num_entries_in_use;
>
>  		if (qid->type != RTE_SCHED_TYPE_ORDERED)
>  			continue;
> @@ -275,6 +298,9 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
>  		num_entries_in_use = rte_ring_free_count(
>  					qid->reorder_buffer_freelist);
>
> +		if (num_entries_in_use < sw->sched_min_burst)
> +			num_entries_in_use = 0;
> +
>  		for (i = 0; i < num_entries_in_use; i++) {
>  			struct reorder_buffer_entry *entry;
>  			int j;
> @@ -320,7 +346,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
>  			if (!entry->ready) {
>  				entry->fragment_index = 0;
>
> -				rte_ring_sp_enqueue(
> +				sw_nonmt_ring_enqueue(
>  						qid->reorder_buffer_freelist,
>  						entry);
>
> @@ -349,9 +375,11 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
>  	uint32_t pkts_iter = 0;
>  	struct sw_port *port = &sw->ports[port_id];
>
> +#if !SCHED_REFILL_ONCE_PER_CALL
>  	/* If shadow ring has 0 pkts, pull from worker ring */
>  	if (port->pp_buf_count == 0)
>  		sw_refill_pp_buf(sw, port);
> +#endif

As per the comment above, this #if would become a runtime check (see the
sketch at the bottom of this mail); the same applies to the #if blocks below.

>  	while (port->pp_buf_count) {
>  		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> @@ -467,9 +495,11 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
>  	uint32_t pkts_iter = 0;
>  	struct sw_port *port = &sw->ports[port_id];
>
> +#if !SCHED_REFILL_ONCE_PER_CALL
>  	/* If shadow ring has 0 pkts, pull from worker ring */
>  	if (port->pp_buf_count == 0)
>  		sw_refill_pp_buf(sw, port);
> +#endif
>
>  	while (port->pp_buf_count) {
>  		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> @@ -557,12 +587,41 @@ sw_event_schedule(struct rte_eventdev *dev)
>  	/* push all the internal buffered QEs in port->cq_ring to the
>  	 * worker cores: aka, do the ring transfers batched.
>  	 */
> +	int no_enq = 1;
>  	for (i = 0; i < sw->port_count; i++) {
> -		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
> -		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> -				sw->ports[i].cq_buf_count,
> -				&sw->cq_ring_space[i]);
> -		sw->ports[i].cq_buf_count = 0;
> +		struct sw_port *port = &sw->ports[i];
> +		struct rte_event_ring *worker = port->cq_worker_ring;
> +
> +#if SCHED_REFILL_ONCE_PER_CALL
> +		/* If shadow ring has 0 pkts, pull from worker ring */
> +		if (port->pp_buf_count == 0)
> +			sw_refill_pp_buf(sw, port);
> +#endif
> +
> +		if (port->cq_buf_count >= sw->sched_min_burst) {
> +			rte_event_ring_enqueue_burst(worker,
> +					port->cq_buf,
> +					port->cq_buf_count,
> +					&sw->cq_ring_space[i]);
> +			port->cq_buf_count = 0;
> +			no_enq = 0;
> +		} else {
> +			sw->cq_ring_space[i] =
> +					rte_event_ring_free_count(worker) -
> +					port->cq_buf_count;
> +		}
> +	}
> +
> +	if (no_enq) {
> +		if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
> +			sw->sched_min_burst = 1;
> +		else
> +			sw->sched_flush_count++;
> +	} else {
> +		if (sw->sched_flush_count)
> +			sw->sched_flush_count--;
> +		else
> +			sw->sched_min_burst = SCHED_MIN_BURST_SIZE;
> +	}
>
>  }
> --
> 2.17.1
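
And to illustrate my inline comments about the #if blocks: with the
hypothetical refill_once_per_iter field from the sketch above, the
compile-time conditionals would turn into ordinary branches, roughly:

	/* in __pull_port_lb() and sw_schedule_pull_port_dir(), replacing
	 * the #if !SCHED_REFILL_ONCE_PER_CALL ... #endif section
	 */
	if (!sw->refill_once_per_iter) {
		/* If shadow ring has 0 pkts, pull from worker ring */
		if (port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);
	}

	/* and in the enqueue loop of sw_event_schedule(), replacing the
	 * #if SCHED_REFILL_ONCE_PER_CALL ... #endif section
	 */
	if (sw->refill_once_per_iter && port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

Since the flag is fixed after probe the branch should be well predicted, so
I would not expect a measurable cost versus the compile-time #if, and it
keeps a single binary usable in both modes.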