Hi Honnappa, > + > +/* Allocate a new TQS variable with the name *name* in memory. */ > +struct rte_tqs * __rte_experimental > +rte_tqs_alloc(const char *name, int socket_id, uint64_t lcore_mask) > +{ > + char tqs_name[RTE_TQS_NAMESIZE]; > + struct rte_tailq_entry *te, *tmp_te; > + struct rte_tqs_list *tqs_list; > + struct rte_tqs *v, *tmp_v; > + int ret; > + > + if (name == NULL) { > + RTE_LOG(ERR, TQS, "Invalid input parameters\n"); > + rte_errno = -EINVAL; > + return NULL; > + } > + > + te = rte_zmalloc("TQS_TAILQ_ENTRY", sizeof(*te), 0); > + if (te == NULL) { > + RTE_LOG(ERR, TQS, "Cannot reserve memory for tailq\n"); > + rte_errno = -ENOMEM; > + return NULL; > + } > + > + snprintf(tqs_name, sizeof(tqs_name), "%s", name); > + v = rte_zmalloc_socket(tqs_name, sizeof(struct rte_tqs), > + RTE_CACHE_LINE_SIZE, socket_id); > + if (v == NULL) { > + RTE_LOG(ERR, TQS, "Cannot reserve memory for TQS variable\n"); > + rte_errno = -ENOMEM; > + goto alloc_error; > + } > + > + ret = snprintf(v->name, sizeof(v->name), "%s", name); > + if (ret < 0 || ret >= (int)sizeof(v->name)) { > + rte_errno = -ENAMETOOLONG; > + goto alloc_error; > + } > + > + te->data = (void *) v; > + v->lcore_mask = lcore_mask; > + > + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); > + > + tqs_list = RTE_TAILQ_CAST(rte_tqs_tailq.head, rte_tqs_list); > + > + /* Search if a TQS variable with the same name exists already */ > + TAILQ_FOREACH(tmp_te, tqs_list, next) { > + tmp_v = (struct rte_tqs *) tmp_te->data; > + if (strncmp(name, tmp_v->name, RTE_TQS_NAMESIZE) == 0) > + break; > + } > + > + if (tmp_te != NULL) { > + rte_errno = -EEXIST; > + goto tqs_exist; > + } > + > + TAILQ_INSERT_TAIL(tqs_list, te, next); > + > + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); > + > + return v; > + > +tqs_exist: > + rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); > + > +alloc_error: > + rte_free(te); > + rte_free(v); > + return NULL; > +}
That seems like quite a heavy-weight function just to allocate a sync variable. As the size of struct rte_tqs is constant and known to the user, it might be better to just provide rte_tqs_init(struct rte_tqs *tqs, ...) and let the user allocate/free the memory for it themselves. > + > +/* Add a reader thread, running on an lcore, to the list of threads > + * reporting their quiescent state on a TQS variable. > + */ > +int __rte_experimental > +rte_tqs_register_lcore(struct rte_tqs *v, unsigned int lcore_id) > +{ > + TQS_RETURN_IF_TRUE((v == NULL || lcore_id >= RTE_TQS_MAX_LCORE), > + -EINVAL); It is not very good practice to make a function return different values and behave differently in debug/non-debug mode. I'd say that for slow-path (functions in .c) it is always good to check input parameters. For fast-path (functions in .h) we sometimes skip such checking, but debug mode can probably use RTE_ASSERT() or so. lcore_id >= RTE_TQS_MAX_LCORE Is this limitation really necessary? First, it means that only lcores can use that API (at least the data-path part); second, even today many machines have more than 64 cores. I think you can easily avoid such a limitation if, instead of requiring lcore_id as an input parameter, you just make the function return the index of the next available entry in w[]. Then tqs_update() can take that index as an input parameter. > + > + /* Worker thread has to count the quiescent states > + * only from the current value of token. > + */ > + v->w[lcore_id].cnt = v->token; I wonder what would happen if a new reader calls register() after the writer calls start()? Looks like a race condition. Or is such a pattern not supported? > + > + /* Release the store to initial TQS count so that workers > + * can use it immediately after this function returns. 
> + */ > + __atomic_fetch_or(&v->lcore_mask, (1UL << lcore_id), __ATOMIC_RELEASE); > + > + return 0; > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Trigger the worker threads to report the quiescent state > + * status. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe and can be called from the worker threads as well. > + * > + * @param v > + * TQS variable > + * @param n > + * Expected number of times the quiescent state is entered > + * @param t > + * - If successful, this is the token for this call of the API. > + * This should be passed to rte_tqs_check API. > + * @return > + * - -EINVAL if the parameters are invalid (debug mode compilation only). > + * - 0 Otherwise and always (non-debug mode compilation). > + */ > +static __rte_always_inline int __rte_experimental > +rte_tqs_start(struct rte_tqs *v, unsigned int n, uint32_t *t) > +{ > + TQS_RETURN_IF_TRUE((v == NULL || t == NULL), -EINVAL); > + > + /* This store release will ensure that changes to any data > + * structure are visible to the workers before the token > + * update is visible. > + */ > + *t = __atomic_add_fetch(&v->token, n, __ATOMIC_RELEASE); > + > + return 0; > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Update quiescent state for the worker thread on a lcore. > + * > + * This is implemented as a lock-free function. It is multi-thread safe. > + * All the worker threads registered to report their quiescent state > + * on the TQS variable must call this API. > + * > + * @param v > + * TQS variable > + */ > +static __rte_always_inline void __rte_experimental > +rte_tqs_update(struct rte_tqs *v, unsigned int lcore_id) > +{ > + uint32_t t; > + > + TQS_ERR_LOG_IF_TRUE(v == NULL || lcore_id >= RTE_TQS_MAX_LCORE); > + > + /* Load the token before the worker thread loads any other > + * (lock-free) data structure. 
This ensures that updates > + * to the data structures are visible if the update > + * to token is visible. > + */ > + t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE); Hmm, I am not very familiar with the C11 memory model, but it looks like a race condition to me: as I understand it, update() is supposed to be called at the end of the reader's critical section, correct? But ACQUIRE is only a hoist barrier, which means the compiler and CPU are free to move earlier reads (and writes) after it. It probably needs to be a full ACQ_REL here. > + if (v->w[lcore_id].cnt != t) > + v->w[lcore_id].cnt++; > +} > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Checks if all the worker threads have entered the quiescent state > + * 'n' number of times. 'n' is provided in rte_tqs_start API. > + * > + * This is implemented as a lock-free function. It is multi-thread > + * safe and can be called from the worker threads as well. > + * > + * @param v > + * TQS variable > + * @param t > + * Token returned by rte_tqs_start API > + * @param wait > + * If true, block till all the worker threads have completed entering > + * the quiescent state 'n' number of times > + * @return > + * - 0 if all worker threads have NOT passed through specified number > + * of quiescent states. > + * - 1 if all worker threads have passed through specified number > + * of quiescent states. > + * - -EINVAL if the parameters are invalid (debug mode compilation only). > + */ > +static __rte_always_inline int __rte_experimental > +rte_tqs_check(struct rte_tqs *v, uint32_t t, bool wait) > +{ > + uint64_t l; > + uint64_t lcore_mask; > + > + TQS_RETURN_IF_TRUE((v == NULL), -EINVAL); > + > + do { > + /* Load the current lcore_mask before loading the > + * worker thread quiescent state counters. 
> + */ > + lcore_mask = __atomic_load_n(&v->lcore_mask, __ATOMIC_ACQUIRE); What would happen if a reader calls unregister() simultaneously with check() and updates lcore_mask straight after that load? As I understand it, check() might hang in such a case. > + > + while (lcore_mask) { > + l = __builtin_ctz(lcore_mask); > + if (v->w[l].cnt != t) > + break; As I understand it, that makes the control-path function's progress dependent on simultaneous invocation of the data-path functions. In some cases that might cause the control-path to hang — say, if the data-path function is never called, or if the user invokes control-path and data-path functions from the same thread. > + > + lcore_mask &= ~(1UL << l); > + } > + > + if (lcore_mask == 0) > + return 1; > + > + rte_pause(); > + } while (wait); > + > + return 0; > +} > +