Hi Gabriel,

>
> Instead of each priv_timer struct containing a single skiplist, this
> commit adds a skiplist for each enabled lcore to priv_timer. In the case
> that multiple lcores repeatedly install timers on the same target lcore,
> this change reduces lock contention for the target lcore's skiplists and
> increases performance.

I am not an rte_timer expert, but there is one thing that worries me:
it seems that the complexity of timer_manage() has increased quite a bit
with that patch: it now has to check/process up to RTE_MAX_LCORE skiplists
instead of one, and it also has to somehow properly sort up to
RTE_MAX_LCORE lists of retrieved (ready-to-run) timers.
Wouldn't all that affect its running time?

I understand your intention to reduce lock contention, but I suppose it
could at least be done in a configurable way.
Let's say, allow the user to specify the dimension of pending_lists[] at
init phase or so.
Then a timer from lcore_id=N would end up in
pending_lists[N % RTE_DIM(pending_lists)].

Another thought - it might be better to divide the pending timer lists not
by client (lcore) id, but by expiration time - some analog of a timer
wheel or so.
That, I think, might greatly decrease the probability that timer_manage()
and timer_add() will try to access the same list.
On the other hand, timer_manage() would still have to consume the skip
lists one by one.
Though I suppose that's quite a radical change from what we have right now.
(Rough sketches of both ideas are at the very bottom of this mail, below
the patch.)

Konstantin

>
> Signed-off-by: Gabriel Carrillo <erik.g.carri...@intel.com>
> ---
> v2:
> * Address checkpatch warnings
>
>  lib/librte_timer/rte_timer.c | 309 +++++++++++++++++++++++++++----------------
>  lib/librte_timer/rte_timer.h |   9 +-
>  2 files changed, 202 insertions(+), 116 deletions(-)
>
> diff --git a/lib/librte_timer/rte_timer.c b/lib/librte_timer/rte_timer.c
> index 5ee0840..da2fc1a 100644
> --- a/lib/librte_timer/rte_timer.c
> +++ b/lib/librte_timer/rte_timer.c
> @@ -37,6 +37,7 @@
>  #include <inttypes.h>
>  #include <assert.h>
>  #include <sys/queue.h>
> +#include <stdbool.h>
>
>  #include <rte_atomic.h>
>  #include <rte_common.h>
> @@ -56,17 +57,19 @@
>
>  LIST_HEAD(rte_timer_list, rte_timer);
>
> +struct skiplist {
> +	struct rte_timer head;  /**< dummy timer instance to head up list */
> +	rte_spinlock_t lock;    /**< lock to protect list access */
> +	unsigned int depth;     /**< track the current depth of the skiplist */
> +} __rte_cache_aligned;
> +
>  struct priv_timer {
> -	struct rte_timer pending_head;  /**< dummy timer instance to head up list */
> -	rte_spinlock_t list_lock;       /**< lock to protect list access */
> +	/** one pending list per enabled lcore */
> +	struct skiplist pending_lists[RTE_MAX_LCORE];
>
>  	/** per-core variable that true if a timer was updated on this
>  	 *  core since last reset of the variable */
>  	int updated;
> -
> -	/** track the current depth of the skiplist */
> -	unsigned curr_skiplist_depth;
> -
>  	unsigned prev_lcore;              /**< used for lcore round robin */
>
>  	/** running timer on this lcore now */
> @@ -81,6 +84,10 @@ struct priv_timer {
>  /** per-lcore private info for timers */
>  static struct priv_timer priv_timer[RTE_MAX_LCORE];
>
> +/** cache of IDs of enabled lcores */
> +static unsigned int enabled_lcores[RTE_MAX_LCORE];
> +static int n_enabled_lcores;
> +
>  /* when debug is enabled, store some statistics */
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
>  #define __TIMER_STAT_ADD(name, n) do {				\
> @@ -96,14 +103,25 @@ static struct priv_timer priv_timer[RTE_MAX_LCORE];
>  void
>  rte_timer_subsystem_init(void)
>  {
> -	unsigned lcore_id;
> +	unsigned int lcore_id1, lcore_id2;
> +	struct skiplist *list;
> +	int i, j;
> +
> +	RTE_LCORE_FOREACH(lcore_id1)
> +		enabled_lcores[n_enabled_lcores++] = lcore_id1;
>
>  	/* since priv_timer is static, it's zeroed by default, so only init some
>  	 * fields.
>  	 */
> -	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id ++) {
> -		rte_spinlock_init(&priv_timer[lcore_id].list_lock);
> -		priv_timer[lcore_id].prev_lcore = lcore_id;
> +	for (i = 0, lcore_id1 = enabled_lcores[i]; i < n_enabled_lcores;
> +	     lcore_id1 = enabled_lcores[++i]) {
> +		priv_timer[lcore_id1].prev_lcore = lcore_id1;
> +
> +		for (j = 0, lcore_id2 = enabled_lcores[j]; j < n_enabled_lcores;
> +		     lcore_id2 = enabled_lcores[++j]) {
> +			list = &priv_timer[lcore_id1].pending_lists[lcore_id2];
> +			rte_spinlock_init(&list->lock);
> +		}
>  	}
>  }
>
> @@ -114,7 +132,8 @@ rte_timer_init(struct rte_timer *tim)
>  	union rte_timer_status status;
>
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
>  }
>
> @@ -142,7 +161,7 @@ timer_set_config_state(struct rte_timer *tim,
>  		 * or ready to run on local core, exit
>  		 */
>  		if (prev_status.state == RTE_TIMER_RUNNING &&
> -		    (prev_status.owner != (uint16_t)lcore_id ||
> +		    (prev_status.owner != (int)lcore_id ||
>  		     tim != priv_timer[lcore_id].running_tim))
>  			return -1;
>
> @@ -153,7 +172,8 @@ timer_set_config_state(struct rte_timer *tim,
>  		/* here, we know that timer is stopped or pending,
>  		 * mark it atomically as being configured */
>  		status.state = RTE_TIMER_CONFIG;
> -		status.owner = (int16_t)lcore_id;
> +		status.installer = RTE_TIMER_NO_LCORE;
> +		status.owner = lcore_id;
>  		success = rte_atomic32_cmpset(&tim->status.u32,
>  					      prev_status.u32,
>  					      status.u32);
> @@ -185,7 +205,8 @@ timer_set_running_state(struct rte_timer *tim)
>  		/* here, we know that timer is stopped or pending,
>  		 * mark it atomically as being configured */
>  		status.state = RTE_TIMER_RUNNING;
> -		status.owner = (int16_t)lcore_id;
> +		status.installer = prev_status.installer;
> +		status.owner = lcore_id;
>  		success = rte_atomic32_cmpset(&tim->status.u32,
>  					      prev_status.u32,
>  					      status.u32);
> @@ -236,11 +257,11 @@ timer_get_skiplist_level(unsigned curr_depth)
>   * are <= that time value.
>   */
>  static void
> -timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
> +timer_get_prev_entries(uint64_t time_val, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
> -	unsigned lvl = priv_timer[tim_lcore].curr_skiplist_depth;
> -	prev[lvl] = &priv_timer[tim_lcore].pending_head;
> +	unsigned int lvl = list->depth;
> +	prev[lvl] = &list->head;
>  	while(lvl != 0) {
>  		lvl--;
>  		prev[lvl] = prev[lvl+1];
> @@ -255,15 +276,15 @@ timer_get_prev_entries(uint64_t time_val, unsigned tim_lcore,
>   * all skiplist levels.
>   */
>  static void
> -timer_get_prev_entries_for_node(struct rte_timer *tim, unsigned tim_lcore,
> +timer_get_prev_entries_for_node(struct rte_timer *tim, struct skiplist *list,
>  		struct rte_timer **prev)
>  {
>  	int i;
>  	/* to get a specific entry in the list, look for just lower than the time
>  	 * values, and then increment on each level individually if necessary
>  	 */
> -	timer_get_prev_entries(tim->expire - 1, tim_lcore, prev);
> -	for (i = priv_timer[tim_lcore].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries(tim->expire - 1, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		while (prev[i]->sl_next[i] != NULL &&
>  				prev[i]->sl_next[i] != tim &&
>  				prev[i]->sl_next[i]->expire <= tim->expire)
> @@ -282,25 +303,25 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
>  	unsigned lcore_id = rte_lcore_id();
>  	unsigned lvl;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list = &priv_timer[tim_lcore].pending_lists[lcore_id];
>
>  	/* if timer needs to be scheduled on another core, we need to
>  	 * lock the list; if it is on local core, we need to lock if
>  	 * we are not called from rte_timer_manage() */
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_lock(&list->lock);
>
>  	/* find where exactly this element goes in the list of elements
>  	 * for each depth. */
> -	timer_get_prev_entries(tim->expire, tim_lcore, prev);
> +	timer_get_prev_entries(tim->expire, list, prev);
>
>  	/* now assign it a new level and add at that level */
> -	const unsigned tim_level = timer_get_skiplist_level(
> -			priv_timer[tim_lcore].curr_skiplist_depth);
> -	if (tim_level == priv_timer[tim_lcore].curr_skiplist_depth)
> -		priv_timer[tim_lcore].curr_skiplist_depth++;
> +	const unsigned int tim_level = timer_get_skiplist_level(list->depth);
> +	if (tim_level == list->depth)
> +		list->depth++;
>
>  	lvl = tim_level;
> -	while (lvl > 0) {
> +	while (lvl > 0 && lvl < MAX_SKIPLIST_DEPTH + 1) {
>  		tim->sl_next[lvl] = prev[lvl]->sl_next[lvl];
>  		prev[lvl]->sl_next[lvl] = tim;
>  		lvl--;
> @@ -310,11 +331,10 @@ timer_add(struct rte_timer *tim, unsigned tim_lcore, int local_is_locked)
>
>  	/* save the lowest list entry into the expire field of the dummy hdr
>  	 * NOTE: this is not atomic on 32-bit*/
> -	priv_timer[tim_lcore].pending_head.expire = priv_timer[tim_lcore].\
> -			pending_head.sl_next[0]->expire;
> +	list->head.expire = list->head.sl_next[0]->expire;
>
>  	if (tim_lcore != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[tim_lcore].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
>
>  /*
> @@ -330,35 +350,38 @@ timer_del(struct rte_timer *tim, union rte_timer_status prev_status,
>  	unsigned prev_owner = prev_status.owner;
>  	int i;
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH+1];
> +	struct skiplist *list;
> +
> +	list = &priv_timer[prev_owner].pending_lists[prev_status.installer];
>
>  	/* if timer needs is pending another core, we need to lock the
>  	 * list; if it is on local core, we need to lock if we are not
>  	 * called from rte_timer_manage() */
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_lock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_lock(&list->lock);
>
>  	/* save the lowest list entry into the expire field of the dummy hdr.
>  	 * NOTE: this is not atomic on 32-bit */
> -	if (tim == priv_timer[prev_owner].pending_head.sl_next[0])
> -		priv_timer[prev_owner].pending_head.expire =
> -			((tim->sl_next[0] == NULL) ? 0 : tim->sl_next[0]->expire);
> +	if (tim == list->head.sl_next[0])
> +		list->head.expire = ((tim->sl_next[0] == NULL) ?
> +				0 : tim->sl_next[0]->expire);
>
>  	/* adjust pointers from previous entries to point past this */
> -	timer_get_prev_entries_for_node(tim, prev_owner, prev);
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--) {
> +	timer_get_prev_entries_for_node(tim, list, prev);
> +	for (i = list->depth - 1; i >= 0; i--) {
>  		if (prev[i]->sl_next[i] == tim)
>  			prev[i]->sl_next[i] = tim->sl_next[i];
>  	}
>
>  	/* in case we deleted last entry at a level, adjust down max level */
> -	for (i = priv_timer[prev_owner].curr_skiplist_depth - 1; i >= 0; i--)
> -		if (priv_timer[prev_owner].pending_head.sl_next[i] == NULL)
> -			priv_timer[prev_owner].curr_skiplist_depth --;
> +	for (i = list->depth - 1; i >= 0; i--)
> +		if (list->head.sl_next[i] == NULL)
> +			list->depth--;
>  		else
>  			break;
>
>  	if (prev_owner != lcore_id || !local_is_locked)
> -		rte_spinlock_unlock(&priv_timer[prev_owner].list_lock);
> +		rte_spinlock_unlock(&list->lock);
>  }
>
>  /* Reset and start the timer associated with the timer handle (private func) */
> @@ -416,7 +439,8 @@ __rte_timer_reset(struct rte_timer *tim, uint64_t expire,
>  	 * the state so we don't need to use cmpset() here */
>  	rte_wmb();
>  	status.state = RTE_TIMER_PENDING;
> -	status.owner = (int16_t)tim_lcore;
> +	status.installer = lcore_id;
> +	status.owner = tim_lcore;
>  	tim->status.u32 = status.u32;
>
>  	return 0;
> @@ -484,7 +508,8 @@ rte_timer_stop(struct rte_timer *tim)
>  	/* mark timer as stopped */
>  	rte_wmb();
>  	status.state = RTE_TIMER_STOP;
> -	status.owner = RTE_TIMER_NO_OWNER;
> +	status.installer = RTE_TIMER_NO_LCORE;
> +	status.owner = RTE_TIMER_NO_LCORE;
>  	tim->status.u32 = status.u32;
>
>  	return 0;
> @@ -506,119 +531,179 @@ rte_timer_pending(struct rte_timer *tim)
>  }
>
>  /* must be called periodically, run all timer that expired */
> -void rte_timer_manage(void)
> +void
> +rte_timer_manage(void)
>  {
>  	union rte_timer_status status;
> -	struct rte_timer *tim, *next_tim;
> -	struct rte_timer *run_first_tim, **pprev;
> -	unsigned lcore_id = rte_lcore_id();
> +	struct rte_timer *tim, *next_tim, **pprev;
> +	struct rte_timer *run_first_tims[RTE_MAX_LCORE + 1];
>  	struct rte_timer *prev[MAX_SKIPLIST_DEPTH + 1];
> -	uint64_t cur_time;
> -	int i, ret;
> +	struct priv_timer *priv_tim;
> +	unsigned int installer_lcore, lcore_id = rte_lcore_id();
> +	uint64_t cur_time, min_expire;
> +	int i, j, min_idx, ret;
> +	int nrunlists = 0;
> +	int local_is_locked = 0;
> +	struct skiplist *dest_list, *list = NULL;
> +	bool done;
>
>  	/* timer manager only runs on EAL thread with valid lcore_id */
>  	assert(lcore_id < RTE_MAX_LCORE);
>
> +	priv_tim = &priv_timer[lcore_id];
> +
>  	__TIMER_STAT_ADD(manage, 1);
> -	/* optimize for the case where per-cpu list is empty */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL)
> -		return;
> -	cur_time = rte_get_timer_cycles();
> +	for (i = 0, installer_lcore = enabled_lcores[i]; i < n_enabled_lcores;
> +	     installer_lcore = enabled_lcores[++i]) {
> +		list = &priv_tim->pending_lists[installer_lcore];
> +
> +		/* optimize for the case where list is empty */
> +		if (list->head.sl_next[0] == NULL)
> +			continue;
> +		cur_time = rte_get_timer_cycles();
>
>  #ifdef RTE_ARCH_X86_64
> -	/* on 64-bit the value cached in the pending_head.expired will be
> -	 * updated atomically, so we can consult that for a quick check here
> -	 * outside the lock */
> -	if (likely(priv_timer[lcore_id].pending_head.expire > cur_time))
> -		return;
> +		/* on 64-bit the value cached in the list->head.expired will be
> +		 * updated atomically, so we can consult that for a quick check
> +		 * here outside the lock
> +		 */
> +		if (likely(list->head.expire > cur_time))
> +			continue;
>  #endif
>
> -	/* browse ordered list, add expired timers in 'expired' list */
> -	rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +		/* browse ordered list, add expired timers in 'expired' list */
> +		rte_spinlock_lock(&list->lock);
>
> -	/* if nothing to do just unlock and return */
> -	if (priv_timer[lcore_id].pending_head.sl_next[0] == NULL ||
> -	    priv_timer[lcore_id].pending_head.sl_next[0]->expire > cur_time) {
> -		rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> -		return;
> -	}
> +		/* if nothing to do just unlock and continue */
> +		if (list->head.sl_next[0] == NULL ||
> +		    list->head.sl_next[0]->expire > cur_time) {
> +			rte_spinlock_unlock(&list->lock);
> +			continue;
> +		}
>
> -	/* save start of list of expired timers */
> -	tim = priv_timer[lcore_id].pending_head.sl_next[0];
> +		/* save start of list of expired timers */
> +		tim = list->head.sl_next[0];
> +
> +		/* break the existing list at current time point */
> +		timer_get_prev_entries(cur_time, list, prev);
> +		for (j = list->depth - 1; j >= 0; j--) {
> +			if (prev[j] == &list->head)
> +				continue;
> +			list->head.sl_next[j] =
> +				prev[j]->sl_next[j];
> +			if (prev[j]->sl_next[j] == NULL)
> +				list->depth--;
> +			prev[j]->sl_next[j] = NULL;
> +		}
>
> -	/* break the existing list at current time point */
> -	timer_get_prev_entries(cur_time, lcore_id, prev);
> -	for (i = priv_timer[lcore_id].curr_skiplist_depth -1; i >= 0; i--) {
> -		if (prev[i] == &priv_timer[lcore_id].pending_head)
> -			continue;
> -		priv_timer[lcore_id].pending_head.sl_next[i] =
> -			prev[i]->sl_next[i];
> -		if (prev[i]->sl_next[i] == NULL)
> -			priv_timer[lcore_id].curr_skiplist_depth--;
> -		prev[i] ->sl_next[i] = NULL;
> -	}
> +		/* transition run-list from PENDING to RUNNING */
> +		run_first_tims[nrunlists] = tim;
> +		pprev = &run_first_tims[nrunlists];
> +		nrunlists++;
> +
> +		for (; tim != NULL; tim = next_tim) {
> +			next_tim = tim->sl_next[0];
> +
> +			ret = timer_set_running_state(tim);
> +			if (likely(ret == 0)) {
> +				pprev = &tim->sl_next[0];
> +			} else {
> +				/* another core is trying to re-config this one,
> +				 * remove it from local expired list
> +				 */
> +				*pprev = next_tim;
> +			}
> +		}
>
> -	/* transition run-list from PENDING to RUNNING */
> -	run_first_tim = tim;
> -	pprev = &run_first_tim;
> +		/* update the next to expire timer value */
> +		list->head.expire = (list->head.sl_next[0] == NULL) ?
> +			0 : list->head.sl_next[0]->expire;
> -	for ( ; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> +		rte_spinlock_unlock(&list->lock);
> +	}
> -		ret = timer_set_running_state(tim);
> -		if (likely(ret == 0)) {
> -			pprev = &tim->sl_next[0];
> -		} else {
> -			/* another core is trying to re-config this one,
> -			 * remove it from local expired list
> -			 */
> -			*pprev = next_tim;
> +	/* Now process the run lists */
> +	while (1) {
> +		done = true;
> +		min_expire = UINT64_MAX;
> +		min_idx = 0;
> +
> +		/* Find the next oldest timer to process */
> +		for (i = 0; i < nrunlists; i++) {
> +			tim = run_first_tims[i];
> +
> +			if (tim != NULL && tim->expire < min_expire) {
> +				min_expire = tim->expire;
> +				min_idx = i;
> +				done = false;
> +			}
>  		}
> -	}
>
> -	/* update the next to expire timer value */
> -	priv_timer[lcore_id].pending_head.expire =
> -	    (priv_timer[lcore_id].pending_head.sl_next[0] == NULL) ? 0 :
> -	    priv_timer[lcore_id].pending_head.sl_next[0]->expire;
> +		if (done)
> +			break;
> +
> +		tim = run_first_tims[min_idx];
> -	rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +		/* Move down the runlist from which we picked a timer to
> +		 * execute
> +		 */
> +		run_first_tims[min_idx] = run_first_tims[min_idx]->sl_next[0];
> -	/* now scan expired list and call callbacks */
> -	for (tim = run_first_tim; tim != NULL; tim = next_tim) {
> -		next_tim = tim->sl_next[0];
> -		priv_timer[lcore_id].updated = 0;
> -		priv_timer[lcore_id].running_tim = tim;
> +		priv_tim->updated = 0;
> +		priv_tim->running_tim = tim;
>
>  		/* execute callback function with list unlocked */
>  		tim->f(tim, tim->arg);
>
>  		__TIMER_STAT_ADD(pending, -1);
>  		/* the timer was stopped or reloaded by the callback
> -		 * function, we have nothing to do here */
> -		if (priv_timer[lcore_id].updated == 1)
> +		 * function, we have nothing to do here
> +		 */
> +		if (priv_tim->updated == 1)
>  			continue;
>
>  		if (tim->period == 0) {
>  			/* remove from done list and mark timer as stopped */
>  			status.state = RTE_TIMER_STOP;
> -			status.owner = RTE_TIMER_NO_OWNER;
> +			status.installer = RTE_TIMER_NO_LCORE;
> +			status.owner = RTE_TIMER_NO_LCORE;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
>  		}
>  		else {
> -			/* keep it in list and mark timer as pending */
> -			rte_spinlock_lock(&priv_timer[lcore_id].list_lock);
> +			dest_list = &priv_tim->pending_lists[lcore_id];
> +
> +			/* If the destination list is the current list
> +			 * we can acquire the lock here, and hold it
> +			 * across removal and insertion of the timer.
> +			 */
> +			if (list == dest_list) {
> +				rte_spinlock_lock(&list->lock);
> +				local_is_locked = 1;
> +			}
> +
> +			/* set timer state back to PENDING and
> +			 * reinsert it in pending list
> +			 */
>  			status.state = RTE_TIMER_PENDING;
>  			__TIMER_STAT_ADD(pending, 1);
> -			status.owner = (int16_t)lcore_id;
> +			status.installer = tim->status.installer;
> +			status.owner = lcore_id;
>  			rte_wmb();
>  			tim->status.u32 = status.u32;
> -			__rte_timer_reset(tim, tim->expire + tim->period,
> -				tim->period, lcore_id, tim->f, tim->arg, 1);
> -			rte_spinlock_unlock(&priv_timer[lcore_id].list_lock);
> +
> +			__rte_timer_reset(tim,
> +					  tim->expire + tim->period,
> +					  tim->period, lcore_id,
> +					  tim->f, tim->arg, local_is_locked);
> +
> +			if (local_is_locked) {
> +				rte_spinlock_unlock(&list->lock);
> +				local_is_locked = 0;
> +			}
>  		}
>  	}
> -	priv_timer[lcore_id].running_tim = NULL;
> +	priv_tim->running_tim = NULL;
>  }
>
>  /* dump statistics about timers */
> diff --git a/lib/librte_timer/rte_timer.h b/lib/librte_timer/rte_timer.h
> index a276a73..4cc6baf 100644
> --- a/lib/librte_timer/rte_timer.h
> +++ b/lib/librte_timer/rte_timer.h
> @@ -77,7 +77,7 @@ extern "C" {
>  #define RTE_TIMER_RUNNING 2 /**< State: timer function is running. */
>  #define RTE_TIMER_CONFIG  3 /**< State: timer is being configured. */
>
> -#define RTE_TIMER_NO_OWNER -2 /**< Timer has no owner. */
> +#define RTE_TIMER_NO_LCORE -2
>
>  /**
>   * Timer type: Periodic or single (one-shot).
> @@ -94,10 +94,11 @@ enum rte_timer_type {
>  union rte_timer_status {
>  	RTE_STD_C11
>  	struct {
> -		uint16_t state;  /**< Stop, pending, running, config. */
> -		int16_t owner;   /**< The lcore that owns the timer. */
> +		unsigned state : 2;
> +		int installer : 15;
> +		int owner : 15;
>  	};
> -	uint32_t u32;            /**< To atomic-set status + owner. */
> +	uint32_t u32;            /**< To atomic-set status, installer, owner */
>  };
>
>  #ifdef RTE_LIBRTE_TIMER_DEBUG
> --
> 2.6.4
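
PS: to illustrate the configurable-dimension idea above, here is a rough,
untested sketch. The names n_pending_lists and
rte_timer_subsystem_set_n_pending_lists() are made up for illustration
(they are not existing API), and it reuses struct skiplist/priv_timer from
your patch:

/* Hypothetical sketch only: make the number of per-destination pending
 * lists a runtime parameter instead of a fixed RTE_MAX_LCORE of them.
 */
static unsigned int n_pending_lists = 1;

/* would need to be called before rte_timer_subsystem_init() */
int
rte_timer_subsystem_set_n_pending_lists(unsigned int n)
{
	if (n == 0 || n > RTE_MAX_LCORE)
		return -1;
	n_pending_lists = n;
	return 0;
}

/* timer_add()/timer_del() would then pick the list by hashing the
 * installer: a timer installed from lcore N lands in
 * pending_lists[N % n_pending_lists].
 */
static inline struct skiplist *
timer_get_dest_list(unsigned int tim_lcore, unsigned int installer_lcore)
{
	return &priv_timer[tim_lcore].pending_lists[installer_lcore %
						    n_pending_lists];
}

With n_pending_lists == 1 that degenerates into today's single-list
behaviour; with it equal to the number of enabled lcores it behaves like
your patch.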
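
And a very rough illustration of the other idea - splitting by expiration
time rather than by installer lcore. The bucket count and width below are
arbitrary, just to show the mapping:

/* Hypothetical sketch: bucket pending timers by expiration time instead
 * of by installer lcore - a simplistic timer-wheel-like mapping.
 * N_EXP_BUCKETS and BUCKET_CYCLES are arbitrary values here.
 */
#define N_EXP_BUCKETS	8
#define BUCKET_CYCLES	(1ULL << 20)	/* width of one bucket, in TSC cycles */

static inline unsigned int
timer_get_exp_bucket(uint64_t expire)
{
	return (unsigned int)((expire / BUCKET_CYCLES) % N_EXP_BUCKETS);
}

/* timer_add() would insert into
 *	&priv_timer[tim_lcore].pending_lists[timer_get_exp_bucket(tim->expire)];
 * while rte_timer_manage() would mostly touch the bucket(s) covering
 * [cur_time, cur_time + BUCKET_CYCLES), so add and manage would rarely
 * contend on the same lock - though manage still walks the buckets one
 * by one.
 */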