Hi Vladimir,

Thanks for your review and the comments. All the comments will be addressed in the next version.
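
In case it helps reviewers, here is a minimal sketch of how an application is expected to wire a QSBR variable to an LPM table with the new rte_lpm_rcu_qsbr_add() API. This is illustrative only and not part of the patch: the single reader thread, thread id 0, the table sizes, the example address and the error handling are assumptions, and the RCU/LPM APIs used are experimental, so ALLOW_EXPERIMENTAL_API is required.

#include <rte_lpm.h>
#include <rte_rcu_qsbr.h>
#include <rte_malloc.h>

#define MAX_READER_THREADS 1	/* assumption: one reader thread */

static struct rte_lpm *lpm;
static struct rte_rcu_qsbr *qsv;

/* Control plane: create the LPM table and attach the QSBR variable. */
static int
lpm_rcu_setup(void)
{
	struct rte_lpm_config config = {
		.max_rules = 1024,	/* example sizes only */
		.number_tbl8s = 256,
		.flags = 0,
	};
	size_t sz;

	lpm = rte_lpm_create("example_lpm", SOCKET_ID_ANY, &config);
	if (lpm == NULL)
		return -1;

	/* Allocate and initialize the QSBR variable. */
	sz = rte_rcu_qsbr_get_memsize(MAX_READER_THREADS);
	qsv = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (qsv == NULL || rte_rcu_qsbr_init(qsv, MAX_READER_THREADS) != 0)
		return -1;

	/* tbl8 groups freed by rte_lpm_delete() are then reclaimed only
	 * after the registered readers have reported a quiescent state.
	 */
	if (rte_lpm_rcu_qsbr_add(lpm, qsv) != 0)
		return -1;

	return 0;
}

/* Data plane: reader thread (thread id 0 in this sketch). */
static void
lpm_reader_loop(void)
{
	uint32_t ip = 0xC0000201;	/* 192.0.2.1, example address */
	uint32_t next_hop;

	rte_rcu_qsbr_thread_register(qsv, 0);
	rte_rcu_qsbr_thread_online(qsv, 0);

	for (;;) {
		/* ... look up whatever traffic arrives ... */
		(void)rte_lpm_lookup(lpm, ip, &next_hop);

		/* Report quiescent state once per iteration so that
		 * deferred tbl8 frees can make progress.
		 */
		rte_rcu_qsbr_quiescent(qsv, 0);
	}
}

With this in place the reclamation cost is amortised across rte_lpm_delete() calls on the writer side, while readers only pay for the periodic quiescent state report.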
/Ruifeng

> -----Original Message-----
> From: Medvedkin, Vladimir <vladimir.medved...@intel.com>
> Sent: Thursday, September 19, 2019 00:16
> To: Ruifeng Wang (Arm Technology China) <ruifeng.w...@arm.com>;
> bruce.richard...@intel.com; olivier.m...@6wind.com
> Cc: dev@dpdk.org; step...@networkplumber.org;
> konstantin.anan...@intel.com; Gavin Hu (Arm Technology China)
> <gavin...@arm.com>; Honnappa Nagarahalli
> <honnappa.nagaraha...@arm.com>; Dharmik Thakkar
> <dharmik.thak...@arm.com>; nd <n...@arm.com>
> Subject: Re: [PATCH v2 3/6] lib/lpm: integrate RCU QSBR
>
> Hi Ruifeng,
>
> Thanks for this patchseries, see comments below.
>
> On 06/09/2019 10:45, Ruifeng Wang wrote:
> > Currently, the tbl8 group is freed even though the readers might be
> > using the tbl8 group entries. The freed tbl8 group can be reallocated
> > quickly. This results in incorrect lookup results.
> >
> > RCU QSBR process is integrated for safe tbl8 group reclaim.
> > Refer to RCU documentation to understand various aspects of
> > integrating RCU library into other libraries.
> >
> > Signed-off-by: Ruifeng Wang <ruifeng.w...@arm.com>
> > Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > ---
> >  lib/librte_lpm/Makefile            |   3 +-
> >  lib/librte_lpm/meson.build         |   2 +
> >  lib/librte_lpm/rte_lpm.c           | 223 +++++++++++++++++++++++++++--
> >  lib/librte_lpm/rte_lpm.h           |  22 +++
> >  lib/librte_lpm/rte_lpm_version.map |   6 +
> >  lib/meson.build                    |   3 +-
> >  6 files changed, 244 insertions(+), 15 deletions(-)
> >
> > diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> > index a7946a1c5..ca9e16312 100644
> > --- a/lib/librte_lpm/Makefile
> > +++ b/lib/librte_lpm/Makefile
> > @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
> >  # library name
> >  LIB = librte_lpm.a
> >
> > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> >  CFLAGS += -O3
> >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> > -LDLIBS += -lrte_eal -lrte_hash
> > +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> >
> >  EXPORT_MAP := rte_lpm_version.map
> >
> > diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> > index a5176d8ae..19a35107f 100644
> > --- a/lib/librte_lpm/meson.build
> > +++ b/lib/librte_lpm/meson.build
> > @@ -2,9 +2,11 @@
> >  # Copyright(c) 2017 Intel Corporation
> >
> >  version = 2
> > +allow_experimental_apis = true
> >  sources = files('rte_lpm.c', 'rte_lpm6.c')
> >  headers = files('rte_lpm.h', 'rte_lpm6.h')
> >  # since header files have different names, we can install all vector headers
> >  # without worrying about which architecture we actually need
> >  headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
> >  deps += ['hash']
> > +deps += ['rcu']
> > diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> > index 3a929a1b1..9764b8de6 100644
> > --- a/lib/librte_lpm/rte_lpm.c
> > +++ b/lib/librte_lpm/rte_lpm.c
> > @@ -1,5 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >   */
> >
> >  #include <string.h>
> > @@ -22,6 +23,7 @@
> >  #include <rte_rwlock.h>
> >  #include <rte_spinlock.h>
> >  #include <rte_tailq.h>
> > +#include <rte_ring.h>
> >
> >  #include "rte_lpm.h"
> >
> > @@ -39,6 +41,11 @@ enum valid_flag {
> >  	VALID
> >  };
> >
> > +struct __rte_lpm_qs_item {
> > +	uint64_t token;	/**< QSBR token.*/
> > +	uint32_t index;	/**< tbl8 group index.*/
> > +};
> > +
> >  /* Macro to enable/disable run-time checks. */
> >  #if defined(RTE_LIBRTE_LPM_DEBUG)
> >  #include <rte_debug.h>
> > @@ -381,6 +388,7 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> >
> >  	rte_mcfg_tailq_write_unlock();
> >
> > +	rte_ring_free(lpm->qs_fifo);
> >  	rte_free(lpm->tbl8);
> >  	rte_free(lpm->rules_tbl);
> >  	rte_free(lpm);
> > @@ -390,6 +398,147 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
> >  MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
> >  		rte_lpm_free_v1604);
> >
> > +/* Add an item into FIFO.
> > + * return: 0 - success
> > + */
> > +static int
> > +__rte_lpm_rcu_qsbr_fifo_push(struct rte_ring *fifo,
> > +	struct __rte_lpm_qs_item *item)
> > +{
> > +	if (rte_ring_free_count(fifo) < 2) {
> > +		RTE_LOG(ERR, LPM, "QS FIFO full\n");
> > +		rte_errno = ENOSPC;
> > +		return 1;
> > +	}
> > +
> > +	(void)rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->token);
> > +	(void)rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->index);
> > +
> > +	return 0;
> > +}
> > +
> > +/* Remove item from FIFO.
> > + * Used when data observed by rte_ring_peek.
> > + */
> > +static void
> > +__rte_lpm_rcu_qsbr_fifo_pop(struct rte_ring *fifo,
> > +	struct __rte_lpm_qs_item *item)
> Is it necessary to pass the pointer for struct __rte_lpm_qs_item?
> According to code only item.index is used after this call.
> > +{
> > +	void *obj_token = NULL;
> > +	void *obj_index = NULL;
> > +
> > +	(void)rte_ring_sc_dequeue(fifo, &obj_token);
> I think it is not necessary to cast here.
> > +	(void)rte_ring_sc_dequeue(fifo, &obj_index);
> > +
> > +	if (item) {
> I think it is redundant, it is always not NULL.
> > +		item->token = (uint64_t)((uintptr_t)obj_token);
> > +		item->index = (uint32_t)((uintptr_t)obj_index);
> > +	}
> > +}
> > +
> > +/* Max number of tbl8 groups to reclaim at one time. */
> > +#define RCU_QSBR_RECLAIM_SIZE	8
> > +
> > +/* When RCU QSBR FIFO usage is above 1/(2^RCU_QSBR_RECLAIM_LEVEL),
> > + * reclaim will be triggered by tbl8_free.
> > + */
> > +#define RCU_QSBR_RECLAIM_LEVEL	3
> > +
> > +/* Reclaim some tbl8 groups based on quiescent state check.
> > + * RCU_QSBR_RECLAIM_SIZE groups will be reclaimed at max.
> > + * Params: lpm - lpm object handle
> > + *         index - (onput) one of successfully reclaimed tbl8 groups
> > + * return: 0 - success, 1 - no group reclaimed.
> > + */
> > +static uint32_t
> > +__rte_lpm_rcu_qsbr_reclaim_chunk(struct rte_lpm *lpm, uint32_t *index)
> > +{
> > +	struct __rte_lpm_qs_item qs_item;
> > +	struct rte_lpm_tbl_entry *tbl8_entry = NULL;
> It is not necessary to init it with NULL.
> > +	void *obj_token;
> > +	uint32_t cnt = 0;
> > +
> > +	RTE_LOG(DEBUG, LPM, "RCU QSBR reclaimation triggered.\n");
> > +	/* Check reader threads quiescent state and
> > +	 * reclaim as much tbl8 groups as possible.
> > +	 */
> > +	while ((cnt < RCU_QSBR_RECLAIM_SIZE) &&
> > +		(rte_ring_peek(lpm->qs_fifo, &obj_token) == 0) &&
> > +		(rte_rcu_qsbr_check(lpm->qsv, (uint64_t)((uintptr_t)obj_token),
> > +					false) == 1)) {
> > +		__rte_lpm_rcu_qsbr_fifo_pop(lpm->qs_fifo, &qs_item);
> > +
> > +		tbl8_entry = &lpm->tbl8[qs_item.index *
> > +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +		memset(&tbl8_entry[0], 0,
> > +				RTE_LPM_TBL8_GROUP_NUM_ENTRIES *
> > +				sizeof(tbl8_entry[0]));
> > +		cnt++;
> > +	}
> > +
> > +	RTE_LOG(DEBUG, LPM, "RCU QSBR reclaimed %u groups.\n", cnt);
> > +	if (cnt) {
> > +		if (index)
> > +			*index = qs_item.index;
> > +		return 0;
> > +	}
> > +	return 1;
> > +}
> > +
> > +/* Trigger tbl8 group reclaim when necessary.
> > + * Reclaim happens when RCU QSBR queue usage
> > + * is over 1/(2^RCU_QSBR_RECLAIM_LEVEL).
> > + */
> > +static void
> > +__rte_lpm_rcu_qsbr_try_reclaim(struct rte_lpm *lpm)
> > +{
> > +	if (lpm->qsv == NULL)
> > +		return;
> This check is redundant.
> > +
> > +	if (rte_ring_count(lpm->qs_fifo) <
> > +		(rte_ring_get_capacity(lpm->qs_fifo) >> RCU_QSBR_RECLAIM_LEVEL))
> > +		return;
> > +
> > +	(void)__rte_lpm_rcu_qsbr_reclaim_chunk(lpm, NULL);
> > +}
> > +
> > +/* Associate QSBR variable with an LPM object.
> > + */
> > +int
> > +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
> > +{
> > +	uint32_t qs_fifo_size;
> > +	char rcu_ring_name[RTE_RING_NAMESIZE];
> > +
> > +	if ((lpm == NULL) || (v == NULL)) {
> > +		rte_errno = EINVAL;
> > +		return 1;
> > +	}
> > +
> > +	if (lpm->qsv) {
> > +		rte_errno = EEXIST;
> > +		return 1;
> > +	}
> > +
> > +	/* round up qs_fifo_size to next power of two that is not less than
> > +	 * number_tbl8s. Will store 'token' and 'index'.
> > +	 */
> > +	qs_fifo_size = rte_align32pow2((2 * lpm->number_tbl8s) + 1);
> > +
> > +	/* Init QSBR reclaiming FIFO. */
> > +	snprintf(rcu_ring_name, sizeof(rcu_ring_name), "LPM_RCU_%s", lpm->name);
> > +	lpm->qs_fifo = rte_ring_create(rcu_ring_name, qs_fifo_size,
> > +			SOCKET_ID_ANY, 0);
> > +	if (lpm->qs_fifo == NULL) {
> > +		RTE_LOG(ERR, LPM, "LPM QS FIFO memory allocation failed\n");
> > +		rte_errno = ENOMEM;
> rte_ring_create() sets rte_errno on error, I don't think we need to rewrite it
> here.
> > +		return 1;
> > +	}
> > +	lpm->qsv = v;
> > +
> > +	return 0;
> > +}
> > +
> >  /*
> >   * Adds a rule to the rule table.
> >   *
> > @@ -640,6 +789,35 @@ rule_find_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth)
> >  	return -EINVAL;
> >  }
> >
> > +static int32_t
> > +tbl8_alloc_reclaimed(struct rte_lpm *lpm)
> > +{
> > +	struct rte_lpm_tbl_entry *tbl8_entry = NULL;
> > +	uint32_t index;
> > +
> > +	if (lpm->qsv != NULL) {
> > +		if (__rte_lpm_rcu_qsbr_reclaim_chunk(lpm, &index) == 0) {
> > +			/* Set the last reclaimed tbl8 group as VALID. */
> > +			struct rte_lpm_tbl_entry new_tbl8_entry = {
> > +				.next_hop = 0,
> > +				.valid = INVALID,
> > +				.depth = 0,
> > +				.valid_group = VALID,
> > +			};
> > +
> > +			tbl8_entry = &lpm->tbl8[index *
> > +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +			__atomic_store(tbl8_entry, &new_tbl8_entry,
> > +					__ATOMIC_RELAXED);
> > +
> > +			/* Return group index for reclaimed tbl8 group. */
> > +			return index;
> > +		}
> > +	}
> > +
> > +	return -ENOSPC;
> > +}
> > +
> >  /*
> >   * Find, clean and allocate a tbl8.
> >   */
> > @@ -679,14 +857,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
> >  }
> >
> >  static int32_t
> > -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> > +tbl8_alloc_v1604(struct rte_lpm *lpm)
> >  {
> >  	uint32_t group_idx; /* tbl8 group index. */
> >  	struct rte_lpm_tbl_entry *tbl8_entry;
> >
> >  	/* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> > -	for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> > -		tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> > +	for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> > +		tbl8_entry = &lpm->tbl8[group_idx *
> > +					RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> >  		/* If a free tbl8 group is found clean it and set as VALID. */
> >  		if (!tbl8_entry->valid_group) {
> >  			struct rte_lpm_tbl_entry new_tbl8_entry = {
> > @@ -708,8 +887,8 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> >  		}
> >  	}
> >
> > -	/* If there are no tbl8 groups free then return error. */
> > -	return -ENOSPC;
> > +	/* If there are no tbl8 groups free then check reclaim queue. */
> > +	return tbl8_alloc_reclaimed(lpm);
> >  }
> >
> >  static void
> > @@ -728,13 +907,31 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
> >  }
> >
> >  static void
> > -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> > +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
> >  {
> > -	/* Set tbl8 group invalid*/
> > +	struct __rte_lpm_qs_item qs_item;
> >  	struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> >
> > -	__atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> > -			__ATOMIC_RELAXED);
> > +	if (lpm->qsv != NULL) {
> > +		/* Push into QSBR FIFO. */
> > +		qs_item.token = rte_rcu_qsbr_start(lpm->qsv);
> > +		qs_item.index =
> > +			tbl8_group_start / RTE_LPM_TBL8_GROUP_NUM_ENTRIES;
> > +		if (__rte_lpm_rcu_qsbr_fifo_push(lpm->qs_fifo, &qs_item) != 0)
> > +			/* This should never happen as FIFO size is big enough
> > +			 * to hold all tbl8 groups.
> > +			 */
> > +			RTE_LOG(ERR, LPM, "Failed to push QSBR FIFO\n");
> > +
> > +		/* Speculatively reclaim tbl8 groups.
> > +		 * Help spread the reclaim work load across multiple calls.
> > +		 */
> > +		__rte_lpm_rcu_qsbr_try_reclaim(lpm);
> > +	} else {
> > +		/* Set tbl8 group invalid*/
> > +		__atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> > +				__ATOMIC_RELAXED);
> > +	}
> >  }
> >
> >  static __rte_noinline int32_t
> > @@ -1037,7 +1234,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
> >
> >  	if (!lpm->tbl24[tbl24_index].valid) {
> >  		/* Search for a free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >  		/* Check tbl8 allocation was successful. */
> >  		if (tbl8_group_index < 0) {
> > @@ -1083,7 +1280,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked, uint8_t depth,
> >  	} /* If valid entry but not extended calculate the index into Table8. */
> >  	else if (lpm->tbl24[tbl24_index].valid_group == 0) {
> >  		/* Search for free tbl8 group. */
> > -		tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, lpm->number_tbl8s);
> > +		tbl8_group_index = tbl8_alloc_v1604(lpm);
> >
> >  		if (tbl8_group_index < 0) {
> >  			return tbl8_group_index;
> > @@ -1818,7 +2015,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> >  		 */
> >  		lpm->tbl24[tbl24_index].valid = 0;
> >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >  	} else if (tbl8_recycle_index > -1) {
> >  		/* Update tbl24 entry. */
> >  		struct rte_lpm_tbl_entry new_tbl24_entry = {
> > @@ -1834,7 +2031,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t ip_masked,
> >  		__atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
> >  				__ATOMIC_RELAXED);
> >  		__atomic_thread_fence(__ATOMIC_RELEASE);
> > -		tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> > +		tbl8_free_v1604(lpm, tbl8_group_start);
> >  	}
> >  #undef group_idx
> >  	return 0;
> > diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> > index 906ec4483..5079fb262 100644
> > --- a/lib/librte_lpm/rte_lpm.h
> > +++ b/lib/librte_lpm/rte_lpm.h
> > @@ -1,5 +1,6 @@
> >  /* SPDX-License-Identifier: BSD-3-Clause
> >   * Copyright(c) 2010-2014 Intel Corporation
> > + * Copyright(c) 2019 Arm Limited
> >   */
> >
> >  #ifndef _RTE_LPM_H_
> > @@ -21,6 +22,7 @@
> >  #include <rte_common.h>
> >  #include <rte_vect.h>
> >  #include <rte_compat.h>
> > +#include <rte_rcu_qsbr.h>
> >
> >  #ifdef __cplusplus
> >  extern "C" {
> > @@ -186,6 +188,8 @@ struct rte_lpm {
> >  			__rte_cache_aligned; /**< LPM tbl24 table. */
> >  	struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
> >  	struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> > +	struct rte_rcu_qsbr *qsv;	/**< RCU QSBR variable for tbl8 group.*/
> > +	struct rte_ring *qs_fifo;	/**< RCU QSBR reclaiming queue. */
> >  };
> >
> >  /**
> > @@ -248,6 +252,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
> >  void
> >  rte_lpm_free_v1604(struct rte_lpm *lpm);
> >
> > +/**
> > + * Associate RCU QSBR variable with an LPM object.
> > + *
> > + * @param lpm
> > + *   the lpm object to add RCU QSBR
> > + * @param v
> > + *   RCU QSBR variable
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - invalid pointer
> > + *   - EEXIST - already added QSBR
> > + *   - ENOMEM - memory allocation failure
> > + */
> > +__rte_experimental
> > +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
> > +
> >  /**
> >   * Add a rule to the LPM table.
> >   *
> > diff --git a/lib/librte_lpm/rte_lpm_version.map b/lib/librte_lpm/rte_lpm_version.map
> > index 90beac853..b353aabd2 100644
> > --- a/lib/librte_lpm/rte_lpm_version.map
> > +++ b/lib/librte_lpm/rte_lpm_version.map
> > @@ -44,3 +44,9 @@ DPDK_17.05 {
> >  	rte_lpm6_lookup_bulk_func;
> >
> >  } DPDK_16.04;
> > +
> > +EXPERIMENTAL {
> > +	global:
> > +
> > +	rte_lpm_rcu_qsbr_add;
> > +};
> > diff --git a/lib/meson.build b/lib/meson.build
> > index e5ff83893..3a96f005d 100644
> > --- a/lib/meson.build
> > +++ b/lib/meson.build
> > @@ -11,6 +11,7 @@
> >  libraries = [
> >  	'kvargs', # eal depends on kvargs
> >  	'eal', # everything depends on eal
> > +	'rcu', # hash and lpm depends on this
> >  	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> >  	'cmdline',
> >  	'metrics', # bitrate/latency stats depends on this
> > @@ -22,7 +23,7 @@ libraries = [
> >  	'gro', 'gso', 'ip_frag', 'jobstats',
> >  	'kni', 'latencystats', 'lpm', 'member',
> >  	'power', 'pdump', 'rawdev',
> > -	'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
> > +	'reorder', 'sched', 'security', 'stack', 'vhost',
> >  	# ipsec lib depends on net, crypto and security
> >  	'ipsec',
> >  	# add pkt framework libs which use other libs from above
> > --
> Regards,
> Vladimir