From: Morten Brørup <m...@smartsharesystems.com>

Zero-copy access to mempool caches is beneficial for PMD performance, and
must be provided by the mempool library to fix [Bug 1052] without a
performance regression.

[Bug 1052]: https://bugs.dpdk.org/show_bug.cgi?id=1052
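
A hedged usage sketch (not part of the patch; the function name and the
fallback policy below are illustrative assumptions), showing how a PMD
could free a burst of objects directly into the cache:

    /* Sketch only, assuming <rte_mempool.h> is included. */
    static inline void
    free_bufs_zc(struct rte_mempool *mp, struct rte_mempool_cache *cache,
                    void **objs, unsigned int n)
    {
            void **cache_objs;
            unsigned int i;

            /* Reserve n slots in the cache; may flush it to the backend. */
            cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, n);
            if (cache_objs == NULL) {
                    /* n exceeds the flush threshold; use the copying API. */
                    rte_mempool_generic_put(mp, objs, n, cache);
                    return;
            }
            /* Write the object pointers directly into the cache. */
            for (i = 0; i < n; i++)
                    cache_objs[i] = objs[i];
    }

The get direction is symmetric: rte_mempool_cache_zc_get_bulk(cache, mp, n)
returns a pointer to n objects to be read directly from the cache, or NULL
with rte_errno set if they cannot be provided.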

Bugzilla ID: 1052

Signed-off-by: Morten Brørup <m...@smartsharesystems.com>
Signed-off-by: Kamalakshitha Aligeri <kamalakshitha.alig...@arm.com>
---
v10:
* Added mempool test cases using the zero-copy APIs.
v9:
* Also set rte_errno in zero-copy put function, if returning NULL.
  (Honnappa)
* Revert v3 comparison to prevent overflow if n is really huge and len is
  non-zero. (Olivier)
v8:
* Actually include the rte_errno header file.
  Note to self: The changes only take effect on the disk after the file in
  the text editor has been saved.
v7:
* Fix typo in function description. (checkpatch)
* Zero-copy functions may set rte_errno; include rte_errno header file.
  (ci/loongarch-compilation)
v6:
* Improve description of the 'n' parameter to the zero-copy get function.
  (Konstantin, Bruce)
* The caches used for zero-copy may not be user-owned, so remove this word
  from the function descriptions. (Kamalakshitha)
v5:
* Bugfix: Compare zero-copy get request to the cache size instead of the
  flush threshold; otherwise refill could overflow the memory allocated
  for the cache. (Andrew)
* Split the zero-copy put function into an internal function doing the
  work, and a public function with trace.
* Avoid code duplication by rewriting rte_mempool_do_generic_put() to use
  the internal zero-copy put function. (Andrew)
* Corrected the return type of rte_mempool_cache_zc_put_bulk() from void *
  to void **; it returns a pointer to an array of objects.
* Fix coding style: Add missing curly brackets. (Andrew)
v4:
* Fix checkpatch warnings.
v3:
* Bugfix: Respect the cache size; compare to the flush threshold instead
  of RTE_MEMPOOL_CACHE_MAX_SIZE.
* Added 'rewind' function for incomplete 'put' operations; a usage sketch
  follows this changelog. (Konstantin)
* Replace RTE_ASSERTs with runtime checks of the request size.
  Instead of failing, return NULL if the request is too big. (Konstantin)
* Modified comparison to prevent overflow if n is really huge and len is
  non-zero. (Andrew)
* Updated the comments in the code.
v2:
* Fix checkpatch warnings.
* Fix missing registration of trace points.
* The functions are inline, so they don't go into the map file.
v1 changes from the RFC:
* Removed run-time parameter checks. (Honnappa)
  These are hot fast-path functions; correct application behaviour is
  required, i.e. function parameters must be valid.
* Added RTE_ASSERT for parameters instead.
  Code for this is only generated if built with RTE_ENABLE_ASSERT.
* Removed fallback when 'cache' parameter is not set. (Honnappa)
* Chose the simple get function; i.e. do not move the existing objects in
  the cache to the top of the new stack, just leave them at the bottom.
* Renamed the functions. Other suggestions are welcome, of course. ;-)
* Updated the function descriptions.
* Added the functions to trace_fp and version.map.
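
As a minimal sketch of the 'rewind' pattern from the v3 item above
(release_done() is a hypothetical helper that writes the pointers of up
to n completed objects into the reserved slots and returns the count):

    void **cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, n);
    if (cache_objs != NULL) {
            unsigned int done = release_done(cache_objs, n); /* hypothetical */
            if (done < n)
                    /* Un-put the slots that were not filled. */
                    rte_mempool_cache_zc_put_rewind(cache, n - done);
    }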

 app/test/test_mempool.c            |  81 +++++++---
 lib/mempool/mempool_trace_points.c |   9 ++
 lib/mempool/rte_mempool.h          | 239 +++++++++++++++++++++++++----
 lib/mempool/rte_mempool_trace_fp.h |  23 +++
 lib/mempool/version.map            |   9 ++
 5 files changed, 311 insertions(+), 50 deletions(-)

diff --git a/app/test/test_mempool.c b/app/test/test_mempool.c
index 8e493eda47..6d29f5bc7b 100644
--- a/app/test/test_mempool.c
+++ b/app/test/test_mempool.c
@@ -74,7 +74,7 @@ my_obj_init(struct rte_mempool *mp, __rte_unused void *arg,

 /* basic tests (done on one core) */
 static int
-test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
+test_mempool_basic(struct rte_mempool *mp, int use_external_cache, int use_zc_api)
 {
        uint32_t *objnum;
        void **objtable;
@@ -84,6 +84,7 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
        unsigned i, j;
        int offset;
        struct rte_mempool_cache *cache;
+       void **cache_objs;

        if (use_external_cache) {
                /* Create a user-owned mempool cache. */
@@ -100,8 +101,13 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
        rte_mempool_dump(stdout, mp);

        printf("get an object\n");
-       if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
-               GOTO_ERR(ret, out);
+       if (use_zc_api) {
+               cache_objs = rte_mempool_cache_zc_get_bulk(cache, mp, 1);
+               obj = *cache_objs;
+       } else {
+               if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
+                       GOTO_ERR(ret, out);
+       }
        rte_mempool_dump(stdout, mp);

        /* tests that improve coverage */
@@ -123,21 +129,41 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
 #endif

        printf("put the object back\n");
-       rte_mempool_generic_put(mp, &obj, 1, cache);
+       if (use_zc_api) {
+               cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, 1);
+               rte_memcpy(cache_objs, &obj, sizeof(void *));
+       } else {
+               rte_mempool_generic_put(mp, &obj, 1, cache);
+       }
        rte_mempool_dump(stdout, mp);

        printf("get 2 objects\n");
-       if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
-               GOTO_ERR(ret, out);
-       if (rte_mempool_generic_get(mp, &obj2, 1, cache) < 0) {
-               rte_mempool_generic_put(mp, &obj, 1, cache);
-               GOTO_ERR(ret, out);
+       if (use_zc_api) {
+               cache_objs = rte_mempool_cache_zc_get_bulk(cache, mp, 1);
+               obj = *cache_objs;
+               cache_objs = rte_mempool_cache_zc_get_bulk(cache, mp, 1);
+               obj2 = *cache_objs;
+       } else {
+               if (rte_mempool_generic_get(mp, &obj, 1, cache) < 0)
+                       GOTO_ERR(ret, out);
+               if (rte_mempool_generic_get(mp, &obj2, 1, cache) < 0) {
+                       rte_mempool_generic_put(mp, &obj, 1, cache);
+                       GOTO_ERR(ret, out);
+               }
        }
        rte_mempool_dump(stdout, mp);

        printf("put the objects back\n");
-       rte_mempool_generic_put(mp, &obj, 1, cache);
-       rte_mempool_generic_put(mp, &obj2, 1, cache);
+       if (use_zc_api) {
+               cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, 1);
+               rte_memcpy(cache_objs, &obj, sizeof(void *));
+               cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, 1);
+               rte_memcpy(cache_objs, &obj2, sizeof(void *));
+
+       } else {
+               rte_mempool_generic_put(mp, &obj, 1, cache);
+               rte_mempool_generic_put(mp, &obj2, 1, cache);
+       }
        rte_mempool_dump(stdout, mp);

        /*
@@ -149,8 +175,13 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
                GOTO_ERR(ret, out);

        for (i = 0; i < MEMPOOL_SIZE; i++) {
-               if (rte_mempool_generic_get(mp, &objtable[i], 1, cache) < 0)
-                       break;
+               if (use_zc_api) {
+                       cache_objs = rte_mempool_cache_zc_get_bulk(cache, mp, 1);
+                       objtable[i] = *cache_objs;
+               } else {
+                       if (rte_mempool_generic_get(mp, &objtable[i], 1, cache) < 0)
+                               break;
+               }
        }

        /*
@@ -170,8 +201,12 @@ test_mempool_basic(struct rte_mempool *mp, int use_external_cache)
                        if (obj_data[j] != 0)
                                ret = -1;
                }
-
-               rte_mempool_generic_put(mp, &objtable[i], 1, cache);
+               if (use_zc_api) {
+                       cache_objs = rte_mempool_cache_zc_put_bulk(cache, mp, 1);
+                       rte_memcpy(cache_objs, &objtable[i], sizeof(void *));
+               } else {
+                       rte_mempool_generic_put(mp, &objtable[i], 1, cache);
+               }
        }

        free(objtable);
@@ -979,15 +1014,19 @@ test_mempool(void)
        rte_mempool_list_dump(stdout);

        /* basic tests without cache */
-       if (test_mempool_basic(mp_nocache, 0) < 0)
+       if (test_mempool_basic(mp_nocache, 0, 0) < 0)
+               GOTO_ERR(ret, err);
+
+       /* basic tests with zero-copy APIs */
+       if (test_mempool_basic(mp_cache, 0, 1) < 0)
                GOTO_ERR(ret, err);

-       /* basic tests with cache */
-       if (test_mempool_basic(mp_cache, 0) < 0)
+       /* basic tests with user-owned cache and zero-copy APIs */
+       if (test_mempool_basic(mp_nocache, 1, 1) < 0)
                GOTO_ERR(ret, err);

        /* basic tests with user-owned cache */
-       if (test_mempool_basic(mp_nocache, 1) < 0)
+       if (test_mempool_basic(mp_nocache, 1, 0) < 0)
                GOTO_ERR(ret, err);

        /* more basic tests without cache */
@@ -1008,10 +1047,10 @@ test_mempool(void)
                GOTO_ERR(ret, err);

        /* test the stack handler */
-       if (test_mempool_basic(mp_stack, 1) < 0)
+       if (test_mempool_basic(mp_stack, 1, 0) < 0)
                GOTO_ERR(ret, err);

-       if (test_mempool_basic(default_pool, 1) < 0)
+       if (test_mempool_basic(default_pool, 1, 0) < 0)
                GOTO_ERR(ret, err);

        /* test mempool event callbacks */
diff --git a/lib/mempool/mempool_trace_points.c b/lib/mempool/mempool_trace_points.c
index 307018094d..8735a07971 100644
--- a/lib/mempool/mempool_trace_points.c
+++ b/lib/mempool/mempool_trace_points.c
@@ -77,3 +77,12 @@ RTE_TRACE_POINT_REGISTER(rte_mempool_trace_ops_free,

 RTE_TRACE_POINT_REGISTER(rte_mempool_trace_set_ops_byname,
        lib.mempool.set.ops.byname)
+
+RTE_TRACE_POINT_REGISTER(rte_mempool_trace_cache_zc_put_bulk,
+       lib.mempool.cache.zc.put.bulk)
+
+RTE_TRACE_POINT_REGISTER(rte_mempool_trace_cache_zc_put_rewind,
+       lib.mempool.cache.zc.put.rewind)
+
+RTE_TRACE_POINT_REGISTER(rte_mempool_trace_cache_zc_get_bulk,
+       lib.mempool.cache.zc.get.bulk)
diff --git a/lib/mempool/rte_mempool.h b/lib/mempool/rte_mempool.h
index 9f530db24b..94f895c329 100644
--- a/lib/mempool/rte_mempool.h
+++ b/lib/mempool/rte_mempool.h
@@ -42,6 +42,7 @@
 #include <rte_config.h>
 #include <rte_spinlock.h>
 #include <rte_debug.h>
+#include <rte_errno.h>
 #include <rte_lcore.h>
 #include <rte_branch_prediction.h>
 #include <rte_ring.h>
@@ -1346,6 +1347,199 @@ rte_mempool_cache_flush(struct rte_mempool_cache *cache,
        cache->len = 0;
 }

+
+/**
+ * @internal used by rte_mempool_cache_zc_put_bulk() and rte_mempool_do_generic_put().
+ *
+ * Zero-copy put objects in a mempool cache backed by the specified mempool.
+ *
+ * @param cache
+ *   A pointer to the mempool cache.
+ * @param mp
+ *   A pointer to the mempool.
+ * @param n
+ *   The number of objects to be put in the mempool cache.
+ * @return
+ *   The pointer to where to put the objects in the mempool cache.
+ *   NULL, with rte_errno set to EINVAL, if the request itself is too big
+ *   for the cache, i.e. exceeds the cache flush threshold.
+ */
+static __rte_always_inline void **
+__rte_mempool_cache_zc_put_bulk(struct rte_mempool_cache *cache,
+               struct rte_mempool *mp,
+               unsigned int n)
+{
+       void **cache_objs;
+
+       RTE_ASSERT(cache != NULL);
+       RTE_ASSERT(mp != NULL);
+
+       if (cache->len + n <= cache->flushthresh) {
+               /*
+                * The objects can be added to the cache without crossing the
+                * flush threshold.
+                */
+               cache_objs = &cache->objs[cache->len];
+               cache->len += n;
+       } else if (likely(n <= cache->flushthresh)) {
+               /*
+                * The request itself fits into the cache.
+                * But first, the cache must be flushed to the backend, so
+                * adding the objects does not cross the flush threshold.
+                */
+               cache_objs = &cache->objs[0];
+               rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
+               cache->len = n;
+       } else {
+               /* The request itself is too big for the cache. */
+               rte_errno = EINVAL;
+               return NULL;
+       }
+
+       RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+       RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+
+       return cache_objs;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: This API may change, or be removed, without prior notice.
+ *
+ * Zero-copy put objects in a mempool cache backed by the specified mempool.
+ *
+ * @param cache
+ *   A pointer to the mempool cache.
+ * @param mp
+ *   A pointer to the mempool.
+ * @param n
+ *   The number of objects to be put in the mempool cache.
+ * @return
+ *   The pointer to where to put the objects in the mempool cache.
+ *   NULL, with rte_errno set to EINVAL, if the request itself is too big
+ *   for the cache, i.e. exceeds the cache flush threshold.
+ */
+__rte_experimental
+static __rte_always_inline void **
+rte_mempool_cache_zc_put_bulk(struct rte_mempool_cache *cache,
+               struct rte_mempool *mp,
+               unsigned int n)
+{
+       RTE_ASSERT(cache != NULL);
+       RTE_ASSERT(mp != NULL);
+
+       rte_mempool_trace_cache_zc_put_bulk(cache, mp, n);
+       return __rte_mempool_cache_zc_put_bulk(cache, mp, n);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: This API may change, or be removed, without prior notice.
+ *
+ * Zero-copy un-put objects in a mempool cache.
+ *
+ * @param cache
+ *   A pointer to the mempool cache.
+ * @param n
+ *   The number of objects not put in the mempool cache after calling
+ *   rte_mempool_cache_zc_put_bulk().
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_mempool_cache_zc_put_rewind(struct rte_mempool_cache *cache,
+               unsigned int n)
+{
+       RTE_ASSERT(cache != NULL);
+       RTE_ASSERT(n <= cache->len);
+
+       rte_mempool_trace_cache_zc_put_rewind(cache, n);
+
+       cache->len -= n;
+
+       RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, (int)-n);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: This API may change, or be removed, without prior notice.
+ *
+ * Zero-copy get objects from a mempool cache backed by the specified mempool.
+ *
+ * @param cache
+ *   A pointer to the mempool cache.
+ * @param mp
+ *   A pointer to the mempool.
+ * @param n
+ *   The number of objects to be made available for extraction from the mempool cache.
+ * @return
+ *   The pointer to the objects in the mempool cache.
+ *   NULL on error, i.e. the cache plus the backing pool does not contain 'n' objects.
+ *   With rte_errno set to the error code of the mempool dequeue function,
+ *   or EINVAL if the request itself is too big for the cache, i.e.
+ *   exceeds the cache flush threshold.
+ */
+__rte_experimental
+static __rte_always_inline void **
+rte_mempool_cache_zc_get_bulk(struct rte_mempool_cache *cache,
+               struct rte_mempool *mp,
+               unsigned int n)
+{
+       unsigned int len, size;
+
+       RTE_ASSERT(cache != NULL);
+       RTE_ASSERT(mp != NULL);
+
+       rte_mempool_trace_cache_zc_get_bulk(cache, mp, n);
+
+       len = cache->len;
+       size = cache->size;
+
+       if (n <= len) {
+               /* The request can be satisfied from the cache as is. */
+               len -= n;
+       } else if (likely(n <= size)) {
+               /*
+                * The request itself can be satisfied from the cache.
+                * But first, the cache must be filled from the backend;
+                * fetch size + requested - len objects.
+                */
+               int ret;
+
+               ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], size + n - len);
+               if (unlikely(ret < 0)) {
+                       /*
+                        * We are buffer constrained.
+                        * Do not fill the cache, just satisfy the request.
+                        */
+                       ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], n - len);
+                       if (unlikely(ret < 0)) {
+                               /* Unable to satisfy the request. */
+
+                               RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
+                               RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);
+
+                               rte_errno = -ret;
+                               return NULL;
+                       }
+
+                       len = 0;
+               } else {
+                       len = size;
+               }
+       } else {
+               /* The request itself is too big for the cache. */
+               rte_errno = EINVAL;
+               return NULL;
+       }
+
+       cache->len = len;
+
+       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
+       RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);
+
+       return &cache->objs[len];
+}
+
 /**
  * @internal Put several objects back in the mempool; used internally.
  * @param mp
@@ -1364,32 +1558,25 @@ rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,
 {
        void **cache_objs;

-       /* No cache provided */
-       if (unlikely(cache == NULL))
-               goto driver_enqueue;
+       /* No cache provided? */
+       if (unlikely(cache == NULL)) {
+               /* Increment stats now, adding in mempool always succeeds. */
+               RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
+               RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);

-       /* increment stat now, adding in mempool always success */
-       RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
-       RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);
+               goto driver_enqueue;
+       }

-       /* The request itself is too big for the cache */
-       if (unlikely(n > cache->flushthresh))
-               goto driver_enqueue_stats_incremented;
+       /* Prepare to add the objects to the cache. */
+       cache_objs = __rte_mempool_cache_zc_put_bulk(cache, mp, n);

-       /*
-        * The cache follows the following algorithm:
-        *   1. If the objects cannot be added to the cache without crossing
-        *      the flush threshold, flush the cache to the backend.
-        *   2. Add the objects to the cache.
-        */
+       /* The request itself is too big for the cache? */
+       if (unlikely(cache_objs == NULL)) {
+               /* Increment stats now, adding in mempool always succeeds. */
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_bulk, 1);
+               RTE_MEMPOOL_CACHE_STAT_ADD(cache, put_objs, n);

-       if (cache->len + n <= cache->flushthresh) {
-               cache_objs = &cache->objs[cache->len];
-               cache->len += n;
-       } else {
-               cache_objs = &cache->objs[0];
-               rte_mempool_ops_enqueue_bulk(mp, cache_objs, cache->len);
-               cache->len = n;
+               goto driver_enqueue;
        }

        /* Add the objects to the cache. */
@@ -1399,13 +1586,7 @@ rte_mempool_do_generic_put(struct rte_mempool *mp, void * const *obj_table,

 driver_enqueue:

-       /* increment stat now, adding in mempool always success */
-       RTE_MEMPOOL_STAT_ADD(mp, put_bulk, 1);
-       RTE_MEMPOOL_STAT_ADD(mp, put_objs, n);
-
-driver_enqueue_stats_incremented:
-
-       /* push objects to the backend */
+       /* Push the objects to the backend. */
        rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
 }

diff --git a/lib/mempool/rte_mempool_trace_fp.h b/lib/mempool/rte_mempool_trace_fp.h
index ed060e887c..14666457f7 100644
--- a/lib/mempool/rte_mempool_trace_fp.h
+++ b/lib/mempool/rte_mempool_trace_fp.h
@@ -109,6 +109,29 @@ RTE_TRACE_POINT_FP(
        rte_trace_point_emit_ptr(mempool);
 )

+RTE_TRACE_POINT_FP(
+       rte_mempool_trace_cache_zc_put_bulk,
+       RTE_TRACE_POINT_ARGS(void *cache, void *mempool, uint32_t nb_objs),
+       rte_trace_point_emit_ptr(cache);
+       rte_trace_point_emit_ptr(mempool);
+       rte_trace_point_emit_u32(nb_objs);
+)
+
+RTE_TRACE_POINT_FP(
+       rte_mempool_trace_cache_zc_put_rewind,
+       RTE_TRACE_POINT_ARGS(void *cache, uint32_t nb_objs),
+       rte_trace_point_emit_ptr(cache);
+       rte_trace_point_emit_u32(nb_objs);
+)
+
+RTE_TRACE_POINT_FP(
+       rte_mempool_trace_cache_zc_get_bulk,
+       RTE_TRACE_POINT_ARGS(void *cache, void *mempool, uint32_t nb_objs),
+       rte_trace_point_emit_ptr(cache);
+       rte_trace_point_emit_ptr(mempool);
+       rte_trace_point_emit_u32(nb_objs);
+)
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/mempool/version.map b/lib/mempool/version.map
index dff2d1cb55..06cb83ad9d 100644
--- a/lib/mempool/version.map
+++ b/lib/mempool/version.map
@@ -49,6 +49,15 @@ EXPERIMENTAL {
        __rte_mempool_trace_get_contig_blocks;
        __rte_mempool_trace_default_cache;
        __rte_mempool_trace_cache_flush;
+       __rte_mempool_trace_ops_populate;
+       __rte_mempool_trace_ops_alloc;
+       __rte_mempool_trace_ops_free;
+       __rte_mempool_trace_set_ops_byname;
+
+       # added in 23.03
+       __rte_mempool_trace_cache_zc_put_bulk;
+       __rte_mempool_trace_cache_zc_put_rewind;
+       __rte_mempool_trace_cache_zc_get_bulk;
 };

 INTERNAL {
--
2.25.1
