> From: Kamalakshitha Aligeri [mailto:kamalakshitha.alig...@arm.com]
> Sent: Wednesday, 16 November 2022 18.25
> 
> Expose the pointer to free space in per core cache in PMD, so that the
> objects can be directly copied to cache without any temporary storage
> 
> Signed-off-by: Kamalakshitha Aligeri <kamalakshitha.alig...@arm.com>
> ---

Please build your patch in continuation of my patch [1], and use 
rte_mempool_cache_zc_put_bulk() instead of rte_mempool_get_cache().

[1]: https://inbox.dpdk.org/dev/20221116180419.98937-1...@smartsharesystems.com/

Some initial comments follow inline below.


> v2: Integration of API in vector PMD
> v1: API to return pointer to free space on per-core cache  and
> integration of API in scalar PMD
> 
>  app/test/test_mempool.c                 | 140 ++++++++++++++++++++++++
>  drivers/net/i40e/i40e_rxtx_vec_avx512.c |  46 +++-----
>  drivers/net/i40e/i40e_rxtx_vec_common.h |  22 +++-
>  lib/mempool/rte_mempool.h               |  46 ++++++++
>  4 files changed, 219 insertions(+), 35 deletions(-)
> 
> diff --git a/app/test/test_mempool.c b/app/test/test_mempool.c
> index 8e493eda47..a0160336dd 100644
> --- a/app/test/test_mempool.c
> +++ b/app/test/test_mempool.c
> @@ -187,6 +187,142 @@ test_mempool_basic(struct rte_mempool *mp, int
> use_external_cache)
>       return ret;
>  }
> 
> +/* basic tests (done on one core) */
> +static int
> +test_mempool_get_cache(struct rte_mempool *mp, int use_external_cache)
> +{
> +     uint32_t *objnum;
> +     void **objtable;
> +     void *obj, *obj2;
> +     char *obj_data;
> +     int ret = 0;
> +     unsigned int i, j;
> +     int offset;
> +     struct rte_mempool_cache *cache;
> +     void **cache_objs;
> +
> +     if (use_external_cache) {
> +             /* Create a user-owned mempool cache. */
> +             cache =
> rte_mempool_cache_create(RTE_MEMPOOL_CACHE_MAX_SIZE,
> +                                              SOCKET_ID_ANY);
> +             if (cache == NULL)
> +                     RET_ERR();
> +     } else {
> +             /* May be NULL if cache is disabled. */
> +             cache = rte_mempool_default_cache(mp, rte_lcore_id());
> +     }
> +
> +     /* dump the mempool status */
> +     rte_mempool_dump(stdout, mp);
> +
> +     printf("get an object\n");
> +     if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
> +             GOTO_ERR(ret, out);
> +     rte_mempool_dump(stdout, mp);
> +
> +     /* tests that improve coverage */
> +     printf("get object count\n");
> +     /* We have to count the extra caches, one in this case. */
> +     offset = use_external_cache ? 1 * cache->len : 0;
> +     if (rte_mempool_avail_count(mp) + offset != MEMPOOL_SIZE - 1)
> +             GOTO_ERR(ret, out);
> +
> +     printf("get private data\n");
> +     if (rte_mempool_get_priv(mp) != (char *)mp +
> +                     RTE_MEMPOOL_HEADER_SIZE(mp, mp->cache_size))
> +             GOTO_ERR(ret, out);
> +
> +#ifndef RTE_EXEC_ENV_FREEBSD /* rte_mem_virt2iova() not supported on
> bsd */
> +     printf("get physical address of an object\n");
> +     if (rte_mempool_virt2iova(obj) != rte_mem_virt2iova(obj))
> +             GOTO_ERR(ret, out);
> +#endif
> +
> +
> +     printf("put the object back\n");
> +     cache_objs = rte_mempool_get_cache(mp, 1);

Use rte_mempool_cache_zc_put_bulk() instead.

> +     if (cache_objs != NULL)
> +             rte_memcpy(cache_objs, &obj, sizeof(void *));
> +     else
> +             rte_mempool_ops_enqueue_bulk(mp, &obj, 1);

rte_mempool_ops_enqueue_bulk() is an mempool internal function, and it lacks 
proper instrumentation. Use this instead:

rte_mempool_generic_put(mp, &obj, 1, NULL);

> +
> +     rte_mempool_dump(stdout, mp);
> +
> +     printf("get 2 objects\n");
> +     if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
> +             GOTO_ERR(ret, out);
> +     if (rte_mempool_generic_get(mp, &obj2, 1, cache) < 0) {
> +             rte_mempool_generic_put(mp, &obj, 1, cache);
> +             GOTO_ERR(ret, out);
> +     }
> +     rte_mempool_dump(stdout, mp);
> +
> +     printf("put the objects back\n");
> +     cache_objs = rte_mempool_get_cache(mp, 1);

Use rte_mempool_cache_zc_put_bulk() instead.

> +     if (cache_objs != NULL)
> +             rte_memcpy(mp, &obj, sizeof(void *));
> +     else
> +             rte_mempool_ops_enqueue_bulk(mp, &obj, 1);

Use rte_mempool_generic_put() instead.

> +
> +     cache_objs = rte_mempool_get_cache(mp, 1);

Use rte_mempool_cache_zc_put_bulk() instead.

> +     if (cache_objs != NULL)
> +             rte_memcpy(mp, &obj2, sizeof(void *));
> +     else
> +             rte_mempool_ops_enqueue_bulk(mp, &obj2, 1);

Use rte_mempool_generic_put() instead.

> +     rte_mempool_dump(stdout, mp);
> +
> +     /*
> +      * get many objects: we cannot get them all because the cache
> +      * on other cores may not be empty.
> +      */
> +     objtable = malloc(MEMPOOL_SIZE * sizeof(void *));
> +     if (objtable == NULL)
> +             GOTO_ERR(ret, out);
> +
> +     for (i = 0; i < MEMPOOL_SIZE; i++) {
> +             if (rte_mempool_generic_get(mp, &objtable[i], 1, cache) <
> 0)
> +                     break;
> +     }
> +
> +     /*
> +      * for each object, check that its content was not modified,
> +      * and put objects back in pool
> +      */
> +     cache_objs = rte_mempool_get_cache(mp, MEMPOOL_SIZE);

Use rte_mempool_cache_zc_put_bulk() instead.

Also, this will always fail (return NULL) if MEMPOOL_SIZE is larger than the 
cache size.


> +     if (cache_objs != NULL) {
> +             while (i--) {
> +                     obj = objtable[i];
> +                     obj_data = obj;
> +                     objnum = obj;
> +                     if (*objnum > MEMPOOL_SIZE) {
> +                             printf("bad object number(%d)\n", *objnum);
> +                             ret = -1;
> +                             break;
> +                     }
> +                     for (j = sizeof(*objnum); j < mp->elt_size; j++) {
> +                             if (obj_data[j] != 0)
> +                                     ret = -1;
> +                     }
> +
> +                     rte_memcpy(&cache_objs[i], &objtable[i], sizeof(void
> *));
> +             }
> +     } else {
> +             rte_mempool_ops_enqueue_bulk(mp, objtable, MEMPOOL_SIZE);

Use rte_mempool_generic_put() instead.

> +     }
> +
> +     free(objtable);
> +     if (ret == -1)
> +             printf("objects were modified!\n");
> +
> +out:
> +     if (use_external_cache) {
> +             rte_mempool_cache_flush(cache, mp);
> +             rte_mempool_cache_free(cache);
> +     }
> +
> +     return ret;
> +}
> +
>  static int test_mempool_creation_with_exceeded_cache_size(void)
>  {
>       struct rte_mempool *mp_cov;
> @@ -986,6 +1122,10 @@ test_mempool(void)
>       if (test_mempool_basic(mp_cache, 0) < 0)
>               GOTO_ERR(ret, err);
> 
> +     /* basic tests with get cache */
> +     if (test_mempool_get_cache(mp_cache, 0) < 0)
> +             GOTO_ERR(ret, err);
> +
>       /* basic tests with user-owned cache */
>       if (test_mempool_basic(mp_nocache, 1) < 0)
>               GOTO_ERR(ret, err);
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> index 60c97d5331..bfdb4f21f9 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> +++ b/drivers/net/i40e/i40e_rxtx_vec_avx512.c
> @@ -902,14 +902,7 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue
> *txq)
> 
>       if (txq->offloads & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE && (n & 31)
> == 0) {
>               struct rte_mempool *mp = txep[0].mbuf->pool;
> -             void **cache_objs;
> -             struct rte_mempool_cache *cache =
> rte_mempool_default_cache(mp,
> -                             rte_lcore_id());
> -
> -             if (!cache || cache->len == 0)
> -                     goto normal;
> -
> -             cache_objs = &cache->objs[cache->len];
> +             void **cache_objs = rte_mempool_get_cache(mp, n);

Use rte_mempool_cache_zc_put_bulk() instead.


Remove these:
>               if (n > RTE_MEMPOOL_CACHE_MAX_SIZE) {
>                       rte_mempool_ops_enqueue_bulk(mp, (void *)txep, n);

The (n > RTE_MEMPOOL_CACHE_MAX_SIZE) comparison is obsolete, because 
rte_mempool_cache_zc_put_bulk() will return NULL if there is no cache.

> @@ -922,29 +915,22 @@ i40e_tx_free_bufs_avx512(struct i40e_tx_queue
> *txq)
>                *   crosses the cache flush threshold) is flushed to the
> ring.
>                */
>               /* Add elements back into the cache */
> -             uint32_t copied = 0;
> -             /* n is multiple of 32 */
> -             while (copied < n) {
> -                     const __m512i a = _mm512_load_si512(&txep[copied]);
> -                     const __m512i b = _mm512_load_si512(&txep[copied +
> 8]);
> -                     const __m512i c = _mm512_load_si512(&txep[copied +
> 16]);
> -                     const __m512i d = _mm512_load_si512(&txep[copied +
> 24]);
> -
> -                     _mm512_storeu_si512(&cache_objs[copied], a);
> -                     _mm512_storeu_si512(&cache_objs[copied + 8], b);
> -                     _mm512_storeu_si512(&cache_objs[copied + 16], c);
> -                     _mm512_storeu_si512(&cache_objs[copied + 24], d);
> -                     copied += 32;
> -             }
> -             cache->len += n;
> -
> -             if (cache->len >= cache->flushthresh) {
> -                     rte_mempool_ops_enqueue_bulk
> -                             (mp, &cache->objs[cache->size],
> -                             cache->len - cache->size);
> -                     cache->len = cache->size;
> +             if (cache_objs != NULL) {
> +                     uint32_t copied = 0;
> +                     /* n is multiple of 32 */
> +                     while (copied < n) {
> +                             const __m512i a =
> _mm512_load_si512(&txep[copied]);
> +                             const __m512i b =
> _mm512_load_si512(&txep[copied + 8]);
> +                             const __m512i c =
> _mm512_load_si512(&txep[copied + 16]);
> +                             const __m512i d =
> _mm512_load_si512(&txep[copied + 24]);
> +
> +                             _mm512_storeu_si512(&cache_objs[copied], a);
> +                             _mm512_storeu_si512(&cache_objs[copied + 8],
> b);
> +                             _mm512_storeu_si512(&cache_objs[copied + 16],
> c);
> +                             _mm512_storeu_si512(&cache_objs[copied + 24],
> d);
> +                             copied += 32;

And add this here instead of the (n > RTE_MEMPOOL_CACHE_MAX_SIZE) comparison:

+} else
+    rte_mempool_generic_put();

> +                     }
>               }
> -             goto done;
>       }
> 
>  normal:
> diff --git a/drivers/net/i40e/i40e_rxtx_vec_common.h
> b/drivers/net/i40e/i40e_rxtx_vec_common.h
> index fe1a6ec75e..4389ab9094 100644
> --- a/drivers/net/i40e/i40e_rxtx_vec_common.h
> +++ b/drivers/net/i40e/i40e_rxtx_vec_common.h
> @@ -99,14 +99,26 @@ i40e_tx_free_bufs(struct i40e_tx_queue *txq)
>         * tx_next_dd - (tx_rs_thresh-1)
>         */
>       txep = &txq->sw_ring[txq->tx_next_dd - (n - 1)];
> +     struct rte_mempool *mp = txep[0].mbuf->pool;
> 
>       if (txq->offloads & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE) {
> -             for (i = 0; i < n; i++) {
> -                     free[i] = txep[i].mbuf;
> -                     /* no need to reset txep[i].mbuf in vector path */
> +             void **cache_objs;
> +             cache_objs = rte_mempool_get_cache(mp, n);

Use rte_mempool_cache_zc_put_bulk() instead.

> +
> +             if (cache_objs != NULL) {
> +                     for (i = 0; i < n; i++) {
> +                             /* no need to reset txep[i].mbuf in vector path
> */
> +                             rte_memcpy(&cache_objs[i], &txep->mbuf,
> sizeof(struct rte_mbuf));
> +                             txep++;
> +                     }
> +                     goto done;
> +             } else {
> +                     for (i = 0; i < n; i++) {
> +                             free[i] = txep->mbuf;
> +                             txep++;
> +                     }
> +                     rte_mempool_ops_enqueue_bulk(mp, (void **)free, n);

Use rte_mempool_generic_put() instead.

>               }
> -             rte_mempool_put_bulk(free[0]->pool, (void **)free, n);
> -             goto done;
>       }
> 
>       m = rte_pktmbuf_prefree_seg(txep[0].mbuf);
> diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
> index 1f5707f46a..480b1eb585 100644
> --- a/lib/mempool/rte_mempool.h
> +++ b/lib/mempool/rte_mempool.h
> @@ -1360,6 +1360,52 @@ rte_mempool_do_generic_put(struct rte_mempool
> *mp, void * const *obj_table,
>       rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
>  }
> 
> +/**
> + * @internal Put several objects back in the mempool; used internally.
> + * @param mp
> + *   A pointer to the mempool structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to store back in the mempool, must be
> strictly
> + *   positive.
> + * @param cache
> + *   A pointer to a mempool cache structure. May be NULL if not
> needed.
> + */
> +static __rte_always_inline void**
> +rte_mempool_get_cache(struct rte_mempool *mp, unsigned int n)

The zero-copy functions must be public, not internal. Internal functions are 
only intended to be used inside the library, so the PMDs should not call  
mempool internal functions.

As mentioned above: Please build in continuation of my patch, instead of 
providing your own similar function.

> +{
> +     void **cache_objs;
> +
> +     struct rte_mempool_cache *cache = rte_mempool_default_cache(mp,
> rte_lcore_id());
> +
> +     /* increment stat now, adding in mempool always success */
> +     RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
> +     RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
> +
> +     /* No cache provided or the request itself is too big for the
> cache */
> +     if (unlikely(cache == NULL || n > cache->flushthresh))
> +             return NULL;
> +
> +     /*
> +      * The cache follows the following algorithm:
> +      *   1. If the objects cannot be added to the cache without
> crossing
> +      *      the flush threshold, flush the cache to the backend.
> +      *   2. Add the objects to the cache.
> +      */
> +
> +     if (cache->len + n <= cache->flushthresh) {
> +             cache_objs = &cache->objs[cache->len];
> +             cache->len += n;
> +     } else {
> +             cache_objs = &cache->objs[0];
> +             rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
> +             cache->len = n;
> +     }
> +
> +     return cache_objs;
> +
> +}
> 
>  /**
>   * Put several objects back in the mempool.
> --
> 2.25.1
> 

Reply via email to