Zero-copy access to the mempool cache is beneficial for PMD performance, and must be provided by the mempool library to fix [Bug 1052] without a performance regression.
[Bug 1052]: https://bugs.dpdk.org/show_bug.cgi?id=1052

This RFC offers two conceptual variants of zero-copy get:
1. A simple version.
2. A version where existing (hot) objects in the cache are moved to the top of
   the cache before new objects from the backend driver are pulled in.

I would like some early feedback. Also, which variant do you prefer?

Notes:
* Allowing the 'cache' parameter to be NULL, and getting it from the mempool
  instead, was inspired by rte_mempool_cache_flush().
* Asserting that the 'mp' parameter is not NULL is not done by other functions,
  so I have omitted it here too.

NB: Please ignore the formatting. Also, this code has not even been compile
tested.

PS: No promises, but I expect to offer an RFC for zero-copy put too. :-)

1. Simple version:

/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool
 * cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to prefetch into the mempool cache.
 * @return
 *   The pointer to the objects in the mempool cache.
 *   NULL on error, with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
		struct rte_mempool *mp,
		unsigned int n)
{
	unsigned int len;
	int ret;

	if (cache == NULL)
		cache = rte_mempool_default_cache(mp, rte_lcore_id());
	if (cache == NULL) {
		rte_errno = EINVAL;
		goto fail;
	}

	rte_mempool_trace_cache_get_bulk(cache, mp, n);

	len = cache->len;

	if (unlikely(n > len)) {
		unsigned int size;

		if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
			rte_errno = EINVAL;
			goto fail;
		}

		/* Fill the cache from the backend; fetch size + requested - len objects. */
		size = cache->size;
		ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len],
				size + n - len);
		if (unlikely(ret < 0)) {
			/*
			 * We are buffer constrained.
			 * Do not fill the cache, just satisfy the request.
			 */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len],
					n - len);
			if (unlikely(ret < 0)) {
				rte_errno = -ret;
				goto fail;
			}

			len = 0;
		} else
			len = size;
	} else
		len -= n;

	cache->len = len;

	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

	/* The n requested objects are at cache->objs[len .. len + n - 1]. */
	return &cache->objs[len];

fail:
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

	return NULL;
}
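For illustration, a caller could consume the returned pointer roughly as in the
sketch below. The refill function and the driver-private 'ring' array are
hypothetical examples only (and, like the rest, not compile tested); the
snippet assumes the usual rte_mempool, rte_errno and rte_branch_prediction
headers.

/* Hypothetical usage sketch: copy n object pointers into a driver ring. */
static int
example_driver_refill(struct rte_mempool *mp, struct rte_mempool_cache *cache,
		void **ring, unsigned int n)
{
	void **objs;
	unsigned int i;

	/* Zero-copy access to n objects in the mempool cache. */
	objs = rte_mempool_cache_get_bulk(cache, mp, n);
	if (unlikely(objs == NULL))
		return -rte_errno;

	/* Read the object pointers directly from the cache. */
	for (i = 0; i < n; i++)
		ring[i] = objs[i];

	return 0;
}

The point of the zero-copy variant is exactly this: the caller reads the object
pointers directly from the mempool cache, instead of having
rte_mempool_generic_get() copy them into a separate object table first.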
2. Advanced version:

/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool
 * cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to prefetch into the mempool cache.
 * @return
 *   The pointer to the objects in the mempool cache.
 *   NULL on error, with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
		struct rte_mempool *mp,
		unsigned int n)
{
	unsigned int len;
	int ret;

	if (cache == NULL)
		cache = rte_mempool_default_cache(mp, rte_lcore_id());
	if (cache == NULL) {
		rte_errno = EINVAL;
		goto fail;
	}

	rte_mempool_trace_cache_get_bulk(cache, mp, n);

	len = cache->len;

	if (unlikely(n > len)) {
		unsigned int size;

		if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
			rte_errno = EINVAL;
			goto fail;
		}

		/* Fill the cache from the backend; fetch size + requested - len objects. */
		size = cache->size;
		if (likely(size + n >= 2 * len)) {
			/*
			 * No overlap when copying (dst index >= len):
			 * size + n - len >= len, i.e. size + n >= 2 * len.
			 * Move (i.e. copy) the existing objects in the cache to
			 * what will become the top of the cache, to make room
			 * for the new objects below them.
			 */
			rte_memcpy(&cache->objs[size + n - len], &cache->objs[0],
					len * sizeof(void *));

			/* Fill the cache below the existing objects in the cache. */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[0],
					size + n - len);
			if (unlikely(ret < 0)) {
				goto constrained;
			} else
				len = size;
		} else {
			/* Fill the cache on top of any objects in it. */
			ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len],
					size + n - len);
			if (unlikely(ret < 0)) {

constrained:
				/*
				 * We are buffer constrained.
				 * Do not fill the cache, just satisfy the request.
				 */
				ret = rte_mempool_ops_dequeue_bulk(mp,
						&cache->objs[len], n - len);
				if (unlikely(ret < 0)) {
					rte_errno = -ret;
					goto fail;
				}

				len = 0;
			} else
				len = size;
		}
	} else
		len -= n;

	cache->len = len;

	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
	RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

	/* The n requested objects are at cache->objs[len .. len + n - 1]. */
	return &cache->objs[len];

fail:
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
	RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

	return NULL;
}

Med venlig hilsen / Kind regards,
-Morten Brørup