On Thu, Sep 30, 2021 at 10:57 PM Dharmik Thakkar <dharmik.thak...@arm.com> wrote: > > Current mempool per core cache implementation is based on pointer > For most architectures, each pointer consumes 64b > Replace it with index-based implementation, where in each buffer > is addressed by (pool address + index) > It will reduce memory requirements > > L3Fwd performance testing reveals minor improvements in the cache > performance and no change in throughput > > Micro-benchmarking the patch using mempool_perf_test shows > significant improvement with majority of the test cases > > Future plan involves replacing global pool's pointer-based implementation > with index-based implementation > > Signed-off-by: Dharmik Thakkar <dharmik.thak...@arm.com>
Sane idea. Like VPP, we tried to do this for rte_graph, but not observed much gain. Since lcore cache is typically 512, maybe there is a gain on the mempool path. Also, Since you are enabling only for local cache, it is good as mempool drivers can work as-is.(i.e HW drivers works with 64bit) I think, getting more performance numbers for various cases may be the next step. > --- > drivers/mempool/ring/rte_mempool_ring.c | 2 +- > lib/mempool/rte_mempool.c | 8 +++ > lib/mempool/rte_mempool.h | 74 ++++++++++++++++++++++--- > 3 files changed, 74 insertions(+), 10 deletions(-) > > diff --git a/drivers/mempool/ring/rte_mempool_ring.c > b/drivers/mempool/ring/rte_mempool_ring.c > index b1f09ff28f4d..e55913e47f21 100644 > --- a/drivers/mempool/ring/rte_mempool_ring.c > +++ b/drivers/mempool/ring/rte_mempool_ring.c > @@ -101,7 +101,7 @@ ring_alloc(struct rte_mempool *mp, uint32_t rg_flags) > return -rte_errno; > > mp->pool_data = r; > - > + mp->local_cache_base_addr = &r[1]; > return 0; > } > > diff --git a/lib/mempool/rte_mempool.c b/lib/mempool/rte_mempool.c > index 59a588425bd6..424bdb19c323 100644 > --- a/lib/mempool/rte_mempool.c > +++ b/lib/mempool/rte_mempool.c > @@ -480,6 +480,7 @@ rte_mempool_populate_default(struct rte_mempool *mp) > int ret; > bool need_iova_contig_obj; > size_t max_alloc_size = SIZE_MAX; > + unsigned lcore_id; > > ret = mempool_ops_alloc_once(mp); > if (ret != 0) > @@ -600,6 +601,13 @@ rte_mempool_populate_default(struct rte_mempool *mp) > } > } > > + /* Init all default caches. */ > + if (mp->cache_size != 0) { > + for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) > + mp->local_cache[lcore_id].local_cache_base_value = > + *(void **)mp->local_cache_base_addr; > + } > + > rte_mempool_trace_populate_default(mp); > return mp->size; > > diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h > index 4235d6f0bf2b..545405c0d3ce 100644 > --- a/lib/mempool/rte_mempool.h > +++ b/lib/mempool/rte_mempool.h > @@ -51,6 +51,8 @@ > #include <rte_memcpy.h> > #include <rte_common.h> > > +#include <arm_neon.h> > + > #include "rte_mempool_trace_fp.h" > > #ifdef __cplusplus > @@ -91,11 +93,12 @@ struct rte_mempool_cache { > uint32_t size; /**< Size of the cache */ > uint32_t flushthresh; /**< Threshold before we flush excess elements > */ > uint32_t len; /**< Current cache count */ > + void *local_cache_base_value; /**< Base value to calculate indices */ > /* > * Cache is allocated to this size to allow it to overflow in certain > * cases to avoid needless emptying of cache. > */ > - void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */ > + uint32_t objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */ > } __rte_cache_aligned; > > /** > @@ -172,7 +175,6 @@ struct rte_mempool_objtlr { > * A list of memory where objects are stored > */ > STAILQ_HEAD(rte_mempool_memhdr_list, rte_mempool_memhdr); > - > /** > * Callback used to free a memory chunk > */ > @@ -244,6 +246,7 @@ struct rte_mempool { > int32_t ops_index; > > struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */ > + void *local_cache_base_addr; /**< Reference to the base value */ > > uint32_t populated_size; /**< Number of populated objects. */ > struct rte_mempool_objhdr_list elt_list; /**< List of objects in pool > */ > @@ -1269,7 +1272,15 @@ rte_mempool_cache_flush(struct rte_mempool_cache > *cache, > if (cache == NULL || cache->len == 0) > return; > rte_mempool_trace_cache_flush(cache, mp); > - rte_mempool_ops_enqueue_bulk(mp, cache->objs, cache->len); > + > + unsigned int i; > + unsigned int cache_len = cache->len; > + void *obj_table[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; > + void *base_value = cache->local_cache_base_value; > + uint32_t *cache_objs = cache->objs; > + for (i = 0; i < cache_len; i++) > + obj_table[i] = (void *) RTE_PTR_ADD(base_value, > cache_objs[i]); > + rte_mempool_ops_enqueue_bulk(mp, obj_table, cache->len); > cache->len = 0; > } > > @@ -1289,7 +1300,9 @@ static __rte_always_inline void > __mempool_generic_put(struct rte_mempool *mp, void * const *obj_table, > unsigned int n, struct rte_mempool_cache *cache) > { > - void **cache_objs; > + uint32_t *cache_objs; > + void *base_value; > + uint32_t i; > > /* increment stat now, adding in mempool always success */ > __MEMPOOL_STAT_ADD(mp, put_bulk, 1); > @@ -1301,6 +1314,12 @@ __mempool_generic_put(struct rte_mempool *mp, void * > const *obj_table, > > cache_objs = &cache->objs[cache->len]; > > + base_value = cache->local_cache_base_value; > + > + uint64x2_t v_obj_table; > + uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value); > + uint32x2_t v_cache_objs; > + > /* > * The cache follows the following algorithm > * 1. Add the objects to the cache > @@ -1309,12 +1328,26 @@ __mempool_generic_put(struct rte_mempool *mp, void * > const *obj_table, > */ > > /* Add elements back into the cache */ > - rte_memcpy(&cache_objs[0], obj_table, sizeof(void *) * n); > + > +#if defined __ARM_NEON > + for (i = 0; i < (n & ~0x1); i+=2) { > + v_obj_table = vld1q_u64((const uint64_t *)&obj_table[i]); > + v_cache_objs = vqmovn_u64(vsubq_u64(v_obj_table, > v_base_value)); > + vst1_u32(cache_objs + i, v_cache_objs); > + } > + if (n & 0x1) { > + cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], > base_value); > + } > +#else > + for (i = 0; i < n; i++) { > + cache_objs[i] = (uint32_t) RTE_PTR_DIFF(obj_table[i], > base_value); > + } > +#endif > > cache->len += n; > > if (cache->len >= cache->flushthresh) { > - rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache->size], > + rte_mempool_ops_enqueue_bulk(mp, obj_table + cache->len - > cache->size, > cache->len - cache->size); > cache->len = cache->size; > } > @@ -1415,23 +1448,26 @@ __mempool_generic_get(struct rte_mempool *mp, void > **obj_table, > unsigned int n, struct rte_mempool_cache *cache) > { > int ret; > + uint32_t i; > uint32_t index, len; > - void **cache_objs; > + uint32_t *cache_objs; > > /* No cache provided or cannot be satisfied from cache */ > if (unlikely(cache == NULL || n >= cache->size)) > goto ring_dequeue; > > + void *base_value = cache->local_cache_base_value; > cache_objs = cache->objs; > > /* Can this be satisfied from the cache? */ > if (cache->len < n) { > /* No. Backfill the cache first, and then fill from it */ > uint32_t req = n + (cache->size - cache->len); > + void *temp_objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache > objects */ > > /* How many do we require i.e. number to fill the cache + the > request */ > ret = rte_mempool_ops_dequeue_bulk(mp, > - &cache->objs[cache->len], req); > + temp_objs, req); > if (unlikely(ret < 0)) { > /* > * In the off chance that we are buffer constrained, > @@ -1442,12 +1478,32 @@ __mempool_generic_get(struct rte_mempool *mp, void > **obj_table, > goto ring_dequeue; > } > > + len = cache->len; > + for (i = 0; i < req; ++i, ++len) { > + cache_objs[len] = (uint32_t) > RTE_PTR_DIFF(temp_objs[i], base_value); > + } > + > cache->len += req; > } > > + uint64x2_t v_obj_table; > + uint64x2_t v_cache_objs; > + uint64x2_t v_base_value = vdupq_n_u64((uint64_t)base_value); > + > /* Now fill in the response ... */ > +#if defined __ARM_NEON > + for (index = 0, len = cache->len - 1; index < (n & ~0x1); index+=2, > + len-=2, obj_table+=2) { > + v_cache_objs = vmovl_u32(vld1_u32(cache_objs + len - 1)); > + v_obj_table = vaddq_u64(v_cache_objs, v_base_value); > + vst1q_u64((uint64_t *)obj_table, v_obj_table); > + } > + if (n & 0x1) > + *obj_table = (void *) RTE_PTR_ADD(base_value, > cache_objs[len]); > +#else > for (index = 0, len = cache->len - 1; index < n; ++index, len--, > obj_table++) > - *obj_table = cache_objs[len]; > + *obj_table = (void *) RTE_PTR_ADD(base_value, > cache_objs[len]); > +#endif > > cache->len -= n; > > -- > 2.17.1 >