On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richard...@intel.com>
> 
> add the event enqueue, dequeue and release functions to the eventdev.
> These also include tracking of stats for observability in the load of
> the scheduler.
> Internally in the enqueue function, the various types of enqueue
> operations, to forward an existing event, to send a new event, to
> drop a previous event, are converted to a series of flags which will
> be used by the scheduler code to perform the needed actions for that
> event.
> 
> Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
> Signed-off-by: Gage Eads <gage.e...@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haa...@intel.com>
> ---
>  drivers/event/sw/Makefile          |   1 +
>  drivers/event/sw/sw_evdev.c        |   5 +
>  drivers/event/sw/sw_evdev.h        |  32 +++++++
>  drivers/event/sw/sw_evdev_worker.c | 188 
> +++++++++++++++++++++++++++++++++++++
>  4 files changed, 226 insertions(+)
>  create mode 100644 drivers/event/sw/sw_evdev_worker.c
> 
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index d6836e3..b6ecd91 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>  
>  # library source files
>  SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
>  
>  # export include files
>  SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 82ac3bd..9b2816d 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -412,6 +412,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
>       sw->qid_count = conf->nb_event_queues;
>       sw->port_count = conf->nb_event_ports;
>       sw->nb_events_limit = conf->nb_events_limit;
> +     rte_atomic32_set(&sw->inflights, 0);
>  
>       return 0;
>  }
> @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
>               return -EFAULT;
>       }
>       dev->dev_ops = &evdev_sw_ops;
> +     dev->enqueue = sw_event_enqueue;
> +     dev->enqueue_burst = sw_event_enqueue_burst;
> +     dev->dequeue = sw_event_dequeue;
> +     dev->dequeue_burst = sw_event_dequeue_burst;

Is all the code in sw_probe() valid for multi-process? If not, sw_probe()
can return early[1] right after the function pointer assignment. Just like
the other PMDs, we will support the configuration API and the fast-path API
in the primary process, while the secondary process will be limited to the
fast-path functions.

[1]
        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;

>  
>       sw = dev->data->dev_private;
>       sw->data = dev->data;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index f5515e1..ab372fd 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -55,12 +55,36 @@
>  #define SCHED_DEQUEUE_BURST_SIZE 32
>  
> +
> +static inline void
> +sw_event_release(struct sw_port *p, uint8_t index)
> +{
> +     /*
> +      * Drops the next outstanding event in our history. Used on dequeue
> +      * to clear any history before dequeuing more events.
> +      */
> +     RTE_SET_USED(index);
> +
> +     /* create drop message */
> +     struct rte_event ev = {
> +             .op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> +     };
> +
> +     uint16_t free_count;
> +     qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> +
> +     /* each release returns one credit */
> +     p->outstanding_releases--;
> +     p->inflight_credits++;
> +}
> +
> +uint16_t
> +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> +{
> +     int32_t i;
> +     uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> +     struct sw_port *p = port;
> +     struct sw_evdev *sw = (void *)p->sw;
> +     uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> +
> +     if (p->inflight_max < sw_inflights)
> +             return 0;

The likely() and unlikely() branch-prediction hints are missing from the
fast-path functions. It is worth considering using them throughout the
worker file.

> +     if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
> +             num = PORT_ENQUEUE_MAX_BURST_SIZE;
> +
> +     if (p->inflight_credits < num) {
> +             /* Check if sending events would bring instance over the
> +              * max events threshold
> +              */
> +             uint32_t credit_update_quanta = sw->credit_update_quanta;
> +             if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
> +                     return 0;
> +
> +             rte_atomic32_add(&sw->inflights, credit_update_quanta);
> +             p->inflight_credits += (credit_update_quanta);
> +
> +             if (p->inflight_credits < num)
> +                     return 0;
> +     }
> +
> +     for (i = 0; i < num; i++) {
> +             int op = ev[i].op;
> +             int outstanding = p->outstanding_releases > 0;
> +             const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
> +
> +             p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
> +             p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
> +                                     outstanding;
> +
> +             new_ops[i] = sw_qe_flag_map[op];
> +             new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
> +
> +             /* FWD and RELEASE packets will both resolve to taken (assuming
> +              * correct usage of the API), providing very high correct
> +              * prediction rate.
> +              */
> +             if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
> +                     p->outstanding_releases--;
> +             /* Branch to avoid touching p->stats except error case */
> +             if (invalid_qid)
> +                     p->stats.rx_dropped++;
> +     }
> +
> +     /* returns number of events actually enqueued */
> +     uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
> +                                          new_ops);
> +     if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
> +             uint64_t burst_ticks = rte_get_timer_cycles() -
> +                             p->last_dequeue_ticks;
> +             uint64_t burst_pkt_ticks =
> +                     burst_ticks / p->last_dequeue_burst_sz;
> +             p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
> +             p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
> +             p->last_dequeue_ticks = 0;
> +     }
> +     return enq;
> +}
> +
> +uint16_t
> +sw_event_enqueue(void *port, const struct rte_event *ev)
> +{
> +     return sw_event_enqueue_burst(port, ev, 1);
> +}
> +
> +uint16_t
> +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> +             uint64_t wait)
> +{
> +     RTE_SET_USED(wait);
> +     struct sw_port *p = (void *)port;
> +     struct sw_evdev *sw = (void *)p->sw;
> +     struct qe_ring *ring = p->cq_worker_ring;
> +     uint32_t credit_update_quanta = sw->credit_update_quanta;
> +
> +     /* check that all previous dequeues have been released */
> +     if (!p->is_directed) {
> +             uint16_t out_rels = p->outstanding_releases;
> +             uint16_t i;
> +             for (i = 0; i < out_rels; i++)
> +                     sw_event_release(p, i);
> +     }
> +
> +     /* Intel modification: may not be in final API */
> +     if (ev == 0)
> +             return 0;

Maybe we can remove this check from the fast path. Instead, we could add it
under a DEBUG option in the common code.

> +
> +     /* returns number of events actually dequeued */
> +     uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
> +     if (ndeq == 0) {
> +             p->outstanding_releases = 0;
> +             p->zero_polls++;
> +             p->total_polls++;
> +             goto end;
> +     }
> +
> +     /* only add credits for directed ports - LB ports send RELEASEs */
> +     p->inflight_credits += ndeq * p->is_directed;
> +     p->outstanding_releases = ndeq;
> +     p->last_dequeue_burst_sz = ndeq;
> +     p->last_dequeue_ticks = rte_get_timer_cycles();
> +     p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
> +     p->total_polls++;
> +
> +end:
> +     if (p->inflight_credits >= credit_update_quanta * 2 &&
> +                     p->inflight_credits > credit_update_quanta + ndeq) {
> +             rte_atomic32_sub(&sw->inflights, credit_update_quanta);
> +             p->inflight_credits -= credit_update_quanta;
> +     }
> +     return ndeq;
> +}
> +
> +uint16_t
> +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
> +{
> +     return sw_event_dequeue_burst(port, ev, 1, wait);
> +}
> -- 
> 2.7.4
> 

Reply via email to