Hi guys, > > From: Ruifeng Wang <ruifeng.w...@arm.com> > > Currently, the tbl8 group is freed even though the readers might be > using the tbl8 group entries. The freed tbl8 group can be reallocated > quickly. This results in incorrect lookup results. > > RCU QSBR process is integrated for safe tbl8 group reclaim. > Refer to RCU documentation to understand various aspects of > integrating RCU library into other libraries. > > Signed-off-by: Ruifeng Wang <ruifeng.w...@arm.com> > Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > --- > lib/librte_lpm/Makefile | 3 +- > lib/librte_lpm/meson.build | 2 + > lib/librte_lpm/rte_lpm.c | 102 +++++++++++++++++++++++++---- > lib/librte_lpm/rte_lpm.h | 21 ++++++ > lib/librte_lpm/rte_lpm_version.map | 6 ++ > 5 files changed, 122 insertions(+), 12 deletions(-) > > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile > index a7946a1c5..ca9e16312 100644 > --- a/lib/librte_lpm/Makefile > +++ b/lib/librte_lpm/Makefile > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk > # library name > LIB = librte_lpm.a > > +CFLAGS += -DALLOW_EXPERIMENTAL_API > CFLAGS += -O3 > CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) > -LDLIBS += -lrte_eal -lrte_hash > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu > > EXPORT_MAP := rte_lpm_version.map > > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build > index a5176d8ae..19a35107f 100644 > --- a/lib/librte_lpm/meson.build > +++ b/lib/librte_lpm/meson.build > @@ -2,9 +2,11 @@ > # Copyright(c) 2017 Intel Corporation > > version = 2 > +allow_experimental_apis = true > sources = files('rte_lpm.c', 'rte_lpm6.c') > headers = files('rte_lpm.h', 'rte_lpm6.h') > # since header files have different names, we can install all vector headers > # without worrying about which architecture we actually need > headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h') > deps += ['hash'] > +deps += ['rcu'] > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c > index 
3a929a1b1..ca58d4b35 100644 > --- a/lib/librte_lpm/rte_lpm.c > +++ b/lib/librte_lpm/rte_lpm.c > @@ -1,5 +1,6 @@ > /* SPDX-License-Identifier: BSD-3-Clause > * Copyright(c) 2010-2014 Intel Corporation > + * Copyright(c) 2019 Arm Limited > */ > > #include <string.h> > @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm) > > rte_mcfg_tailq_write_unlock(); > > + if (lpm->dq) > + rte_rcu_qsbr_dq_delete(lpm->dq); > rte_free(lpm->tbl8); > rte_free(lpm->rules_tbl); > rte_free(lpm); > @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04); > MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm), > rte_lpm_free_v1604); > > +struct __rte_lpm_rcu_dq_entry { > + uint32_t tbl8_group_index; > + uint32_t pad; > +}; > + > +static void > +__lpm_rcu_qsbr_free_resource(void *p, void *data) > +{ > + struct rte_lpm_tbl_entry zero_tbl8_entry = {0}; > + struct __rte_lpm_rcu_dq_entry *e = > + (struct __rte_lpm_rcu_dq_entry *)data; > + struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p; > + > + /* Set tbl8 group invalid */ > + __atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry, > + __ATOMIC_RELAXED); > +} > + > +/* Associate QSBR variable with an LPM object. > + */ > +int > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v) > +{ > + char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE]; > + struct rte_rcu_qsbr_dq_parameters params; > + > + if ((lpm == NULL) || (v == NULL)) { > + rte_errno = EINVAL; > + return 1; > + } > + > + if (lpm->dq) { > + rte_errno = EEXIST; > + return 1; > + } > + > + /* Init QSBR defer queue. 
*/ > + snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm->name); > + params.name = rcu_dq_name; > + params.size = lpm->number_tbl8s; > + params.esize = sizeof(struct __rte_lpm_rcu_dq_entry); > + params.f = __lpm_rcu_qsbr_free_resource; > + params.p = lpm->tbl8; > + params.v = v; > + lpm->dq = rte_rcu_qsbr_dq_create(¶ms); > + if (lpm->dq == NULL) { > + RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n"); > + return 1; > + }
Few thoughts about that function: It is named rcu_qsbr_add() but in fact it allocates a defer queue for a given rcu var. So first thought - is it always necessary? For some use-cases I suppose the user might be ok to wait for a quiescent state change inside tbl8_free()? Another thing - you do allocate a defer queue, but it is internal, so the user can't call reclaim() manually, which looks strange. Why not return the defer_queue pointer to the user, so he can call reclaim() himself at an appropriate time? Third thing - you always allocate the defer queue with size equal to the number of tbl8. Though I understand it could be up to 16M tbl8 groups inside the LPM. Do we really need a defer queue that long? Especially considering that the current rcu_defer_queue will start reclamation when 1/8 of the defer_queue becomes full and wouldn't reclaim more than 1/16 of it. Probably better to let the user decide how long a defer_queue he needs for that LPM? Konstantin > + > + return 0; > +} > + > /* > * Adds a rule to the rule table. > * > @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8) > } > > static int32_t > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s) > +__tbl8_alloc_v1604(struct rte_lpm *lpm) > { > uint32_t group_idx; /* tbl8 group index. */ > struct rte_lpm_tbl_entry *tbl8_entry; > > /* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */ > - for (group_idx = 0; group_idx < number_tbl8s; group_idx++) { > - tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES]; > + for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) { > + tbl8_entry = &lpm->tbl8[group_idx * > + RTE_LPM_TBL8_GROUP_NUM_ENTRIES]; > /* If a free tbl8 group is found clean it and set as VALID. 
*/ > if (!tbl8_entry->valid_group) { > struct rte_lpm_tbl_entry new_tbl8_entry = { > @@ -712,6 +769,21 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, > uint32_t number_tbl8s) > return -ENOSPC; > } > > +static int32_t > +tbl8_alloc_v1604(struct rte_lpm *lpm) > +{ > + int32_t group_idx; /* tbl8 group index. */ > + > + group_idx = __tbl8_alloc_v1604(lpm); > + if ((group_idx < 0) && (lpm->dq != NULL)) { > + /* If there are no tbl8 groups try to reclaim some. */ > + if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0) > + group_idx = __tbl8_alloc_v1604(lpm); > + } > + > + return group_idx; > +} > + > static void > tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start) > { > @@ -728,13 +800,21 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, > uint32_t tbl8_group_start) > } > > static void > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start) > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start) > { > - /* Set tbl8 group invalid*/ > struct rte_lpm_tbl_entry zero_tbl8_entry = {0}; > + struct __rte_lpm_rcu_dq_entry e; > > - __atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry, > - __ATOMIC_RELAXED); > + if (lpm->dq != NULL) { > + e.tbl8_group_index = tbl8_group_start; > + e.pad = 0; > + /* Push into QSBR defer queue. */ > + rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e); > + } else { > + /* Set tbl8 group invalid*/ > + __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry, > + __ATOMIC_RELAXED); > + } > } > > static __rte_noinline int32_t > @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t > ip_masked, uint8_t depth, > > if (!lpm->tbl24[tbl24_index].valid) { > /* Search for a free tbl8 group. */ > - tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, > lpm->number_tbl8s); > + tbl8_group_index = tbl8_alloc_v1604(lpm); > > /* Check tbl8 allocation was successful. 
*/ > if (tbl8_group_index < 0) { > @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t > ip_masked, uint8_t depth, > } /* If valid entry but not extended calculate the index into Table8. */ > else if (lpm->tbl24[tbl24_index].valid_group == 0) { > /* Search for free tbl8 group. */ > - tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, > lpm->number_tbl8s); > + tbl8_group_index = tbl8_alloc_v1604(lpm); > > if (tbl8_group_index < 0) { > return tbl8_group_index; > @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t > ip_masked, > */ > lpm->tbl24[tbl24_index].valid = 0; > __atomic_thread_fence(__ATOMIC_RELEASE); > - tbl8_free_v1604(lpm->tbl8, tbl8_group_start); > + tbl8_free_v1604(lpm, tbl8_group_start); > } else if (tbl8_recycle_index > -1) { > /* Update tbl24 entry. */ > struct rte_lpm_tbl_entry new_tbl24_entry = { > @@ -1834,7 +1914,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t > ip_masked, > __atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry, > __ATOMIC_RELAXED); > __atomic_thread_fence(__ATOMIC_RELEASE); > - tbl8_free_v1604(lpm->tbl8, tbl8_group_start); > + tbl8_free_v1604(lpm, tbl8_group_start); > } > #undef group_idx > return 0; > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h > index 906ec4483..49c12a68d 100644 > --- a/lib/librte_lpm/rte_lpm.h > +++ b/lib/librte_lpm/rte_lpm.h > @@ -1,5 +1,6 @@ > /* SPDX-License-Identifier: BSD-3-Clause > * Copyright(c) 2010-2014 Intel Corporation > + * Copyright(c) 2019 Arm Limited > */ > > #ifndef _RTE_LPM_H_ > @@ -21,6 +22,7 @@ > #include <rte_common.h> > #include <rte_vect.h> > #include <rte_compat.h> > +#include <rte_rcu_qsbr.h> > > #ifdef __cplusplus > extern "C" { > @@ -186,6 +188,7 @@ struct rte_lpm { > __rte_cache_aligned; /**< LPM tbl24 table. */ > struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */ > struct rte_lpm_rule *rules_tbl; /**< LPM rules. 
*/ > + struct rte_rcu_qsbr_dq *dq; /**< RCU QSBR defer queue.*/ > }; > > /** > @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm); > void > rte_lpm_free_v1604(struct rte_lpm *lpm); > > +/** > + * Associate RCU QSBR variable with an LPM object. > + * > + * @param lpm > + * the lpm object to add RCU QSBR > + * @param v > + * RCU QSBR variable > + * @return > + * On success - 0 > + * On error - 1 with error code set in rte_errno. > + * Possible rte_errno codes are: > + * - EINVAL - invalid pointer > + * - EEXIST - already added QSBR > + * - ENOMEM - memory allocation failure > + */ > +__rte_experimental > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v); > + > /** > * Add a rule to the LPM table. > * > diff --git a/lib/librte_lpm/rte_lpm_version.map > b/lib/librte_lpm/rte_lpm_version.map > index 90beac853..b353aabd2 100644 > --- a/lib/librte_lpm/rte_lpm_version.map > +++ b/lib/librte_lpm/rte_lpm_version.map > @@ -44,3 +44,9 @@ DPDK_17.05 { > rte_lpm6_lookup_bulk_func; > > } DPDK_16.04; > + > +EXPERIMENTAL { > + global: > + > + rte_lpm_rcu_qsbr_add; > +}; > -- > 2.17.1