Hi guys,

> 
> From: Ruifeng Wang <ruifeng.w...@arm.com>
> 
> Currently, the tbl8 group is freed even though the readers might be
> using the tbl8 group entries. The freed tbl8 group can be reallocated
> quickly. This results in incorrect lookup results.
> 
> RCU QSBR process is integrated for safe tbl8 group reclaim.
> Refer to RCU documentation to understand various aspects of
> integrating RCU library into other libraries.
> 
> Signed-off-by: Ruifeng Wang <ruifeng.w...@arm.com>
> Reviewed-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> ---
>  lib/librte_lpm/Makefile            |   3 +-
>  lib/librte_lpm/meson.build         |   2 +
>  lib/librte_lpm/rte_lpm.c           | 102 +++++++++++++++++++++++++----
>  lib/librte_lpm/rte_lpm.h           |  21 ++++++
>  lib/librte_lpm/rte_lpm_version.map |   6 ++
>  5 files changed, 122 insertions(+), 12 deletions(-)
> 
> diff --git a/lib/librte_lpm/Makefile b/lib/librte_lpm/Makefile
> index a7946a1c5..ca9e16312 100644
> --- a/lib/librte_lpm/Makefile
> +++ b/lib/librte_lpm/Makefile
> @@ -6,9 +6,10 @@ include $(RTE_SDK)/mk/rte.vars.mk
>  # library name
>  LIB = librte_lpm.a
> 
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>  CFLAGS += -O3
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> -LDLIBS += -lrte_eal -lrte_hash
> +LDLIBS += -lrte_eal -lrte_hash -lrte_rcu
> 
>  EXPORT_MAP := rte_lpm_version.map
> 
> diff --git a/lib/librte_lpm/meson.build b/lib/librte_lpm/meson.build
> index a5176d8ae..19a35107f 100644
> --- a/lib/librte_lpm/meson.build
> +++ b/lib/librte_lpm/meson.build
> @@ -2,9 +2,11 @@
>  # Copyright(c) 2017 Intel Corporation
> 
>  version = 2
> +allow_experimental_apis = true
>  sources = files('rte_lpm.c', 'rte_lpm6.c')
>  headers = files('rte_lpm.h', 'rte_lpm6.h')
>  # since header files have different names, we can install all vector headers
>  # without worrying about which architecture we actually need
>  headers += files('rte_lpm_altivec.h', 'rte_lpm_neon.h', 'rte_lpm_sse.h')
>  deps += ['hash']
> +deps += ['rcu']
> diff --git a/lib/librte_lpm/rte_lpm.c b/lib/librte_lpm/rte_lpm.c
> index 3a929a1b1..ca58d4b35 100644
> --- a/lib/librte_lpm/rte_lpm.c
> +++ b/lib/librte_lpm/rte_lpm.c
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>   */
> 
>  #include <string.h>
> @@ -381,6 +382,8 @@ rte_lpm_free_v1604(struct rte_lpm *lpm)
> 
>       rte_mcfg_tailq_write_unlock();
> 
> +     if (lpm->dq)
> +             rte_rcu_qsbr_dq_delete(lpm->dq);
>       rte_free(lpm->tbl8);
>       rte_free(lpm->rules_tbl);
>       rte_free(lpm);
> @@ -390,6 +393,59 @@ BIND_DEFAULT_SYMBOL(rte_lpm_free, _v1604, 16.04);
>  MAP_STATIC_SYMBOL(void rte_lpm_free(struct rte_lpm *lpm),
>               rte_lpm_free_v1604);
> 
> +struct __rte_lpm_rcu_dq_entry {
> +     uint32_t tbl8_group_index;
> +     uint32_t pad;
> +};
> +
> +static void
> +__lpm_rcu_qsbr_free_resource(void *p, void *data)
> +{
> +     struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +     struct __rte_lpm_rcu_dq_entry *e =
> +                     (struct __rte_lpm_rcu_dq_entry *)data;
> +     struct rte_lpm_tbl_entry *tbl8 = (struct rte_lpm_tbl_entry *)p;
> +
> +     /* Set tbl8 group invalid */
> +     __atomic_store(&tbl8[e->tbl8_group_index], &zero_tbl8_entry,
> +             __ATOMIC_RELAXED);
> +}
> +
> +/* Associate QSBR variable with an LPM object.
> + */
> +int
> +rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v)
> +{
> +     char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
> +     struct rte_rcu_qsbr_dq_parameters params;
> +
> +     if ((lpm == NULL) || (v == NULL)) {
> +             rte_errno = EINVAL;
> +             return 1;
> +     }
> +
> +     if (lpm->dq) {
> +             rte_errno = EEXIST;
> +             return 1;
> +     }
> +
> +     /* Init QSBR defer queue. */
> +     snprintf(rcu_dq_name, sizeof(rcu_dq_name), "LPM_RCU_%s", lpm->name);
> +     params.name = rcu_dq_name;
> +     params.size = lpm->number_tbl8s;
> +     params.esize = sizeof(struct __rte_lpm_rcu_dq_entry);
> +     params.f = __lpm_rcu_qsbr_free_resource;
> +     params.p = lpm->tbl8;
> +     params.v = v;
> +     lpm->dq = rte_rcu_qsbr_dq_create(&params);
> +     if (lpm->dq == NULL) {
> +             RTE_LOG(ERR, LPM, "LPM QS defer queue creation failed\n");
> +             return 1;
> +     }

A few thoughts about this function:
It is named rcu_qsbr_add(), but in fact it allocates a defer queue for the
given RCU variable.
So, first thought - is that always necessary?
For some use cases, I suppose the user might be OK with waiting for the
quiescent state change inside tbl8_free()?
Another thing - you do allocate a defer queue, but it is internal, so the user
can't call reclaim() manually, which looks strange.
Why not return the defer queue pointer to the user, so he can call reclaim()
himself at an appropriate time?
Third thing - you always allocate the defer queue with a size equal to the
number of tbl8 groups.
Though, as I understand it, there could be up to 16M tbl8 groups inside the LPM.
Do we really need a defer queue that long?
Especially considering that the current RCU defer queue will start reclamation
when 1/8 of the defer queue becomes full, and won't reclaim more than 1/16 of it.
It is probably better to let the user decide for himself how long a defer queue
he needs for that LPM?

Konstantin


> +
> +     return 0;
> +}
> +
>  /*
>   * Adds a rule to the rule table.
>   *
> @@ -679,14 +735,15 @@ tbl8_alloc_v20(struct rte_lpm_tbl_entry_v20 *tbl8)
>  }
> 
>  static int32_t
> -tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t number_tbl8s)
> +__tbl8_alloc_v1604(struct rte_lpm *lpm)
>  {
>       uint32_t group_idx; /* tbl8 group index. */
>       struct rte_lpm_tbl_entry *tbl8_entry;
> 
>       /* Scan through tbl8 to find a free (i.e. INVALID) tbl8 group. */
> -     for (group_idx = 0; group_idx < number_tbl8s; group_idx++) {
> -             tbl8_entry = &tbl8[group_idx * RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
> +     for (group_idx = 0; group_idx < lpm->number_tbl8s; group_idx++) {
> +             tbl8_entry = &lpm->tbl8[group_idx *
> +                                     RTE_LPM_TBL8_GROUP_NUM_ENTRIES];
>               /* If a free tbl8 group is found clean it and set as VALID. */
>               if (!tbl8_entry->valid_group) {
>                       struct rte_lpm_tbl_entry new_tbl8_entry = {
> @@ -712,6 +769,21 @@ tbl8_alloc_v1604(struct rte_lpm_tbl_entry *tbl8, 
> uint32_t number_tbl8s)
>       return -ENOSPC;
>  }
> 
> +static int32_t
> +tbl8_alloc_v1604(struct rte_lpm *lpm)
> +{
> +     int32_t group_idx; /* tbl8 group index. */
> +
> +     group_idx = __tbl8_alloc_v1604(lpm);
> +     if ((group_idx < 0) && (lpm->dq != NULL)) {
> +             /* If there are no tbl8 groups try to reclaim some. */
> +             if (rte_rcu_qsbr_dq_reclaim(lpm->dq) == 0)
> +                     group_idx = __tbl8_alloc_v1604(lpm);
> +     }
> +
> +     return group_idx;
> +}
> +
>  static void
>  tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, uint32_t tbl8_group_start)
>  {
> @@ -728,13 +800,21 @@ tbl8_free_v20(struct rte_lpm_tbl_entry_v20 *tbl8, 
> uint32_t tbl8_group_start)
>  }
> 
>  static void
> -tbl8_free_v1604(struct rte_lpm_tbl_entry *tbl8, uint32_t tbl8_group_start)
> +tbl8_free_v1604(struct rte_lpm *lpm, uint32_t tbl8_group_start)
>  {
> -     /* Set tbl8 group invalid*/
>       struct rte_lpm_tbl_entry zero_tbl8_entry = {0};
> +     struct __rte_lpm_rcu_dq_entry e;
> 
> -     __atomic_store(&tbl8[tbl8_group_start], &zero_tbl8_entry,
> -                     __ATOMIC_RELAXED);
> +     if (lpm->dq != NULL) {
> +             e.tbl8_group_index = tbl8_group_start;
> +             e.pad = 0;
> +             /* Push into QSBR defer queue. */
> +             rte_rcu_qsbr_dq_enqueue(lpm->dq, (void *)&e);
> +     } else {
> +             /* Set tbl8 group invalid*/
> +             __atomic_store(&lpm->tbl8[tbl8_group_start], &zero_tbl8_entry,
> +                             __ATOMIC_RELAXED);
> +     }
>  }
> 
>  static __rte_noinline int32_t
> @@ -1037,7 +1117,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t 
> ip_masked, uint8_t depth,
> 
>       if (!lpm->tbl24[tbl24_index].valid) {
>               /* Search for a free tbl8 group. */
> -             tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, 
> lpm->number_tbl8s);
> +             tbl8_group_index = tbl8_alloc_v1604(lpm);
> 
>               /* Check tbl8 allocation was successful. */
>               if (tbl8_group_index < 0) {
> @@ -1083,7 +1163,7 @@ add_depth_big_v1604(struct rte_lpm *lpm, uint32_t 
> ip_masked, uint8_t depth,
>       } /* If valid entry but not extended calculate the index into Table8. */
>       else if (lpm->tbl24[tbl24_index].valid_group == 0) {
>               /* Search for free tbl8 group. */
> -             tbl8_group_index = tbl8_alloc_v1604(lpm->tbl8, 
> lpm->number_tbl8s);
> +             tbl8_group_index = tbl8_alloc_v1604(lpm);
> 
>               if (tbl8_group_index < 0) {
>                       return tbl8_group_index;
> @@ -1818,7 +1898,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t 
> ip_masked,
>                */
>               lpm->tbl24[tbl24_index].valid = 0;
>               __atomic_thread_fence(__ATOMIC_RELEASE);
> -             tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +             tbl8_free_v1604(lpm, tbl8_group_start);
>       } else if (tbl8_recycle_index > -1) {
>               /* Update tbl24 entry. */
>               struct rte_lpm_tbl_entry new_tbl24_entry = {
> @@ -1834,7 +1914,7 @@ delete_depth_big_v1604(struct rte_lpm *lpm, uint32_t 
> ip_masked,
>               __atomic_store(&lpm->tbl24[tbl24_index], &new_tbl24_entry,
>                               __ATOMIC_RELAXED);
>               __atomic_thread_fence(__ATOMIC_RELEASE);
> -             tbl8_free_v1604(lpm->tbl8, tbl8_group_start);
> +             tbl8_free_v1604(lpm, tbl8_group_start);
>       }
>  #undef group_idx
>       return 0;
> diff --git a/lib/librte_lpm/rte_lpm.h b/lib/librte_lpm/rte_lpm.h
> index 906ec4483..49c12a68d 100644
> --- a/lib/librte_lpm/rte_lpm.h
> +++ b/lib/librte_lpm/rte_lpm.h
> @@ -1,5 +1,6 @@
>  /* SPDX-License-Identifier: BSD-3-Clause
>   * Copyright(c) 2010-2014 Intel Corporation
> + * Copyright(c) 2019 Arm Limited
>   */
> 
>  #ifndef _RTE_LPM_H_
> @@ -21,6 +22,7 @@
>  #include <rte_common.h>
>  #include <rte_vect.h>
>  #include <rte_compat.h>
> +#include <rte_rcu_qsbr.h>
> 
>  #ifdef __cplusplus
>  extern "C" {
> @@ -186,6 +188,7 @@ struct rte_lpm {
>                       __rte_cache_aligned; /**< LPM tbl24 table. */
>       struct rte_lpm_tbl_entry *tbl8; /**< LPM tbl8 table. */
>       struct rte_lpm_rule *rules_tbl; /**< LPM rules. */
> +     struct rte_rcu_qsbr_dq *dq;     /**< RCU QSBR defer queue.*/
>  };
> 
>  /**
> @@ -248,6 +251,24 @@ rte_lpm_free_v20(struct rte_lpm_v20 *lpm);
>  void
>  rte_lpm_free_v1604(struct rte_lpm *lpm);
> 
> +/**
> + * Associate RCU QSBR variable with an LPM object.
> + *
> + * @param lpm
> + *   the lpm object to add RCU QSBR
> + * @param v
> + *   RCU QSBR variable
> + * @return
> + *   On success - 0
> + *   On error - 1 with error code set in rte_errno.
> + *   Possible rte_errno codes are:
> + *   - EINVAL - invalid pointer
> + *   - EEXIST - already added QSBR
> + *   - ENOMEM - memory allocation failure
> + */
> +__rte_experimental
> +int rte_lpm_rcu_qsbr_add(struct rte_lpm *lpm, struct rte_rcu_qsbr *v);
> +
>  /**
>   * Add a rule to the LPM table.
>   *
> diff --git a/lib/librte_lpm/rte_lpm_version.map 
> b/lib/librte_lpm/rte_lpm_version.map
> index 90beac853..b353aabd2 100644
> --- a/lib/librte_lpm/rte_lpm_version.map
> +++ b/lib/librte_lpm/rte_lpm_version.map
> @@ -44,3 +44,9 @@ DPDK_17.05 {
>       rte_lpm6_lookup_bulk_func;
> 
>  } DPDK_16.04;
> +
> +EXPERIMENTAL {
> +     global:
> +
> +     rte_lpm_rcu_qsbr_add;
> +};
> --
> 2.17.1

Reply via email to