> -----Original Message-----
> From: Nicolau, Radu <radu.nico...@intel.com>
> Sent: Tuesday, September 8, 2020 11:52 AM
> To: dev@dpdk.org
> Cc: jer...@marvell.com; Van Haaren, Harry <harry.van.haa...@intel.com>;
> Nicolau, Radu <radu.nico...@intel.com>
> Subject: [PATCH v1] event/sw: performance improvements
> 
> Add minimum burst throughout the scheduler pipeline and a flush counter.
> Replace ring API calls with local single threaded implementation
> where possible.
> 
> Signed-off-by: Radu Nicolau <radu.nico...@intel.com>

Thanks for the patch; a few comments inline.

> ---
>  drivers/event/sw/sw_evdev.h           | 11 +++-
>  drivers/event/sw/sw_evdev_scheduler.c | 83 +++++++++++++++++++++++----
>  2 files changed, 81 insertions(+), 13 deletions(-)
> 
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index 7c77b2495..95e51065f 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -29,7 +29,13 @@
>  /* report dequeue burst sizes in buckets */
>  #define SW_DEQ_STAT_BUCKET_SHIFT 2
>  /* how many packets pulled from port by sched */
> -#define SCHED_DEQUEUE_BURST_SIZE 32
> +#define SCHED_DEQUEUE_BURST_SIZE 64
> +
> +#define SCHED_MIN_BURST_SIZE 8
> +#define SCHED_NO_ENQ_CYCLE_FLUSH 256
> +/* set SCHED_DEQUEUE_BURST_SIZE to 64 or 128 when setting this to 1*/
> +#define SCHED_REFILL_ONCE_PER_CALL 1

Is it possible to make the above #define a runtime option?
E.g., --vdev event_sw,refill_iter=1

That would allow packaged versions of DPDK to be usable in both modes.
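
A rough (untested) sketch of what I mean, following the existing
sched_quanta/credit_quanta kvargs handling in sw_probe(). REFILL_ITER_ARG,
set_refill_once() and the refill_once variable are placeholder names, and
the new arg would also need adding to the valid args list:

#define REFILL_ITER_ARG "refill_iter"

/* rte_kvargs handler: non-zero value means "refill once per schedule
 * call", zero keeps the current per-port refill behaviour.
 */
static int
set_refill_once(const char *key __rte_unused, const char *value,
		void *opaque)
{
	int *refill_once = opaque;

	if (value == NULL || opaque == NULL)
		return -1;

	*refill_once = (atoi(value) != 0);
	return 0;
}

/* in sw_probe(), next to the other rte_kvargs_process() calls, with
 * "int refill_once = 0;" declared alongside sched_quanta etc.:
 */
ret = rte_kvargs_process(kvlist, REFILL_ITER_ARG,
		set_refill_once, &refill_once);
if (ret != 0) {
	SW_LOG_ERR("%s: Error parsing refill_iter parameter", name);
	rte_kvargs_free(kvlist);
	return ret;
}

The parsed value could then be stored in struct sw_evdev (say, an
sw->refill_once_per_iter flag) and checked at runtime instead of the
SCHED_REFILL_ONCE_PER_CALL #if.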

> +
> 
>  #define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */
>  #define NUM_SAMPLES 64 /* how many data points use for average stats */
> @@ -214,6 +220,9 @@ struct sw_evdev {
>       uint32_t xstats_count_mode_port;
>       uint32_t xstats_count_mode_queue;
> 
> +     uint16_t sched_flush_count;
> +     uint16_t sched_min_burst;
> +
>       /* Contains all ports - load balanced and directed */
>       struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned;
> 
> diff --git a/drivers/event/sw/sw_evdev_scheduler.c b/drivers/event/sw/sw_evdev_scheduler.c
> index cff747da8..ca6d1caff 100644
> --- a/drivers/event/sw/sw_evdev_scheduler.c
> +++ b/drivers/event/sw/sw_evdev_scheduler.c
> @@ -26,6 +26,29 @@
>  /* use cheap bit mixing, we only need to lose a few bits */
>  #define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
> 
> +
> +/* single object enq and deq for non MT ring */
> +static __rte_always_inline void
> +sw_nonmt_ring_dequeue(struct rte_ring *r, void **obj)
> +{
> +     if ((r->prod.tail - r->cons.tail) < 1)
> +             return;
> +     void **ring = (void **)&r[1];
> +     *obj = ring[r->cons.tail & r->mask];
> +     r->cons.tail++;
> +}
> +static __rte_always_inline int
> +sw_nonmt_ring_enqueue(struct rte_ring *r, void *obj)
> +{
> +     if ((r->capacity + r->cons.tail - r->prod.tail) < 1)
> +             return 0;
> +     void **ring = (void **)&r[1];
> +     ring[r->prod.tail & r->mask] = obj;
> +     r->prod.tail++;
> +     return 1;
> +}
> +
> +
>  static inline uint32_t
>  sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>               uint32_t iq_num, unsigned int count)
> @@ -146,9 +169,9 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>                               cq_idx = 0;
>                       cq = qid->cq_map[cq_idx++];
> 
> -             } while (rte_event_ring_free_count(
> -                             sw->ports[cq].cq_worker_ring) == 0 ||
> -                             sw->ports[cq].inflights == SW_PORT_HIST_LIST);
> +             } while (sw->ports[cq].inflights == SW_PORT_HIST_LIST ||
> +                             rte_event_ring_free_count(
> +                                     sw->ports[cq].cq_worker_ring) == 0);
> 
>               struct sw_port *p = &sw->ports[cq];
>               if (sw->cq_ring_space[cq] == 0 ||
> @@ -164,7 +187,7 @@ sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
>               p->hist_list[head].qid = qid_id;
> 
>               if (keep_order)
> -                     rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
> +                     sw_nonmt_ring_dequeue(qid->reorder_buffer_freelist,
>                                       (void *)&p->hist_list[head].rob_entry);
> 
>               sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
> @@ -229,7 +252,7 @@ sw_schedule_qid_to_cq(struct sw_evdev *sw)
>               uint32_t pkts_done = 0;
>               uint32_t count = iq_count(&qid->iq[iq_num]);
> 
> -             if (count > 0) {
> +             if (count >= sw->sched_min_burst) {
>                       if (type == SW_SCHED_TYPE_DIRECT)
>                               pkts_done += sw_schedule_dir_to_cq(sw, qid,
>                                               iq_num, count);
> @@ -267,7 +290,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
> 
>       for (; qid_start < qid_end; qid_start++) {
>               struct sw_qid *qid = &sw->qids[qid_start];
> -             int i, num_entries_in_use;
> +             unsigned int i, num_entries_in_use;
> 
>               if (qid->type != RTE_SCHED_TYPE_ORDERED)
>                       continue;
> @@ -275,6 +298,9 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
>               num_entries_in_use = rte_ring_free_count(
>                                       qid->reorder_buffer_freelist);
> 
> +             if (num_entries_in_use < sw->sched_min_burst)
> +                     num_entries_in_use = 0;
> +
>               for (i = 0; i < num_entries_in_use; i++) {
>                       struct reorder_buffer_entry *entry;
>                       int j;
> @@ -320,7 +346,7 @@ sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
>                       if (!entry->ready) {
>                               entry->fragment_index = 0;
> 
> -                             rte_ring_sp_enqueue(
> +                             sw_nonmt_ring_enqueue(
>                                               qid->reorder_buffer_freelist,
>                                               entry);
> 
> @@ -349,9 +375,11 @@ __pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
>       uint32_t pkts_iter = 0;
>       struct sw_port *port = &sw->ports[port_id];
> 
> +#if !SCHED_REFILL_ONCE_PER_CALL
>       /* If shadow ring has 0 pkts, pull from worker ring */
>       if (port->pp_buf_count == 0)
>               sw_refill_pp_buf(sw, port);
> +#endif

As per the above comment, this #if would become a runtime check.
The same applies to the #if blocks below.
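
For example (rough sketch, assuming an sw->refill_once_per_iter flag in
struct sw_evdev set from the proposed refill_iter devarg):

	if (!sw->refill_once_per_iter) {
		/* If shadow ring has 0 pkts, pull from worker ring */
		if (port->pp_buf_count == 0)
			sw_refill_pp_buf(sw, port);
	}

and correspondingly the #if SCHED_REFILL_ONCE_PER_CALL block in
sw_event_schedule() would become an if (sw->refill_once_per_iter) check.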


>       while (port->pp_buf_count) {
>               const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> @@ -467,9 +495,11 @@ sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
>       uint32_t pkts_iter = 0;
>       struct sw_port *port = &sw->ports[port_id];
> 
> +#if !SCHED_REFILL_ONCE_PER_CALL
>       /* If shadow ring has 0 pkts, pull from worker ring */
>       if (port->pp_buf_count == 0)
>               sw_refill_pp_buf(sw, port);
> +#endif
> 
>       while (port->pp_buf_count) {
>               const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
> @@ -557,12 +587,41 @@ sw_event_schedule(struct rte_eventdev *dev)
>       /* push all the internal buffered QEs in port->cq_ring to the
>        * worker cores: aka, do the ring transfers batched.
>        */
> +     int no_enq = 1;
>       for (i = 0; i < sw->port_count; i++) {
> -             struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
> -             rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
> -                             sw->ports[i].cq_buf_count,
> -                             &sw->cq_ring_space[i]);
> -             sw->ports[i].cq_buf_count = 0;
> +             struct sw_port *port = &sw->ports[i];
> +             struct rte_event_ring *worker = port->cq_worker_ring;
> +
> +#if SCHED_REFILL_ONCE_PER_CALL
> +             /* If shadow ring has 0 pkts, pull from worker ring */
> +             if (port->pp_buf_count == 0)
> +                     sw_refill_pp_buf(sw, port);
> +#endif
> +
> +             if (port->cq_buf_count >= sw->sched_min_burst) {
> +                     rte_event_ring_enqueue_burst(worker,
> +                                     port->cq_buf,
> +                                     port->cq_buf_count,
> +                                     &sw->cq_ring_space[i]);
> +                     port->cq_buf_count = 0;
> +                     no_enq = 0;
> +             } else {
> +                     sw->cq_ring_space[i] =
> +                                     rte_event_ring_free_count(worker) -
> +                                     port->cq_buf_count;
> +             }
> +     }
> +
> +     if (no_enq) {
> +             if (unlikely(sw->sched_flush_count > SCHED_NO_ENQ_CYCLE_FLUSH))
> +                     sw->sched_min_burst = 1;
> +             else
> +                     sw->sched_flush_count++;
> +     } else {
> +             if (sw->sched_flush_count)
> +                     sw->sched_flush_count--;
> +             else
> +                     sw->sched_min_burst = SCHED_MIN_BURST_SIZE;
>       }
> 
>  }
> --
> 2.17.1
