When the application creates the timer adapter by passing the
`RTE_EVENT_TIMER_ADAPTER_F_SP_PUT` flag, we can optimize the arm
sequence by removing the locking overhead, since only a single thread
arms timers on the adapter.

Also implement the arm_tmo_tick_burst() callback through
timvf_timer_reg_brst(), which arms a burst of timers that all share the
same timeout tick.
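For example, an application that arms timers from a single lcore could
request this mode at adapter creation time. The snippet below is an
illustrative sketch only (not part of this patch); dev_id and the tick,
timeout and timer-count values are placeholders:

	#include <rte_event_timer_adapter.h>

	struct rte_event_timer_adapter_conf conf = {
		.event_dev_id = dev_id,	/* placeholder event device id */
		.timer_adapter_id = 0,
		.socket_id = rte_socket_id(),
		.clk_src = RTE_EVENT_TIMER_ADAPTER_CPU_CLK,
		.timer_tick_ns = 10 * 1000 * 1000,	/* 10 ms resolution */
		.max_tmo_ns = 10ULL * 1000 * 1000 * 1000,	/* 10 s max */
		.nb_timers = 1024,
		/* Only one lcore will arm timers; the driver can then
		 * skip bucket locking on the arm fast path.
		 */
		.flags = RTE_EVENT_TIMER_ADAPTER_F_SP_PUT,
	};
	struct rte_event_timer_adapter *adptr =
		rte_event_timer_adapter_create(&conf);

With this flag the driver selects timvf_timer_reg_burst_sp() as the
arm_burst handler, so the application must not call
rte_event_timer_arm_burst() on this adapter from multiple lcores
concurrently.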
Signed-off-by: Pavan Nikhilesh <pbhagavat...@caviumnetworks.com>
---
 drivers/event/octeontx/timvf_evdev.c  |  22 +++-
 drivers/event/octeontx/timvf_evdev.h  |   5 +
 drivers/event/octeontx/timvf_worker.c |  65 ++++++++++++
 drivers/event/octeontx/timvf_worker.h | 183 ++++++++++++++++++++++++++++++++++
 4 files changed, 270 insertions(+), 5 deletions(-)

diff --git a/drivers/event/octeontx/timvf_evdev.c b/drivers/event/octeontx/timvf_evdev.c
index d0ba42263..6cf5d4846 100644
--- a/drivers/event/octeontx/timvf_evdev.c
+++ b/drivers/event/octeontx/timvf_evdev.c
@@ -174,6 +174,7 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
 	struct rte_event_timer_adapter_conf *rcfg = &adptr->data->conf;
 	struct timvf_ring *timr;
 	struct octeontx_timvf_info tinfo;
+	unsigned int mp_flags = 0;
 
 	if (octeontx_timvf_info(&tinfo) < 0)
 		return -ENODEV;
@@ -224,6 +225,11 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
 
 	timr->nb_chunks = nb_timers / nb_chunk_slots;
 
+	if (rcfg->flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT) {
+		mp_flags = MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET;
+		timvf_log_info("Using single producer mode");
+	}
+
 	timr->meta.bkt = rte_zmalloc("octeontx_timvf_bucket",
			(timr->meta.nb_bkts) * sizeof(struct tim_mem_bucket), 0);
@@ -261,8 +267,12 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
 	timvf_write64(0x7, (uint8_t *)timr->vbar0 + TIM_VF_NRSPERR_ENA_W1C);
 	timvf_write64(0x7, (uint8_t *)timr->vbar0 + TIM_VF_NRSPERR_ENA_W1S);
 
-	adptr->arm_burst = timvf_timer_reg_burst_mp;
-	adptr->arm_tmo_tick_burst = NULL;
+	if (mp_flags)
+		adptr->arm_burst = timvf_timer_reg_burst_sp;
+	else
+		adptr->arm_burst = timvf_timer_reg_burst_mp;
+
+	adptr->arm_tmo_tick_burst = timvf_timer_reg_brst;
 	adptr->cancel_burst = timvf_timer_unreg_burst;
 
 	return 0;
@@ -297,11 +307,13 @@ timvf_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
 		uint32_t *caps, const struct rte_event_timer_adapter_ops **ops)
 {
 	RTE_SET_USED(dev);
-	RTE_SET_USED(flags);
 
-	timvf_ops.arm_burst = timvf_timer_reg_burst_mp;
-	timvf_ops.arm_tmo_tick_burst = NULL;
+	if (flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT)
+		timvf_ops.arm_burst = timvf_timer_reg_burst_sp;
+	else
+		timvf_ops.arm_burst = timvf_timer_reg_burst_mp;
+	timvf_ops.arm_tmo_tick_burst = timvf_timer_reg_brst;
 	timvf_ops.cancel_burst = timvf_timer_unreg_burst;
 	*caps = RTE_EVENT_TIMER_ADAPTER_CAP_INTERNAL_PORT;
 	*ops = &timvf_ops;
diff --git a/drivers/event/octeontx/timvf_evdev.h b/drivers/event/octeontx/timvf_evdev.h
index c80e147e8..b5db233bb 100644
--- a/drivers/event/octeontx/timvf_evdev.h
+++ b/drivers/event/octeontx/timvf_evdev.h
@@ -186,8 +186,13 @@ bkt_mod(uint32_t rel_bkt, uint32_t nb_bkts)
 int timvf_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
		uint32_t *caps, const struct rte_event_timer_adapter_ops **ops);
+int timvf_timer_reg_brst(const struct rte_event_timer_adapter *adptr,
+		struct rte_event_timer **tim, const uint64_t timeout_tick,
+		const uint16_t nb_timers);
 int timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
		struct rte_event_timer **tim, const uint16_t nb_timers);
+int timvf_timer_reg_burst_sp(const struct rte_event_timer_adapter *adptr,
+		struct rte_event_timer **tim, const uint16_t nb_timers);
 int timvf_timer_reg_burst_mp(const struct rte_event_timer_adapter *adptr,
		struct rte_event_timer **tim, const uint16_t nb_timers);
diff --git a/drivers/event/octeontx/timvf_worker.c b/drivers/event/octeontx/timvf_worker.c
index 7a924fd11..3e48f3ca6 100644
--- a/drivers/event/octeontx/timvf_worker.c
+++ b/drivers/event/octeontx/timvf_worker.c
@@ -5,6 +5,42 @@
 
 #include "timvf_worker.h"
 
+int
+timvf_timer_reg_brst(const struct rte_event_timer_adapter *adptr,
+		struct rte_event_timer **tim, const uint64_t timeout_tick,
+		const uint16_t nb_timers)
+{
+	int ret;
+	uint16_t set_timers = 0;
+	uint16_t idx;
+	uint16_t arr_idx = 0;
+	struct timvf_ring *timr = adptr->data->adapter_priv;
+	struct tim_mem_entry entry[TIMVF_MAX_BURST] __rte_cache_aligned;
+
+	if (unlikely(timeout_tick > timr->meta.nb_bkts)) {
+		for (idx = 0; idx < nb_timers; idx++)
+			tim[idx]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+		rte_errno = -EINVAL;
+		return 0;
+	}
+
+	while (arr_idx < nb_timers) {
+		for (idx = 0; idx < TIMVF_MAX_BURST && (arr_idx < nb_timers);
+				idx++, arr_idx++) {
+			entry[idx].w0 =
+				(tim[arr_idx]->ev.event & 0xFFC000000000) >> 6 |
+				(tim[arr_idx]->ev.event & 0xFFFFFFFF);
+			entry[idx].wqe = tim[arr_idx]->ev.u64;
+		}
+		ret = timvf_add_entry_brst(timr, timeout_tick, &tim[set_timers],
+				entry, idx);
+		set_timers += ret;
+		if (ret != idx)
+			break;
+	}
+	return set_timers;
+}
+
 int
 timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
 		struct rte_event_timer **tim, const uint16_t nb_timers)
@@ -23,6 +59,35 @@ timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
 	return index;
 }
 
+int
+timvf_timer_reg_burst_sp(const struct rte_event_timer_adapter *adptr,
+		struct rte_event_timer **tim, const uint16_t nb_timers)
+{
+	int ret;
+	uint16_t index;
+	struct tim_mem_entry entry;
+	struct timvf_ring *timr = adptr->data->adapter_priv;
+	for (index = 0; index < nb_timers; index++) {
+		if (unlikely(tim[index]->timeout_ticks > timr->meta.nb_bkts)) {
+			tim[index]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+			rte_errno = -EINVAL;
+			break;
+		}
+
+		entry.w0 = (tim[index]->ev.event & 0xFFC000000000) >> 6 |
+			(tim[index]->ev.event & 0xFFFFFFFF);
+		entry.wqe = tim[index]->ev.u64;
+		ret = timvf_add_entry_sp(timr, tim[index]->timeout_ticks,
+				tim[index], &entry);
+		if (unlikely(ret)) {
+			rte_errno = -ret;
+			break;
+		}
+	}
+
+	return index;
+}
+
 int
 timvf_timer_reg_burst_mp(const struct rte_event_timer_adapter *adptr,
 		struct rte_event_timer **tim, const uint16_t nb_timers)
diff --git a/drivers/event/octeontx/timvf_worker.h b/drivers/event/octeontx/timvf_worker.h
index b63dd763c..320eb6ac1 100644
--- a/drivers/event/octeontx/timvf_worker.h
+++ b/drivers/event/octeontx/timvf_worker.h
@@ -160,6 +160,118 @@ timr_clr_bkt(struct timvf_ring *timr, struct tim_mem_bucket *bkt)
 	return (struct tim_mem_entry *)bkt->first_chunk;
 }
 
+/* Burst mode functions */
+static inline int __hot
+timvf_add_entry_brst(struct timvf_ring *timr, const uint16_t rel_bkt,
+		struct rte_event_timer **tim, const struct tim_mem_entry *ents,
+		const uint16_t nb_timers)
+{
+	int16_t rem;
+	int16_t crem = 0;
+	uint8_t lock_cnt;
+	uint16_t index = 0;
+	uint16_t chunk_remainder = 0;
+	uint32_t bucket;
+	uint32_t tbkt_id;
+	const uint32_t nb_bkts = timr->meta.nb_bkts;
+	const uint64_t start = timr->meta.ring_start_cyc;
+	uint64_t pos_reg;
+	uint64_t lock_sema;
+	struct tim_mem_bucket *bkt;
+	struct tim_mem_entry *chunk;
+
+__retry:
+	pos_reg = (rte_rdtsc() - start);
+	bucket = rte_reciprocal_divide_u64(pos_reg,
+			&timr->meta.fast_div) + rel_bkt;
+	tbkt_id = timr->meta.get_target_bkt(bucket, nb_bkts);
+	bkt = &timr->meta.bkt[tbkt_id];
+
+	/* Only one thread beyond this. */
+	lock_sema = timr_bkt_inc_lock(bkt);
+	lock_cnt = (uint8_t)
+		((lock_sema >> TIM_BUCKET_W1_S_LOCK) & TIM_BUCKET_W1_M_LOCK);
+
+	if (lock_cnt) {
+		timr_bkt_dec_lock(bkt);
+		goto __retry;
+	}
+
+	/* Bucket related checks. */
+	if (unlikely(timr_bkt_get_shbt(lock_sema))) {
+		timr_bkt_dec_lock(bkt);
+		goto __retry;
+	}
+
+	chunk_remainder = timr_bkt_fetch_rem(lock_sema);
+	rem = chunk_remainder - nb_timers;
+	if (rem < 0) {
+		crem = nb_chunk_slots - chunk_remainder;
+		if (chunk_remainder && crem) {
+			chunk = ((struct tim_mem_entry *)bkt->current_chunk) +
+				crem;
+			for (; index < chunk_remainder; index++) {
+				*chunk = *(ents + index);
+				tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+				tim[index]->impl_opaque[1] = (uint64_t)bkt;
+				tim[index]->state = RTE_EVENT_TIMER_ARMED;
+			}
+			timr_bkt_sub_rem(bkt, chunk_remainder);
+			timr_bkt_add_nent(bkt, chunk_remainder);
+		}
+		rem = nb_timers - chunk_remainder;
+		ents = ents + chunk_remainder;
+		if (bkt->nb_entry || !bkt->first_chunk) {
+			if (unlikely(rte_mempool_get(timr->meta.chunk_pool,
+					(void **)&chunk))) {
+				/*
+				 * No more chunks, return number of entries
+				 * successfully copied.
+				 */
+				timr_bkt_dec_lock(bkt);
+				rte_errno = -ENOMEM;
+				tim[index]->state = RTE_EVENT_TIMER_ERROR;
+				return crem;
+			}
+			if (bkt->nb_entry) {
+				*(uint64_t *)(
+				(struct tim_mem_entry *)bkt->current_chunk +
+				nb_chunk_slots) = (uint64_t) chunk;
+			} else {
+				bkt->first_chunk = (uint64_t) chunk;
+			}
+		} else {
+			chunk = timr_clr_bkt(timr, bkt);
+			bkt->first_chunk = (uint64_t) chunk;
+		}
+		*(uint64_t *)(chunk + nb_chunk_slots) = 0;
+		bkt->current_chunk = (uint64_t) chunk;
+
+		for (; index < nb_timers; index++) {
+			*chunk = *(ents + index);
+			tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+			tim[index]->impl_opaque[1] = (uint64_t)bkt;
+			tim[index]->state = RTE_EVENT_TIMER_ARMED;
+		}
+		timr_bkt_set_rem(bkt, nb_chunk_slots - rem);
+		timr_bkt_add_nent(bkt, rem);
+	} else {
+		chunk = (struct tim_mem_entry *)bkt->current_chunk;
+		chunk += (nb_chunk_slots - chunk_remainder);
+		for (; index < nb_timers; index++) {
+			*chunk = *(ents + index);
+			tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+			tim[index]->impl_opaque[1] = (uint64_t)bkt;
+			tim[index]->state = RTE_EVENT_TIMER_ARMED;
+		}
+		timr_bkt_sub_rem(bkt, nb_timers);
+		timr_bkt_add_nent(bkt, nb_timers);
+	}
+
+	timr_bkt_dec_lock(bkt);
+	return nb_timers;
+}
+
 static inline int __hot
 timvf_rem_entry(struct rte_event_timer *tim)
 {
@@ -192,6 +304,77 @@ timvf_rem_entry(struct rte_event_timer *tim)
 	return 0;
 }
 
+/* Single producer functions. */
+static inline int __hot
+timvf_add_entry_sp(struct timvf_ring *timr, const uint32_t rel_bkt,
+		struct rte_event_timer *tim, const struct tim_mem_entry *pent)
+{
+	int16_t rem;
+	uint32_t bucket;
+	uint32_t tbkt_id;
+	const uint32_t nb_bkts = timr->meta.nb_bkts;
+	uint64_t lock_sema;
+	uint64_t pos_reg;
+	const uint64_t start = timr->meta.ring_start_cyc;
+	struct tim_mem_bucket *bkt;
+	struct tim_mem_entry *chunk;
+
+	pos_reg = (rte_rdtsc() - start);
+	bucket = rte_reciprocal_divide_u64(pos_reg,
+			&timr->meta.fast_div) + rel_bkt;
+	tbkt_id = timr->meta.get_target_bkt(bucket, nb_bkts);
+	bkt = &timr->meta.bkt[tbkt_id];
+__retry:
+	/* Get Bucket sema */
+	lock_sema = timr_bkt_fetch_sema(bkt);
+	/* Bucket related checks. */
+	if (unlikely(timr_bkt_get_shbt(lock_sema)))
+		goto __retry;
+
+	/* Insert the work. */
+	rem = timr_bkt_fetch_rem(lock_sema);
+
+	if (!rem) {
+		/* SP mode will have only one thread. */
+		if (bkt->nb_entry || !bkt->first_chunk) {
+			if (unlikely(rte_mempool_get(timr->meta.chunk_pool,
+					(void **)&chunk))) {
+				timr_bkt_set_rem(bkt, 0);
+				tim->impl_opaque[0] =
+					tim->impl_opaque[1] = 0;
+				tim->state = RTE_EVENT_TIMER_ERROR;
+				return -ENOMEM;
+			}
+			if (bkt->nb_entry) {
+				*(uint64_t *)((struct tim_mem_entry *)
+						bkt->current_chunk +
+						nb_chunk_slots) =
+					(uint64_t) chunk;
+			} else {
+				bkt->first_chunk = (uint64_t) chunk;
+			}
+			*(uint64_t *)(chunk + nb_chunk_slots) = 0;
+		} else {
+			chunk = timr_clr_bkt(timr, bkt);
+			*(uint64_t *)(chunk + nb_chunk_slots) = 0;
+			bkt->first_chunk = (uint64_t) chunk;
+		}
+		bkt->current_chunk = (uint64_t) chunk;
+		timr_bkt_set_rem(bkt, nb_chunk_slots - 1);
+	} else {
+		chunk = (struct tim_mem_entry *)bkt->current_chunk;
+		chunk += nb_chunk_slots - rem;
+	}
+	/* Copy work entry. */
+	*chunk = *pent;
+	timr_bkt_inc_nent(bkt);
+
+	tim->impl_opaque[0] = (uint64_t)chunk;
+	tim->impl_opaque[1] = (uint64_t)bkt;
+	tim->state = RTE_EVENT_TIMER_ARMED;
+	return 0;
+}
+
 /* Multi producer functions. */
 static inline int __hot
 timvf_add_entry_mp(struct timvf_ring *timr, const uint32_t rel_bkt,
-- 
2.16.1