<snip>
> >
> > >
> > > On Thu, Sep 30, 2021 at 10:57 PM Dharmik Thakkar
> > > <dharmik.thak...@arm.com> wrote:
> > > >
> > > > Current mempool per core cache implementation is based on pointer
> > > > For most architectures, each pointer consumes 64b Replace it with
> > > > index-based implementation, where in each buffer is addressed by
> > > > (pool address + index) It will reduce memory requirements
> > > >
> > > > L3Fwd performance testing reveals minor improvements in the cache
> > > > performance and no change in throughput
> > > >
> > > > Micro-benchmarking the patch using mempool_perf_test shows
> > > > significant improvement with majority of the test cases
> > > >
> > > > Future plan involves replacing global pool's pointer-based
> > > > implementation with index-based implementation
> > > >
> > > > Signed-off-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> > >
> > >
> > > Sane idea. Like VPP, we tried to do this for rte_graph, but not
> > > observed much gain.
> > > Since lcore cache is typically 512, maybe there is a gain on the mempool
> path.
> > > Also, Since you are enabling only for local cache, it is good as
> > > mempool drivers can work as-is.(i.e HW drivers works with 64bit) I
> > > think, getting more performance numbers for various cases may be the
> next step.
> > The gain is not observed in terms of PPS improvement, but do see some
> improvements that PMUs indicate. This approach definitely results in savings
> in number of cache lines utilized.
> 
> OK. IMO, If PPS has regression then this path is not viable, else it may be 
> OK.
PPS has not regressed. It has improved, but not significantly.
Other way to look at this is, we are doing the same work with less amount of 
resources.

