The current way has a few problems:

- if cache->len < n, we copy our elements into the cache first and then
  into obj_table; that's unnecessary
- if n >= cache_size (or the backfill fails) and we can't fulfil the
  request from the ring alone, we don't try to combine it with the cache
- if the refill fails, we don't return anything, even if the ring has
  enough for our request
This patch reworks that logic substantially:

- in the first part of the function we only use the cache if it can
  serve the whole request (cache->len >= n)
- otherwise we take our elements straight from the ring
- if that fails but we have something in the cache, we try to combine
  the two
- the refill happens at the end, and its failure doesn't modify the
  return value

Signed-off-by: Zoltan Kiss <zoltan.kiss at linaro.org>
---
 lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index a8054e1..896946c 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	unsigned lcore_id = rte_lcore_id();
 	uint32_t cache_size = mp->cache_size;
 
-	/* cache is not enabled or single consumer */
+	cache = &mp->local_cache[lcore_id];
+	/* cache is not enabled or single consumer or not enough */
 	if (unlikely(cache_size == 0 || is_mc == 0 ||
-		     n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+		     cache->len < n || lcore_id >= RTE_MAX_LCORE))
 		goto ring_dequeue;
 
-	cache = &mp->local_cache[lcore_id];
 	cache_objs = cache->objs;
 
-	/* Can this be satisfied from the cache? */
-	if (cache->len < n) {
-		/* No. Backfill the cache first, and then fill from it */
-		uint32_t req = n + (cache_size - cache->len);
-
-		/* How many do we require i.e. number to fill the cache + the request */
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
-		if (unlikely(ret < 0)) {
-			/*
-			 * In the offchance that we are buffer constrained,
-			 * where we are not able to allocate cache + n, go to
-			 * the ring directly. If that fails, we are truly out of
-			 * buffers.
-			 */
-			goto ring_dequeue;
-		}
-
-		cache->len += req;
-	}
-
 	/* Now fill in the response ... */
 	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
 		*obj_table = cache_objs[len];
@@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 
 	__MEMPOOL_STAT_ADD(mp, get_success, n);
 
-	return 0;
+	ret = 0;
+	goto cache_refill;
 
 ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
@@ -995,11 +976,45 @@ ring_dequeue:
 	else
 		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+	if (ret < 0 && is_mc == 1 && cache->len > 0) {
+		uint32_t req = n - cache->len;
+
+		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
+		if (ret == 0) {
+			cache_objs = cache->objs;
+			obj_table += req;
+			for (index = 0; index < cache->len;
+			     ++index, ++obj_table)
+				*obj_table = cache_objs[index];
+			cache->len = 0;
+		}
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n);
 	else
 		__MEMPOOL_STAT_ADD(mp, get_success, n);
 
+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+cache_refill:
+	/* If previous dequeue was OK and we have less than n, start refill */
+	if (ret == 0 && cache_size > 0 && cache->len < n) {
+		uint32_t req = cache_size - cache->len;
+
+		cache_objs = cache->objs;
+		ret = rte_ring_mc_dequeue_bulk(mp->ring,
+					       &cache->objs[cache->len],
+					       req);
+		if (likely(ret == 0))
+			cache->len += req;
+		else
+			/* Don't spoil the return value */
+			ret = 0;
+	}
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
 	return ret;
 }

-- 
1.9.1
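
For reviewers who prefer to see the resulting control flow in one piece
rather than assembling it from the hunks above, here is a minimal
standalone sketch of the same ordering of steps. It is not DPDK code:
toy_ring, toy_cache, ring_dequeue_bulk() and pool_get_bulk() are made-up
stand-ins for mp->ring, the per-lcore cache, rte_ring_mc/sc_dequeue_bulk()
and __mempool_get_bulk(); statistics, the single-consumer path and the
RTE_MEMPOOL_CACHE_MAX_SIZE guards are left out, and only the order of the
four steps mirrors the patch.

/*
 * Toy sketch of the reworked get-bulk flow:
 *  1. serve entirely from the cache if it already holds >= n objects
 *  2. otherwise dequeue the whole request straight from the ring
 *  3. if that fails, try to combine ring + cache
 *  4. refill the cache at the end; its failure never changes the result
 */
#include <stdio.h>
#include <string.h>

#define CACHE_SIZE 8
#define RING_SIZE  64

struct toy_ring {
	void *objs[RING_SIZE];
	unsigned count;
};

struct toy_cache {
	void *objs[CACHE_SIZE];
	unsigned len;
};

/* Dequeue exactly n objects or fail with -1, loosely modelled on
 * the all-or-nothing behaviour of the ring bulk dequeue. */
static int ring_dequeue_bulk(struct toy_ring *r, void **obj_table, unsigned n)
{
	if (r->count < n)
		return -1;
	r->count -= n;
	memcpy(obj_table, &r->objs[r->count], n * sizeof(*obj_table));
	return 0;
}

static int pool_get_bulk(struct toy_ring *ring, struct toy_cache *cache,
			 void **obj_table, unsigned n)
{
	unsigned i;
	int ret;

	/* 1. Cache can serve the whole request. */
	if (cache->len >= n) {
		for (i = 0; i < n; i++)
			obj_table[i] = cache->objs[--cache->len];
		ret = 0;
		goto refill;
	}

	/* 2. Otherwise go straight to the ring for all n objects. */
	ret = ring_dequeue_bulk(ring, obj_table, n);

	/* 3. Ring alone failed: take the rest from the ring and top up
	 *    with whatever the cache holds. */
	if (ret < 0 && cache->len > 0) {
		unsigned req = n - cache->len;

		ret = ring_dequeue_bulk(ring, obj_table, req);
		if (ret == 0) {
			for (i = 0; i < cache->len; i++)
				obj_table[req + i] = cache->objs[i];
			cache->len = 0;
		}
	}

refill:
	/* 4. Refill last; a failed refill must not spoil the return value,
	 *    so ret is simply left untouched here. */
	if (ret == 0 && cache->len < n) {
		unsigned req = CACHE_SIZE - cache->len;

		if (ring_dequeue_bulk(ring, &cache->objs[cache->len], req) == 0)
			cache->len += req;
	}
	return ret;
}

int main(void)
{
	static int storage[RING_SIZE];
	struct toy_ring ring = { .count = RING_SIZE };
	struct toy_cache cache = { .len = 0 };
	void *objs[16];
	unsigned i;

	for (i = 0; i < RING_SIZE; i++)
		ring.objs[i] = &storage[i];

	printf("get 16: %d (cache now %u)\n",
	       pool_get_bulk(&ring, &cache, objs, 16), cache.len);
	printf("get 4:  %d (cache now %u)\n",
	       pool_get_bulk(&ring, &cache, objs, 4), cache.len);
	return 0;
}

Running the sketch prints the return value and cache fill level after each
call, which makes it easy to see that the refill in step 4 can only grow
the cache and never turns a successful get into a failure.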