> -----Original Message-----
> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Zoltan Kiss
> Sent: Wednesday, July 01, 2015 10:04 AM
> To: dev at dpdk.org
> Subject: [dpdk-dev] [PATCH v2] mempool: improve cache search
> 
> The current way has a few problems:
> 
> - if cache->len < n, we copy our elements into the cache first and then
>   into obj_table, which is unnecessary
> - if n >= cache_size (or the backfill fails), and we can't fulfil the
>   request from the ring alone, we don't try to combine with the cache
> - if refill fails, we don't return anything, even if the ring has enough
>   for our request
> 
> This patch rewrites it substantially:
> - in the first part of the function we only use the cache if it can serve
>   the whole request (cache->len >= n)
> - otherwise take our elements straight from the ring
> - if that fails but we have something in the cache, try to combine them
> - the refill happens at the end, and its failure doesn't modify our return
>   value
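
Condensed into plain C, the flow described above is roughly the following
(only a sketch with a made-up wrapper name; the single-consumer path, the
RTE_MAX_LCORE check and the stats are left out; see the diff below for the
real code):

#include <string.h>
#include <rte_lcore.h>
#include <rte_mempool.h>
#include <rte_ring.h>

/* Rough sketch of the reworked __mempool_get_bulk() flow (MC path only). */
static inline int
mempool_get_bulk_sketch(struct rte_mempool *mp, void **obj_table, unsigned n)
{
	struct rte_mempool_cache *cache = &mp->local_cache[rte_lcore_id()];
	uint32_t cache_size = mp->cache_size;
	unsigned i;
	int ret;

	if (cache_size > 0 && cache->len >= n) {
		/* The cache alone can serve the whole request. */
		for (i = 0; i < n; i++)
			obj_table[i] = cache->objs[--cache->len];
		ret = 0;
	} else {
		/* Take the objects straight from the ring ... */
		ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
		/* ... and if that fails, combine ring + cache contents. */
		if (ret < 0 && cache_size > 0 && cache->len > 0) {
			ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table,
						       n - cache->len);
			if (ret == 0) {
				memcpy(&obj_table[n - cache->len], cache->objs,
				       cache->len * sizeof(void *));
				cache->len = 0;
			}
		}
	}

	/* The refill happens at the end; its failure doesn't change ret. */
	if (ret == 0 && cache_size > 0 && cache->len < n) {
		uint32_t req = cache_size - cache->len;

		if (rte_ring_mc_dequeue_bulk(mp->ring,
					     &cache->objs[cache->len],
					     req) == 0)
			cache->len += req;
	}

	return ret;
}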
> 
> Signed-off-by: Zoltan Kiss <zoltan.kiss at linaro.org>
> ---
> v2:
> - fix subject
> - add unlikely for branch where request is fulfilled both from cache and ring
> 
>  lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
>  1 file changed, 39 insertions(+), 24 deletions(-)
> 
> diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
> index 6d4ce9a..1e96f03 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -947,34 +947,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
>       unsigned lcore_id = rte_lcore_id();
>       uint32_t cache_size = mp->cache_size;
> 
> -     /* cache is not enabled or single consumer */
> +     cache = &mp->local_cache[lcore_id];
> +     /* cache is not enabled or single consumer or not enough */
>       if (unlikely(cache_size == 0 || is_mc == 0 ||
> -                  n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +                  cache->len < n || lcore_id >= RTE_MAX_LCORE))
>               goto ring_dequeue;
> 
> -     cache = &mp->local_cache[lcore_id];
>       cache_objs = cache->objs;
> 
> -     /* Can this be satisfied from the cache? */
> -     if (cache->len < n) {
> -             /* No. Backfill the cache first, and then fill from it */
> -             uint32_t req = n + (cache_size - cache->len);
> -
> -             /* How many do we require i.e. number to fill the cache + the request */
> -             ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
> -             if (unlikely(ret < 0)) {
> -                     /*
> -                      * In the offchance that we are buffer constrained,
> -                      * where we are not able to allocate cache + n, go to
> -                      * the ring directly. If that fails, we are truly out of
> -                      * buffers.
> -                      */
> -                     goto ring_dequeue;
> -             }
> -
> -             cache->len += req;
> -     }
> -
>       /* Now fill in the response ... */
>       for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
>               *obj_table = cache_objs[len];
> @@ -983,7 +963,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> 
>       __MEMPOOL_STAT_ADD(mp, get_success, n);
> 
> -     return 0;
> +     ret = 0;
> +     goto cache_refill;
> 
>  ring_dequeue:
>  #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> @@ -994,11 +975,45 @@ ring_dequeue:
>       else
>               ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
> 
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +     if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0)) {
> +             uint32_t req = n - cache->len;
> +
> +             ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
> +             if (ret == 0) {
> +                     cache_objs = cache->objs;
> +                     obj_table += req;
> +                     for (index = 0; index < cache->len;
> +                          ++index, ++obj_table)
> +                             *obj_table = cache_objs[index];
> +                     cache->len = 0;
> +             }
> +     }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>       if (ret < 0)
>               __MEMPOOL_STAT_ADD(mp, get_fail, n);
>       else
>               __MEMPOOL_STAT_ADD(mp, get_success, n);
> 
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +cache_refill:

Ok, so if I get things right: if the lcore runs out of entries in the cache,
then on the next __mempool_get_bulk() it has to do ring_dequeue() twice:
1. to satisfy the user request
2. to refill the cache.
Right?
If that is so, then I think the current approach:
ring_dequeue() once to refill the cache, then copy entries from the cache to
the user
is cheaper (faster) in many cases.
Especially when the same pool is shared between multiple threads,
for example when a thread is doing RX only (no TX).
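
In terms of ring operations the cache-miss case then compares roughly like
this (sketch only; error handling and the single-consumer path left out):

/* Current code: one MC dequeue backfills the cache, then the request
 * is served from the cache without touching the ring again. */
rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len],
			 n + (cache_size - cache->len));

/* With the patch: one MC dequeue for the caller's objects, plus a
 * second one at the end to refill the cache. Each of them contends
 * on the ring's shared consumer head/tail, which is what hurts when
 * several lcores share the pool. */
rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len],
			 cache_size - cache->len);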


> +     /* If previous dequeue was OK and we have less than n, start refill */
> +     if (ret == 0 && cache_size > 0 && cache->len < n) {
> +             uint32_t req = cache_size - cache->len;


It could be that n > cache_size.
In that case there is probably no point in refilling the cache, as you took
the entries from the ring and the cache was left intact.
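
E.g. something like the following would keep the refill for small requests
only (untested sketch on top of the patch):

/* Skip the refill when n >= cache_size: in that case the objects
 * came straight from the ring and the cache was not touched. */
if (ret == 0 && cache_size > 0 && n < cache_size && cache->len < n) {
	uint32_t req = cache_size - cache->len;

	if (rte_ring_mc_dequeue_bulk(mp->ring,
				     &cache->objs[cache->len],
				     req) == 0)
		cache->len += req;
}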

Konstantin

> +
> +             cache_objs = cache->objs;
> +             ret = rte_ring_mc_dequeue_bulk(mp->ring,
> +                                            &cache->objs[cache->len],
> +                                            req);
> +             if (likely(ret == 0))
> +                     cache->len += req;
> +             else
> +                     /* Don't spoil the return value */
> +                     ret = 0;
> +     }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>       return ret;
>  }
> 
> --
> 1.9.1