> 
> 
> >
> > >
> > > > ---
> > > >  drivers/mempool/ring/rte_mempool_ring.c |  2 +-
> > > >  lib/mempool/rte_mempool.c               |  8 +++
> > > >  lib/mempool/rte_mempool.h               | 74 ++++++++++++++++++++++---
> > > >  3 files changed, 74 insertions(+), 10 deletions(-)
> > > >
> > > > diff --git a/drivers/mempool/ring/rte_mempool_ring.c
> > > > b/drivers/mempool/ring/rte_mempool_ring.c
> > > > index b1f09ff28f4d..e55913e47f21 100644
> > > > --- a/drivers/mempool/ring/rte_mempool_ring.c
> > > > +++ b/drivers/mempool/ring/rte_mempool_ring.c
> > > > @@ -101,7 +101,7 @@ ring_alloc(struct rte_mempool *mp, uint32_t
> > > rg_flags)
> > > >                 return -rte_errno;
> > > >
> > > >         mp->pool_data = r;
> > > > -
> > > > +       mp->local_cache_base_addr = &r[1];
> > > >         return 0;
> > > >  }
> > > >
> > > > diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c
> > > > index 59a588425bd6..424bdb19c323 100644
> > > > --- a/lib/mempool/rte_mempool.c
> > > > +++ b/lib/mempool/rte_mempool.c
> > > > @@ -480,6 +480,7 @@ rte_mempool_populate_default(struct
> > > rte_mempool *mp)
> > > >         int ret;
> > > >         bool need_iova_contig_obj;
> > > >         size_t max_alloc_size = SIZE_MAX;
> > > > +       unsigned lcore_id;
> > > >
> > > >         ret = mempool_ops_alloc_once(mp);
> > > >         if (ret != 0)
> > > > @@ -600,6 +601,13 @@ rte_mempool_populate_default(struct
> > > rte_mempool *mp)
> > > >                 }
> > > >         }
> > > >
> > > > +       /* Init all default caches. */
> > > > +       if (mp->cache_size != 0) {
> > > > +               for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
> > > > +                       
> > > > mp->local_cache[lcore_id].local_cache_base_value =
> > > > +                               *(void **)mp->local_cache_base_addr;
> > > > +       }
> > > > +
> > > >         rte_mempool_trace_populate_default(mp);
> > > >         return mp->size;
> > > >
> > > > diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> > > > index 4235d6f0bf2b..545405c0d3ce 100644
> > > > --- a/lib/mempool/rte_mempool.h
> > > > +++ b/lib/mempool/rte_mempool.h
> > > > @@ -51,6 +51,8 @@
> > > >  #include <rte_memcpy.h>
> > > >  #include <rte_common.h>
> > > >
> > > > +#include <arm_neon.h>
> > > > +
> > > >  #include "rte_mempool_trace_fp.h"
> > > >
> > > >  #ifdef __cplusplus
> > > > @@ -91,11 +93,12 @@ struct rte_mempool_cache {
> > > >         uint32_t size;        /**< Size of the cache */
> > > >         uint32_t flushthresh; /**< Threshold before we flush
> > > > excess elements
> > > */
> > > >         uint32_t len;         /**< Current cache count */
> > > > +       void *local_cache_base_value; /**< Base value to calculate
> > > > + indices */
> > > >         /*
> > > >          * Cache is allocated to this size to allow it to overflow in 
> > > > certain
> > > >          * cases to avoid needless emptying of cache.
> > > >          */
> > > > -       void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache
> objects
> > > */
> > > > +       uint32_t objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache
> > > > + objects */
> > > >  } __rte_cache_aligned;
> > > >
> > > >  /**
> > > > @@ -172,7 +175,6 @@ struct rte_mempool_objtlr {
> > > >   * A list of memory where objects are stored
> > > >   */
> > > >  STAILQ_HEAD(rte_mempool_memhdr_list, rte_mempool_memhdr);
> > > > -
> > > >  /**
> > > >   * Callback used to free a memory chunk
> > > >   */
> > > > @@ -244,6 +246,7 @@ struct rte_mempool {
> > > >         int32_t ops_index;
> > > >
> > > >         struct rte_mempool_cache *local_cache; /**< Per-lcore
> > > > local cache */
> > > > +       void *local_cache_base_addr; /**< Reference to the base
> > > > + value */
> > > >
> > > >         uint32_t populated_size;         /**< Number of populated 
> > > > objects. */
> > > >         struct rte_mempool_objhdr_list elt_list; /**< List of
> > > > objects in pool */ @@ -1269,7 +1272,15 @@
> > > > rte_mempool_cache_flush(struct
> > > rte_mempool_cache *cache,
> > > >         if (cache == NULL || cache->len == 0)
> > > >                 return;
> > > >         rte_mempool_trace_cache_flush(cache, mp);
> > > > -       rte_mempool_ops_enqueue_bulk(mp, cache->objs, cache->len);
> > > > +
> > > > +       unsigned int i;
> > > > +       unsigned int cache_len = cache->len;
> > > > +       void *obj_table[RTE_MEMPOOL_CACHE_MAX_SIZE * 3];
> > > > +       void *base_value = cache->local_cache_base_value;
> > > > +       uint32_t *cache_objs = cache->objs;
> > > > +       for (i = 0; i < cache_len; i++)
> > > > +               obj_table[i] = (void *) RTE_PTR_ADD(base_value,
> cache_objs[i]);
> > > > +       rte_mempool_ops_enqueue_bulk(mp, obj_table, cache->len);
> > > >         cache->len = 0;
> > > >  }
> > > >
> > > > @@ -1289,7 +1300,9 @@ static __rte_always_inline void
> > > > __mempool_generic_put(struct rte_mempool *mp, void * const
> *obj_table,
> > > >                       unsigned int n, struct rte_mempool_cache
> > > > *cache) {
> > > > -       void **cache_objs;
> > > > +       uint32_t *cache_objs;
> > > > +       void *base_value;
> > > > +       uint32_t i;
> > > >
> > > >         /* increment stat now, adding in mempool always success */
> > > >         __MEMPOOL_STAT_ADD(mp, put_bulk, 1); @@ -1301,6 +1314,12
> > > > @@ __mempool_generic_put(struct rte_mempool *mp, void * const
> > > > *obj_table,
> > > >
> > > >         cache_objs = &cache->objs[cache->len];
> > > >
> > > > +       base_value = cache->local_cache_base_value;
> > > > +
> > > > +       uint64x2_t v_obj_table;
> > > > +       uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value);
> > > > +       uint32x2_t v_cache_objs;
> > > > +
> > > >         /*
> > > >          * The cache follows the following algorithm
> > > >          *   1. Add the objects to the cache
> > > > @@ -1309,12 +1328,26 @@ __mempool_generic_put(struct
> rte_mempool
> > > *mp, void * const *obj_table,
> > > >          */
> > > >
> > > >         /* Add elements back into the cache */
> > > > -       rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n);
> > > > +
> > > > +#if defined __ARM_NEON
> > > > +       for (i = 0; i < (n & ~0x1); i+=2) {
> > > > +               v_obj_table = vld1q_u64((const uint64_t 
> > > > *)&obj_table[i]);
> > > > +               v_cache_objs = vqmovn_u64(vsubq_u64(v_obj_table,
> > > v_base_value));
> > > > +               vst1_u32(cache_objs + i, v_cache_objs);
> > > > +       }
> > > > +       if (n & 0x1) {
> > > > +               cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i],
> base_value);
> > > > +       }
> > > > +#else
> > > > +       for (i = 0; i < n; i++) {
> > > > +               cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i],
> base_value);
> > > > +       }
> > > > +#endif
> > > >
> > > >         cache->len += n;
> > > >
> > > >         if (cache->len >= cache->flushthresh) {
> > > > -               rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache-
> >size],
> > > > +               rte_mempool_ops_enqueue_bulk(mp, obj_table +
> > > > + cache->len - cache->size,
> > > >                                 cache->len - cache->size);
> > > >                 cache->len = cache->size;
> > > >         }
> > > > @@ -1415,23 +1448,26 @@ __mempool_generic_get(struct
> rte_mempool
> > > *mp, void **obj_table,
> > > >                       unsigned int n, struct rte_mempool_cache
> > > > *cache) {
> > > >         int ret;
> > > > +       uint32_t i;
> > > >         uint32_t index, len;
> > > > -       void **cache_objs;
> > > > +       uint32_t *cache_objs;
> > > >
> > > >         /* No cache provided or cannot be satisfied from cache */
> > > >         if (unlikely(cache == NULL || n >= cache->size))
> > > >                 goto ring_dequeue;
> > > >
> > > > +       void *base_value = cache->local_cache_base_value;
> > > >         cache_objs = cache->objs;
> > > >
> > > >         /* Can this be satisfied from the cache? */
> > > >         if (cache->len < n) {
> > > >                 /* No. Backfill the cache first, and then fill from it 
> > > > */
> > > >                 uint32_t req = n + (cache->size - cache->len);
> > > > +               void *temp_objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3];
> > > > + /**< Cache objects */
> > > >
> > > >                 /* How many do we require i.e. number to fill the
> > > > cache + the
> > > request */
> > > >                 ret = rte_mempool_ops_dequeue_bulk(mp,
> > > > -                       &cache->objs[cache->len], req);
> > > > +                       temp_objs, req);
> > > >                 if (unlikely(ret < 0)) {
> > > >                         /*
> > > >                          * In the off chance that we are buffer
> > > > constrained, @@ -1442,12 +1478,32 @@
> __mempool_generic_get(struct
> > > rte_mempool *mp, void **obj_table,
> > > >                         goto ring_dequeue;
> > > >                 }
> > > >
> > > > +               len = cache->len;
> > > > +               for (i = 0; i < req; ++i, ++len) {
> > > > +                       cache_objs[len] = (uint32_t)
> > > > + RTE_PTR_DIFF(temp_objs[i],
> > > base_value);
> > > > +               }
> > > > +
> > > >                 cache->len += req;
> > > >         }
> > > >
> > > > +       uint64x2_t v_obj_table;
> > > > +       uint64x2_t v_cache_objs;
> > > > +       uint64x2_t v_base_value =
> > > > + vdupq_n_u64((uint64_t)base_value);
> > > > +
> > > >         /* Now fill in the response ... */
> > > > +#if defined __ARM_NEON
> > > > +       for (index = 0, len = cache->len - 1; index < (n & ~0x1); 
> > > > index+=2,
> > > > +                                               len-=2, obj_table+=2) {
> > > > +               v_cache_objs = vmovl_u32(vld1_u32(cache_objs + len - 
> > > > 1));
> > > > +               v_obj_table = vaddq_u64(v_cache_objs, v_base_value);
> > > > +               vst1q_u64((uint64_t *)obj_table, v_obj_table);
> > > > +       }
> > > > +       if (n & 0x1)
> > > > +               *obj_table = (void *) RTE_PTR_ADD(base_value,
> > > > +cache_objs[len]); #else
> > > >         for (index = 0, len = cache->len - 1; index < n; ++index,
> > > > len--,
> > > obj_table++)
> > > > -               *obj_table = cache_objs[len];
> > > > +               *obj_table = (void *) RTE_PTR_ADD(base_value,
> > > > +cache_objs[len]); #endif
> > > >
> > > >         cache->len -= n;
> > > >
> > > > --
> > > > 2.17.1
> > > >

Reply via email to