> -----Original Message-----
> From: dev <dev-boun...@dpdk.org> On Behalf Of Phil Yang
> Sent: Friday, June 12, 2020 7:20 PM
> To: dev@dpdk.org; erik.g.carri...@intel.com
> Cc: d...@linux.vnet.ibm.com; Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>;
> Ruifeng Wang <ruifeng.w...@arm.com>; Dharmik Thakkar <dharmik.thak...@arm.com>;
> nd <n...@arm.com>
> Subject: [dpdk-dev] [PATCH 3/3] eventdev: relax smp barriers with c11 atomics
>
> The implementation-specific opaque data is shared between the arm and
> cancel operations. The state flag acts as a guard variable to ensure
> that updates to the opaque data are synchronized. This patch uses C11
> atomics with explicit one-way memory barriers, instead of the full
> barriers rte_smp_w/rmb(), to synchronize the opaque data between the
> timer arm and cancel threads.
>
> Signed-off-by: Phil Yang <phil.y...@arm.com>
> Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++---------
>  lib/librte_eventdev/rte_event_timer_adapter.h |  2 +-
>  2 files changed, 38 insertions(+), 19 deletions(-)
>
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
> index 6947efb..0a26501 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.c
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.c
> @@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim)
>  		sw->expired_timers[sw->n_expired_timers++] = tim;
>  		sw->stats.evtim_exp_count++;
>
> -		evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
> +		__atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED,
> +				__ATOMIC_RELEASE);
>  	}
>
>  	if (event_buffer_batch_ready(&sw->buffer)) {
> @@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
>  	int n_lcores;
>  	/* Timer is not armed state */
>  	int16_t exp_state = 0;
> +	enum rte_event_timer_state n_state;
>
>  #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>  	/* Check that the service is running. */
> @@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
>  	}
>
>  	for (i = 0; i < nb_evtims; i++) {
> -		/* Don't modify the event timer state in these cases */
> -		if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
> +		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_RELAXED);
> +		if (n_state == RTE_EVENT_TIMER_ARMED) {
>  			rte_errno = EALREADY;
>  			break;
> -		} else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
> -			     evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
> +		} else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED ||
> +			     n_state == RTE_EVENT_TIMER_CANCELED)) {
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
>  		ret = check_timeout(evtims[i], adapter);
>  		if (unlikely(ret == -1)) {
> -			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
> +			__atomic_store_n(&evtims[i]->state,
> +					RTE_EVENT_TIMER_ERROR_TOOLATE,
> +					__ATOMIC_RELAXED);
>  			rte_errno = EINVAL;
>  			break;
>  		} else if (unlikely(ret == -2)) {
> -			evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
> +			__atomic_store_n(&evtims[i]->state,
> +					RTE_EVENT_TIMER_ERROR_TOOEARLY,
> +					__ATOMIC_RELAXED);
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
>  		if (unlikely(check_destination_event_queue(evtims[i],
> 							   adapter) < 0)) {
> -			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> +			__atomic_store_n(&evtims[i]->state,
> +					RTE_EVENT_TIMER_ERROR,
> +					__ATOMIC_RELAXED);
>  			rte_errno = EINVAL;
>  			break;
>  		}
> @@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
>  				  SINGLE, lcore_id, NULL, evtims[i]);
>  		if (ret < 0) {
>  			/* tim was in RUNNING or CONFIG state */
> -			evtims[i]->state = RTE_EVENT_TIMER_ERROR;
> +			__atomic_store_n(&evtims[i]->state,
> +					RTE_EVENT_TIMER_ERROR,
> +					__ATOMIC_RELEASE);
>  			break;
>  		}
>
> -		rte_smp_wmb();
>  		EVTIM_LOG_DBG("armed an event timer");
> -		evtims[i]->state = RTE_EVENT_TIMER_ARMED;
> +		/* RELEASE ordering guarantees that the adapter-specific
> +		 * value changes are observed before the update of state.
> +		 */
> +		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED,
> +				__ATOMIC_RELEASE);
>  	}
>
>  	if (i < nb_evtims)
> @@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
>  	struct rte_timer *timp;
>  	uint64_t opaque;
>  	struct swtim *sw = swtim_pmd_priv(adapter);
> +	enum rte_event_timer_state n_state;
>
>  #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
>  	/* Check that the service is running. */
> @@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
>
>  	for (i = 0; i < nb_evtims; i++) {
>  		/* Don't modify the event timer state in these cases */
> -		if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
> +		/* ACQUIRE ordering guarantees that the implementation-specific
> +		 * opaque data is accessed under the correct state.
> +		 */
> +		n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE);
> +		if (n_state == RTE_EVENT_TIMER_CANCELED) {
>  			rte_errno = EALREADY;
>  			break;
> -		} else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
> +		} else if (n_state != RTE_EVENT_TIMER_ARMED) {
>  			rte_errno = EINVAL;
>  			break;
>  		}
>
> -		rte_smp_rmb();
> -
>  		opaque = evtims[i]->impl_opaque[0];
>  		timp = (struct rte_timer *)(uintptr_t)opaque;
>  		RTE_ASSERT(timp != NULL);
> @@ -1166,11 +1182,14 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter,
>
>  		rte_mempool_put(sw->tim_pool, (void **)timp);
>
> -		evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
> +		__atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED,
> +				__ATOMIC_RELAXED);
>  		evtims[i]->impl_opaque[0] = 0;
>  		evtims[i]->impl_opaque[1] = 0;
Is it safe to remove the impl_opaque cleanup above? Once the soft timer
is canceled, __swtim_arm_burst cannot access these two fields while the
state is RTE_EVENT_TIMER_CANCELED. After a new timer is armed, the
__swtim_arm_burst thread, which is the only producer of these two
fields, refills them. I think the only risk is that the values of these
two fields might be unknown after swtim_cancel_burst. So it should be
safe to remove the cleanup if no other thread accesses these fields
after the timer is canceled. Any comments on this?

If we remove these two instructions, we can also remove the
__atomic_thread_fence below and save its performance penalty.

Thanks,
Phil

> -
> -		rte_smp_wmb();
> +		/* The RELEASE fence makes sure that the cleanup of the
> +		 * opaque data is observed between threads.
> +		 */
> +		__atomic_thread_fence(__ATOMIC_RELEASE);
>  	}
>
>  	return i;
> diff --git a/lib/librte_eventdev/rte_event_timer_adapter.h b/lib/librte_eventdev/rte_event_timer_adapter.h
> index d2ebcb0..6f64b90 100644
> --- a/lib/librte_eventdev/rte_event_timer_adapter.h
> +++ b/lib/librte_eventdev/rte_event_timer_adapter.h
> @@ -467,7 +467,7 @@ struct rte_event_timer {
>  	 *  - op: RTE_EVENT_OP_NEW
>  	 *  - event_type: RTE_EVENT_TYPE_TIMER
>  	 */
> -	volatile enum rte_event_timer_state state;
> +	enum rte_event_timer_state state;
>  	/**< State of the event timer. */
>  	uint64_t timeout_ticks;
>  	/**< Expiry timer ticks expressed in number of *timer_ticks_ns* from
> --
> 2.7.4
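
To make the ordering argument concrete, below is a minimal standalone
sketch of the guard-variable pattern the patch relies on, written with
the same GCC/Clang __atomic builtins. All names in it (struct
timer_slot, arm(), cancel()) are hypothetical illustrations, not the
adapter code itself:

    #include <stdint.h>

    enum slot_state { NOT_ARMED, ARMED, CANCELED };

    struct timer_slot {
    	uint64_t impl_opaque[2];  /* payload guarded by 'state' */
    	enum slot_state state;    /* guard variable */
    };

    /* Producer (arm side): write the opaque data first, then
     * release-store the state so the data is visible before ARMED is.
     */
    static void
    arm(struct timer_slot *s, uint64_t v0, uint64_t v1)
    {
    	s->impl_opaque[0] = v0;
    	s->impl_opaque[1] = v1;
    	__atomic_store_n(&s->state, ARMED, __ATOMIC_RELEASE);
    }

    /* Consumer (cancel side): acquire-load the state; seeing ARMED
     * guarantees the impl_opaque writes made before it are visible.
     */
    static int
    cancel(struct timer_slot *s)
    {
    	if (__atomic_load_n(&s->state, __ATOMIC_ACQUIRE) != ARMED)
    		return -1;

    	uint64_t opaque = s->impl_opaque[0];  /* safe to use now */
    	(void)opaque;

    	__atomic_store_n(&s->state, CANCELED, __ATOMIC_RELAXED);
    	s->impl_opaque[0] = 0;
    	s->impl_opaque[1] = 0;
    	/* Release fence publishes the cleanup stores above; this is
    	 * the fence questioned inline above, so if the cleanup stores
    	 * are removed, the fence can be removed with them.
    	 */
    	__atomic_thread_fence(__ATOMIC_RELEASE);
    	return 0;
    }

The release store in arm() pairs with the acquire load in cancel(),
which is all the one-way ordering the state/opaque handoff needs; the
trailing fence exists only to publish the zeroed impl_opaque fields.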