<snip>

> >
> > Add minimum burst throughout the scheduler pipeline and a flush counter.
> > Replace ring API calls with local single threaded implementation where
> > possible.
> >
> > Signed-off-by: Radu Nicolau <radu.nico...@intel.com>
> 
> Thanks for the patch, a few comments inline.
> 
> > ---
> >  drivers/event/sw/sw_evdev.h           | 11 +++-
> >  drivers/event/sw/sw_evdev_scheduler.c | 83 +++++++++++++++++++++++----
> >  2 files changed, 81 insertions(+), 13 deletions(-)
> >
> > diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> > index 7c77b2495..95e51065f 100644
> > --- a/drivers/event/sw/sw_evdev.h
> > +++ b/drivers/event/sw/sw_evdev.h
> > @@ -29,7 +29,13 @@
> >  /* report dequeue burst sizes in buckets */
> >  #define SW_DEQ_STAT_BUCKET_SHIFT 2
> >  /* how many packets pulled from port by sched */
> > -#define SCHED_DEQUEUE_BURST_SIZE 32
> > +#define SCHED_DEQUEUE_BURST_SIZE 64
> > +
> > +#define SCHED_MIN_BURST_SIZE 8
> > +#define SCHED_NO_ENQ_CYCLE_FLUSH 256
> > +/* set SCHED_DEQUEUE_BURST_SIZE to 64 or 128 when setting this to 1 */
> > +#define SCHED_REFILL_ONCE_PER_CALL 1
> 
> Is it possible to make the above #define a runtime option?
> E.g. --vdev event_sw,refill_iter=1
> 
> That would allow packaged versions of DPDK to be usable in both modes.
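> 
> Untested sketch of what that could look like (it assumes a new
> "refill_iter" devarg, an int field in struct sw_evdev to hold it, and
> the usual rte_kvargs handling in sw_probe(); all names here are just
> placeholders):
> 
> 	#define REFILL_ITER_ARG "refill_iter"
> 
> 	/* kvargs callback: treat any non-zero value as "enabled" */
> 	static int
> 	set_refill_iter(const char *key __rte_unused, const char *value,
> 			void *opaque)
> 	{
> 		int *refill_iter = opaque;
> 		*refill_iter = (atoi(value) != 0);
> 		return 0;
> 	}
> 
> 	/* in sw_probe(), next to the existing devarg processing: */
> 	rte_kvargs_process(kvlist, REFILL_ITER_ARG,
> 			set_refill_iter, &refill_iter);
> 
> (The new key would also need to be added to the PMD's list of valid
> devarg names.)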
> 
> > +
> >
> >  #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
> >  #define NUM_SAMPLES 64 /* how many data points use for average stats */
> > @@ -214,6 +220,9 @@ struct sw_evdev {
> >     uint32_t xstats_count_mode_port;
> >     uint32_t xstats_count_mode_queue;
> >
> > +   uint16_t sched_flush_count;
> > +   uint16_t sched_min_burst;
> > +
> >     /* Contains all ports - load balanced and directed */
> >     struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
> >
> > diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> > index cff747da8..ca6d1caff 100644
> > --- a/drivers/event/sw/sw_evdev_scheduler.c
> > +++ b/drivers/event/sw/sw_evdev_scheduler.c
> > @@ -26,6 +26,29 @@
> >  /* use cheap bit mixing, we only need to lose a few bits */
> >  #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
> >
> > +
> > +/* single object enq and deq for non MT ring */
> > +static __rte_always_inline void
> > +sw_nonmt_ring_dequeue(struct rte_ring *r, void **obj)
> > +{
> > +   if ((r->prod.tail - r->cons.tail) < 1)
> > +           return;
> > +   void **ring = (void **)&r[1];
> > +   *obj = ring[r->cons.tail & r->mask];
> > +   r->cons.tail++;
> > +}
> > +static __rte_always_inline int
> > +sw_nonmt_ring_enqueue(struct rte_ring *r, void *obj)
> > +{
> > +   if ((r->capacity + r->cons.tail - r->prod.tail) < 1)
> > +           return 0;
> > +   void **ring = (void **)&r[1];
> > +   ring[r->prod.tail & r->mask] = obj;
> > +   r->prod.tail++;
> > +   return 1;
> > +}
Why not make these APIs part of the rte_ring library? You could further 
optimize them by keeping the indices on the same cacheline.
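
For reference, a minimal sketch of what I mean by keeping the indices
together (a hypothetical layout, not an existing rte_ring structure):
a single-threaded ring has no producer/consumer contention, so the
head/tail fields don't need to live on separate cache lines:

	/* hypothetical single-thread ring header: both indices share
	 * one cache line, unlike rte_ring's padded prod/cons structs
	 */
	struct st_ring {
		uint32_t prod_tail;  /* next slot to write */
		uint32_t cons_tail;  /* next slot to read */
		uint32_t mask;       /* count - 1, count is a power of 2 */
		uint32_t capacity;   /* usable entries */
		void *ring[];        /* object storage follows */
	} __rte_cache_aligned;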

> > +
> > +
> >  static inline uint32_t
> >  sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >             uint32_t iq_num, unsigned int count)
> > @@ -146,9 +169,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >                             cq_idx = 0;
> >                     cq = qid->cq_map[cq_idx++];
> >
> > -           } while (rte_event_ring_free_count(
> > -                           sw->ports[cq].cq_worker_ring) == 0 ||
> > -                           sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> > +           } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
> > +                           rte_event_ring_free_count(
> > +                                   sw->ports[cq].cq_worker_ring) == 0);
> >
> >             struct sw_port *p = &sw->ports[cq];
> >             if (sw->cq_ring_space[cq] == 0 ||
> > @@ -164,7 +187,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
> >             p->hist_list[head].qid = qid_id;
> >
> >             if (keep_order)
> > -                   rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> > +                   sw_nonmt_ring_dequeue(qid->reorder_buffer_freelist,
> >                                     (void *)&p->hist_list[head].rob_entry);
> >
> >             sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> > @@ -229,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
> >             uint32_t pkts_done = 0;
> >             uint32_t count = iq_count(&qid->iq[iq_num]);
> >
> > -           if (count > 0) {
> > +           if (count >= sw->sched_min_burst) {
> >                     if (type == SW_SCHED_TYPE_DIRECT)
> >                             pkts_done += sw_schedule_dir_to_cq(sw, qid,
> >                                             iq_num, count);
> > @@ -267,7 +290,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >
> >     for (; qid_start < qid_end; qid_start++) {
> >             struct sw_qid *qid = &sw->qids[qid_start];
> > -           int i, num_entries_in_use;
> > +           unsigned int i, num_entries_in_use;
> >
> >             if (qid->type != RTE_SCHED_TYPE_ORDERED)
> >                     continue;
> > @@ -275,6 +298,9 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >             num_entries_in_use = rte_ring_free_count(
> >                                     qid->reorder_buffer_freelist);
> >
> > +           if (num_entries_in_use < sw->sched_min_burst)
> > +                   num_entries_in_use = 0;
> > +
> >             for (i = 0; i < num_entries_in_use; i++) {
> >                     struct reorder_buffer_entry *entry;
> >                     int j;
> > @@ -320,7 +346,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> >                     if (!entry->ready) {
> >                             entry->fragment_index = 0;
> >
> > -                           rte_ring_sp_enqueue(
> > +                           sw_nonmt_ring_enqueue(
> >                                             qid->reorder_buffer_freelist,
> >                                             entry);
> >
> > @@ -349,9 +375,11 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
> >     uint32_t pkts_iter = 0;
> >     struct sw_port *port = &sw->ports[port_id];
> >
> > +#if !SCHED_REFILL_ONCE_PER_CALL
> >     /* If shadow ring has 0 pkts, pull from worker ring */
> >     if (port->pp_buf_count == 0)
> >             sw_refill_pp_buf(sw, port);
> > +#endif
> 
> As per the above comment, this #if would become a runtime check.
> The same applies to the other #if blocks below.
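> 
> For example (sketch only, assuming a "refill_once_per_call" flag is
> added to struct sw_evdev and filled in from the devarg):
> 
> 	/* refill here only when not deferring to sw_event_schedule() */
> 	if (!sw->refill_once_per_call && port->pp_buf_count == 0)
> 		sw_refill_pp_buf(sw, port);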
> 
> 
> >     while (port->pp_buf_count) {
> >             const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> > @@ -467,9 +495,11 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
> >     uint32_t pkts_iter = 0;
> >     struct sw_port *port = &sw->ports[port_id];
> >
> > +#if !SCHED_REFILL_ONCE_PER_CALL
> >     /* If shadow ring has 0 pkts, pull from worker ring */
> >     if (port->pp_buf_count == 0)
> >             sw_refill_pp_buf(sw, port);
> > +#endif
> >
> >     while (port->pp_buf_count) {
> >             const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> > @@ -557,12 +587,41 @@ sw_event_schedule(struct rte_eventdev *dev)
> >     /* push all the internal buffered QEs in port->cq_ring to the
> >      * worker cores: aka, do the ring transfers batched.
> >      */
> > +   int no_enq = 1;
> >     for (i = 0; i < sw->port_count; i++) {
> > -           struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
> > -           rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> > -                           sw->ports[i].cq_buf_count,
> > -                           &sw->cq_ring_space[i]);
> > -           sw->ports[i].cq_buf_count = 0;
> > +           struct sw_port *port = &sw->ports[i];
> > +           struct rte_event_ring *worker = port->cq_worker_ring;
> > +
> > +#if SCHED_REFILL_ONCE_PER_CALL
> > +           /* If shadow ring has 0 pkts, pull from worker ring */
> > +           if (port->pp_buf_count == 0)
> > +                   sw_refill_pp_buf(sw, port);
> > +#endif
> > +
> > +           if (port->cq_buf_count >= sw->sched_min_burst) {
> > +                   rte_event_ring_enqueue_burst(worker,
> > +                                   port->cq_buf,
> > +                                   port->cq_buf_count,
> > +                                   &sw->cq_ring_space[i]);
> > +                   port->cq_buf_count = 0;
> > +                   no_enq = 0;
> > +           } else {
> > +                   sw->cq_ring_space[i] =
> > +                                   rte_event_ring_free_count(worker) -
> > +                                   port->cq_buf_count;
> > +           }
> > +   }
> > +
> > +   if (no_enq) {
> > +           if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
> > +                   sw->sched_min_burst = 1;
> > +           else
> > +                   sw->sched_flush_count++;
> > +   } else {
> > +           if (sw->sched_flush_count)
> > +                   sw->sched_flush_count--;
> > +           else
> > +                   sw->sched_min_burst = SCHED_MIN_BURST_SIZE;
> >     }
> >
> >  }
> > --
> > 2.17.1
