On Fri, Mar 24, 2017 at 04:53:04PM +0000, Harry van Haaren wrote:
> From: Bruce Richardson <bruce.richard...@intel.com>
>
> add the event enqueue, dequeue and release functions to the eventdev.
> These also include tracking of stats for observability in the load of
> the scheduler.
> Internally in the enqueue function, the various types of enqueue
> operations, to forward an existing event, to send a new event, to
> drop a previous event, are converted to a series of flags which will
> be used by the scheduler code to perform the needed actions for that
> event.
>
> Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
> Signed-off-by: Gage Eads <gage.e...@intel.com>
> Signed-off-by: Harry van Haaren <harry.van.haa...@intel.com>
> ---
>  drivers/event/sw/Makefile          |   1 +
>  drivers/event/sw/sw_evdev.c        |   5 +
>  drivers/event/sw/sw_evdev.h        |  32 +++++++
>  drivers/event/sw/sw_evdev_worker.c | 188 +++++++++++++++++++++++++++++++++++++
>  4 files changed, 226 insertions(+)
>  create mode 100644 drivers/event/sw/sw_evdev_worker.c
>
> diff --git a/drivers/event/sw/Makefile b/drivers/event/sw/Makefile
> index d6836e3..b6ecd91 100644
> --- a/drivers/event/sw/Makefile
> +++ b/drivers/event/sw/Makefile
> @@ -53,6 +53,7 @@ EXPORT_MAP := rte_pmd_evdev_sw_version.map
>
>  # library source files
>  SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c
> +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c
>
>  # export include files
>  SYMLINK-y-include +=
> diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
> index 82ac3bd..9b2816d 100644
> --- a/drivers/event/sw/sw_evdev.c
> +++ b/drivers/event/sw/sw_evdev.c
> @@ -412,6 +412,7 @@ sw_dev_configure(const struct rte_eventdev *dev)
>  	sw->qid_count = conf->nb_event_queues;
>  	sw->port_count = conf->nb_event_ports;
>  	sw->nb_events_limit = conf->nb_events_limit;
> +	rte_atomic32_set(&sw->inflights, 0);
>
>  	return 0;
>  }
> @@ -550,6 +551,10 @@ sw_probe(const char *name, const char *params)
>  		return -EFAULT;
>  	}
>  	dev->dev_ops = &evdev_sw_ops;
> +	dev->enqueue = sw_event_enqueue;
> +	dev->enqueue_burst = sw_event_enqueue_burst;
> +	dev->dequeue = sw_event_dequeue;
> +	dev->dequeue_burst = sw_event_dequeue_burst;
Is all the code in sw_probe() valid for multi-process? If not, it can
return[1] from sw_probe right after the function pointer assignment.
Just like the other PMDs, the configuration API will be supported only
in the primary process, and secondary processes will be limited to the
fast-path functions. (A fuller sketch is at the end of this mail.)

[1]
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
	return 0;

>
>  	sw = dev->data->dev_private;
>  	sw->data = dev->data;
> diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
> index f5515e1..ab372fd 100644
> --- a/drivers/event/sw/sw_evdev.h
> +++ b/drivers/event/sw/sw_evdev.h
> @@ -55,12 +55,36 @@
>  #define SCHED_DEQUEUE_BURST_SIZE 32
>
> +
> +static inline void
> +sw_event_release(struct sw_port *p, uint8_t index)
> +{
> +	/*
> +	 * Drops the next outstanding event in our history. Used on dequeue
> +	 * to clear any history before dequeuing more events.
> +	 */
> +	RTE_SET_USED(index);
> +
> +	/* create drop message */
> +	struct rte_event ev = {
> +		.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE],
> +	};
> +
> +	uint16_t free_count;
> +	qe_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count);
> +
> +	/* each release returns one credit */
> +	p->outstanding_releases--;
> +	p->inflight_credits++;
> +}
> +
> +uint16_t
> +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num)
> +{
> +	int32_t i;
> +	uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE];
> +	struct sw_port *p = port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	uint32_t sw_inflights = rte_atomic32_read(&sw->inflights);
> +
> +	if (p->inflight_max < sw_inflights)
> +		return 0;

The likely and unlikely attributes are missing in the fast-path
functions. Worth considering using them in the worker file (see the
sketch at the end of this mail).

> +	if (num > PORT_ENQUEUE_MAX_BURST_SIZE)
> +		num = PORT_ENQUEUE_MAX_BURST_SIZE;
> +
> +	if (p->inflight_credits < num) {
> +		/* Check if sending events would bring instance over the
> +		 * max events threshold
> +		 */
> +		uint32_t credit_update_quanta = sw->credit_update_quanta;
> +		if (sw_inflights + credit_update_quanta > sw->nb_events_limit)
> +			return 0;
> +
> +		rte_atomic32_add(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits += (credit_update_quanta);
> +
> +		if (p->inflight_credits < num)
> +			return 0;
> +	}
> +
> +	for (i = 0; i < num; i++) {
> +		int op = ev[i].op;
> +		int outstanding = p->outstanding_releases > 0;
> +		const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count);
> +
> +		p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
> +		p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
> +				outstanding;
> +
> +		new_ops[i] = sw_qe_flag_map[op];
> +		new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
> +
> +		/* FWD and RELEASE packets will both resolve to taken (assuming
> +		 * correct usage of the API), providing very high correct
> +		 * prediction rate.
> +		 */
> +		if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
> +			p->outstanding_releases--;
> +		/* Branch to avoid touching p->stats except error case */
> +		if (invalid_qid)
> +			p->stats.rx_dropped++;
> +	}
> +
> +	/* returns number of events actually enqueued */
> +	uint32_t enq = qe_ring_enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
> +			new_ops);
> +	if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) {
> +		uint64_t burst_ticks = rte_get_timer_cycles() -
> +				p->last_dequeue_ticks;
> +		uint64_t burst_pkt_ticks =
> +			burst_ticks / p->last_dequeue_burst_sz;
> +		p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES;
> +		p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES;
> +		p->last_dequeue_ticks = 0;
> +	}
> +	return enq;
> +}
> +
> +uint16_t
> +sw_event_enqueue(void *port, const struct rte_event *ev)
> +{
> +	return sw_event_enqueue_burst(port, ev, 1);
> +}
> +
> +uint16_t
> +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num,
> +		uint64_t wait)
> +{
> +	RTE_SET_USED(wait);
> +	struct sw_port *p = (void *)port;
> +	struct sw_evdev *sw = (void *)p->sw;
> +	struct qe_ring *ring = p->cq_worker_ring;
> +	uint32_t credit_update_quanta = sw->credit_update_quanta;
> +
> +	/* check that all previous dequeues have been released */
> +	if (!p->is_directed) {
> +		uint16_t out_rels = p->outstanding_releases;
> +		uint16_t i;
> +		for (i = 0; i < out_rels; i++)
> +			sw_event_release(p, i);
> +	}
> +
> +	/* Intel modification: may not be in final API */
> +	if (ev == 0)
> +		return 0;

Maybe we can remove this check from the fast path and instead add it
under a DEBUG option in the common code (see the sketch at the end of
this mail).

> +
> +	/* returns number of events actually dequeued */
> +	uint16_t ndeq = qe_ring_dequeue_burst(ring, ev, num);
> +	if (ndeq == 0) {
> +		p->outstanding_releases = 0;
> +		p->zero_polls++;
> +		p->total_polls++;
> +		goto end;
> +	}
> +
> +	/* only add credits for directed ports - LB ports send RELEASEs */
> +	p->inflight_credits += ndeq * p->is_directed;
> +	p->outstanding_releases = ndeq;
> +	p->last_dequeue_burst_sz = ndeq;
> +	p->last_dequeue_ticks = rte_get_timer_cycles();
> +	p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
> +	p->total_polls++;
> +
> +end:
> +	if (p->inflight_credits >= credit_update_quanta * 2 &&
> +			p->inflight_credits > credit_update_quanta + ndeq) {
> +		rte_atomic32_sub(&sw->inflights, credit_update_quanta);
> +		p->inflight_credits -= credit_update_quanta;
> +	}
> +	return ndeq;
> +}
> +
> +uint16_t
> +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
> +{
> +	return sw_event_dequeue_burst(port, ev, 1, wait);
> +}
> --
> 2.7.4
>
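
To expand on the multi-process comment above, a minimal sketch of
suggestion [1], assuming the function pointer assignments are the only
setup a secondary process needs (i.e. everything below them in
sw_probe() is primary-only):

	dev->dev_ops = &evdev_sw_ops;
	dev->enqueue = sw_event_enqueue;
	dev->enqueue_burst = sw_event_enqueue_burst;
	dev->dequeue = sw_event_dequeue;
	dev->dequeue_burst = sw_event_dequeue_burst;

	/* fast-path pointers are now set up; skip the remaining
	 * (primary-only) initialization in a secondary process
	 */
	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
		return 0;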
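
On the likely/unlikely point, one possible annotation of the early-exit
branches in sw_event_enqueue_burst(), using the macros from
rte_branch_prediction.h. The branch polarity below is my assumption
(early exits being the rare case); only profiling can confirm it:

	/* hitting the instance inflight limit should be the rare case */
	if (unlikely(p->inflight_max < sw_inflights))
		return 0;

	if (unlikely(num > PORT_ENQUEUE_MAX_BURST_SIZE))
		num = PORT_ENQUEUE_MAX_BURST_SIZE;

	/* credit top-up is the slow path; ports normally hold credits */
	if (unlikely(p->inflight_credits < num)) {
		/* credit update logic as in the patch */
	}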
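
And for the ev == 0 check, something along these lines in the common
rte_event_dequeue_burst(), mirroring the existing
RTE_LIBRTE_EVENTDEV_DEBUG parameter checks there (exact placement and
errno value are only a suggestion):

#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
	if (ev == NULL) {
		rte_errno = -EINVAL;
		return 0;
	}
#endif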