Hi Zoltan,

On 06/25/2015 08:48 PM, Zoltan Kiss wrote:
> The current way has a few problems:
>
> - if cache->len < n, we copy our elements into the cache first, then
>    into obj_table, that's unnecessary
> - if n >= cache_size (or the backfill fails), and we can't fulfil the
>    request from the ring alone, we don't try to combine with the cache
> - if refill fails, we don't return anything, even if the ring has enough
>    for our request
>
> This patch rewrites it severely:
> - at the first part of the function we only try the cache if cache->len < n
> - otherwise take our elements straight from the ring
> - if that fails but we have something in the cache, try to combine them
> - the refill happens at the end, and its failure doesn't modify our return
>    value

Indeed, it looks easier to read that way. I checked the performance with
"mempool_perf_autotest" of app/test and it show that there is no
regression (it is even slightly better in some test cases).

There is a small typo in the title: s/improbe/improve
Please see also a comment below.

>
> Signed-off-by: Zoltan Kiss <zoltan.kiss at linaro.org>
> ---
>   lib/librte_mempool/rte_mempool.h | 63 
> +++++++++++++++++++++++++---------------
>   1 file changed, 39 insertions(+), 24 deletions(-)
>
> diff --git a/lib/librte_mempool/rte_mempool.h 
> b/lib/librte_mempool/rte_mempool.h
> index a8054e1..896946c 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
> **obj_table,
>       unsigned lcore_id = rte_lcore_id();
>       uint32_t cache_size = mp->cache_size;
>
> -     /* cache is not enabled or single consumer */
> +     cache = &mp->local_cache[lcore_id];
> +     /* cache is not enabled or single consumer or not enough */
>       if (unlikely(cache_size == 0 || is_mc == 0 ||
> -                  n >= cache_size || lcore_id >= RTE_MAX_LCORE))
> +                  cache->len < n || lcore_id >= RTE_MAX_LCORE))
>               goto ring_dequeue;
>
> -     cache = &mp->local_cache[lcore_id];
>       cache_objs = cache->objs;
>
> -     /* Can this be satisfied from the cache? */
> -     if (cache->len < n) {
> -             /* No. Backfill the cache first, and then fill from it */
> -             uint32_t req = n + (cache_size - cache->len);
> -
> -             /* How many do we require i.e. number to fill the cache + the 
> request */
> -             ret = rte_ring_mc_dequeue_bulk(mp->ring, 
> &cache->objs[cache->len], req);
> -             if (unlikely(ret < 0)) {
> -                     /*
> -                      * In the offchance that we are buffer constrained,
> -                      * where we are not able to allocate cache + n, go to
> -                      * the ring directly. If that fails, we are truly out of
> -                      * buffers.
> -                      */
> -                     goto ring_dequeue;
> -             }
> -
> -             cache->len += req;
> -     }
> -
>       /* Now fill in the response ... */
>       for (index = 0, len = cache->len - 1; index < n; ++index, len--, 
> obj_table++)
>               *obj_table = cache_objs[len];
> @@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
> **obj_table,
>
>       __MEMPOOL_STAT_ADD(mp, get_success, n);
>
> -     return 0;
> +     ret = 0;
> +     goto cache_refill;
>
>   ring_dequeue:
>   #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> @@ -995,11 +976,45 @@ ring_dequeue:
>       else
>               ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +     if (ret < 0 && is_mc == 1 && cache->len > 0) {

if (unlikely(ret < 0 && is_mc == 1 && cache->len > 0))  ?


> +             uint32_t req = n - cache->len;
> +
> +             ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
> +             if (ret == 0) {
> +                     cache_objs = cache->objs;
> +                     obj_table += req;
> +                     for (index = 0; index < cache->len;
> +                          ++index, ++obj_table)
> +                             *obj_table = cache_objs[index];
> +                     cache->len = 0;
> +             }
> +     }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>       if (ret < 0)
>               __MEMPOOL_STAT_ADD(mp, get_fail, n);
>       else
>               __MEMPOOL_STAT_ADD(mp, get_success, n);
>
> +#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
> +cache_refill:
> +     /* If previous dequeue was OK and we have less than n, start refill */
> +     if (ret == 0 && cache_size > 0 && cache->len < n) {

Not sure it's likely or unlikely there. I'll tend to say unlikely
as the cache size is probably big compared to n most of the time.

I don't know if it would have a real performance impact thought, but
I think it won't hurt.


Regards,
Olivier


> +             uint32_t req = cache_size - cache->len;
> +
> +             cache_objs = cache->objs;
> +             ret = rte_ring_mc_dequeue_bulk(mp->ring,
> +                                            &cache->objs[cache->len],
> +                                            req);
> +             if (likely(ret == 0))
> +                     cache->len += req;
> +             else
> +                     /* Don't spoil the return value */
> +                     ret = 0;
> +     }
> +#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
> +
>       return ret;
>   }
>
>

Reply via email to