The implementation-specific opaque data is shared between the arm and cancel operations. The state flag acts as a guard variable to ensure that updates to the opaque data are synchronized. This patch uses C11 atomics with explicit one-way memory barriers, instead of the full barriers rte_smp_wmb()/rte_smp_rmb(), to synchronize the opaque data between the timer arm and cancel threads.
Signed-off-by: Phil Yang <phil.y...@arm.com> Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> --- v2: 1. Removed implementation-specific opaque data cleanup code. 2. Replaced thread fence with atomic ACQUIRE/RELEASE ordering on state access. lib/librte_eventdev/rte_event_timer_adapter.c | 55 ++++++++++++++++++--------- lib/librte_eventdev/rte_event_timer_adapter.h | 2 +- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c index 8909a8c..ca00258 100644 --- a/lib/librte_eventdev/rte_event_timer_adapter.c +++ b/lib/librte_eventdev/rte_event_timer_adapter.c @@ -629,7 +629,8 @@ swtim_callback(struct rte_timer *tim) sw->expired_timers[sw->n_expired_timers++] = tim; sw->stats.evtim_exp_count++; - evtim->state = RTE_EVENT_TIMER_NOT_ARMED; + __atomic_store_n(&evtim->state, RTE_EVENT_TIMER_NOT_ARMED, + __ATOMIC_RELEASE); } if (event_buffer_batch_ready(&sw->buffer)) { @@ -1020,6 +1021,7 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, int n_lcores; /* Timer list for this lcore is not in use. */ uint16_t exp_state = 0; + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. 
*/ @@ -1060,30 +1062,36 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, } for (i = 0; i < nb_evtims; i++) { - /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) { + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_ARMED) { rte_errno = EALREADY; break; - } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED || - evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) { + } else if (!(n_state == RTE_EVENT_TIMER_NOT_ARMED || + n_state == RTE_EVENT_TIMER_CANCELED)) { rte_errno = EINVAL; break; } ret = check_timeout(evtims[i], adapter); if (unlikely(ret == -1)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOLATE, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } else if (unlikely(ret == -2)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR_TOOEARLY, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } if (unlikely(check_destination_event_queue(evtims[i], adapter) < 0)) { - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELAXED); rte_errno = EINVAL; break; } @@ -1099,13 +1107,18 @@ __swtim_arm_burst(const struct rte_event_timer_adapter *adapter, SINGLE, lcore_id, NULL, evtims[i]); if (ret < 0) { /* tim was in RUNNING or CONFIG state */ - evtims[i]->state = RTE_EVENT_TIMER_ERROR; + __atomic_store_n(&evtims[i]->state, + RTE_EVENT_TIMER_ERROR, + __ATOMIC_RELEASE); break; } - rte_smp_wmb(); EVTIM_LOG_DBG("armed an event timer"); - evtims[i]->state = RTE_EVENT_TIMER_ARMED; + /* RELEASE ordering guarantees the adapter specific value + * changes observed before the update of state. 
+ */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_ARMED, + __ATOMIC_RELEASE); } if (i < nb_evtims) @@ -1132,6 +1145,7 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, struct rte_timer *timp; uint64_t opaque; struct swtim *sw = swtim_pmd_priv(adapter); + enum rte_event_timer_state n_state; #ifdef RTE_LIBRTE_EVENTDEV_DEBUG /* Check that the service is running. */ @@ -1143,16 +1157,18 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, for (i = 0; i < nb_evtims; i++) { /* Don't modify the event timer state in these cases */ - if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) { + /* ACQUIRE ordering guarantees the access of implementation + * specific opague data under the correct state. + */ + n_state = __atomic_load_n(&evtims[i]->state, __ATOMIC_ACQUIRE); + if (n_state == RTE_EVENT_TIMER_CANCELED) { rte_errno = EALREADY; break; - } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) { + } else if (n_state != RTE_EVENT_TIMER_ARMED) { rte_errno = EINVAL; break; } - rte_smp_rmb(); - opaque = evtims[i]->impl_opaque[0]; timp = (struct rte_timer *)(uintptr_t)opaque; RTE_ASSERT(timp != NULL); @@ -1166,9 +1182,12 @@ swtim_cancel_burst(const struct rte_event_timer_adapter *adapter, rte_mempool_put(sw->tim_pool, (void **)timp); - evtims[i]->state = RTE_EVENT_TIMER_CANCELED; - - rte_smp_wmb(); + /* The RELEASE ordering here pairs with atomic ordering + * to make sure the state update data observed between + * threads. 
+ */ + __atomic_store_n(&evtims[i]->state, RTE_EVENT_TIMER_CANCELED, + __ATOMIC_RELEASE); } return i; diff --git a/lib/librte_eventdev/rte_event_timer_adapter.h b/lib/librte_eventdev/rte_event_timer_adapter.h index d2ebcb0..6f64b90 100644 --- a/lib/librte_eventdev/rte_event_timer_adapter.h +++ b/lib/librte_eventdev/rte_event_timer_adapter.h @@ -467,7 +467,7 @@ struct rte_event_timer { * - op: RTE_EVENT_OP_NEW * - event_type: RTE_EVENT_TYPE_TIMER */ - volatile enum rte_event_timer_state state; + enum rte_event_timer_state state; /**< State of the event timer. */ uint64_t timeout_ticks; /**< Expiry timer ticks expressed in number of *timer_ticks_ns* from -- 2.7.4