Zero-copy access to the mempool cache is beneficial for PMD performance, and 
must be provided by the mempool library to fix [Bug 1052] without a performance 
regression.

[Bug 1052]: https://bugs.dpdk.org/show_bug.cgi?id=1052


This RFC offers two conceptual variants of zero-copy get:
1. A simple version.
2. A version where existing (hot) objects in the cache are moved to the top of 
the cache before new objects from the backend driver are pulled in.

I would like some early feedback. Also, which variant do you prefer?

Notes:
* Allowing the 'cache' parameter to be NULL, and getting it from the mempool 
instead, was inspired by rte_mempool_cache_flush().
* Asserting that the 'mp' parameter is not NULL is not done by other functions, 
so I omitted it here too.

NB: Please ignore formatting. Also, this code has not even been compile tested.


PS: No promises, but I expect to offer an RFC for zero-copy put too. :-)


1. Simple version:

/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool 
cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to prefetch into the mempool cache.
 * @return
 *   The pointer to the objects in the mempool cache.
 *   NULL on error
 *   with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
        struct rte_mempool *mp,
        unsigned int n)
{
    unsigned int len;

    if (cache == NULL)
        cache = rte_mempool_default_cache(mp, rte_lcore_id());
    if (cache == NULL) {
        rte_errno = EINVAL;
        goto fail;
    }

    rte_mempool_trace_cache_get_bulk(cache, mp, n);

    len = cache->len;

    if (unlikely(n > len)) {
        unsigned int size;

        if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
            rte_errno = EINVAL;
            goto fail;
        }

        /* Fill the cache from the backend; fetch size + requested - len 
objects. */
        size = cache->size;

        ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], size + n - 
len);
        if (unlikely(ret < 0)) {
            /*
             * We are buffer constrained.
             * Do not fill the cache, just satisfy the request.
             */
            ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], n - len);
            if (unlikely(ret < 0)) {
                rte_errno = -ret;
                goto fail;
            }

            len = 0;
        } else
            len = size;
    } else
        len -= n;

    cache->len = len;

    RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
    RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

    return &cache->objs[len];

fail:

    RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
    RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

    return NULL;
}


2. Advanced version:

/**
 * Get objects from a mempool via zero-copy access to a user-owned mempool 
cache.
 *
 * @param cache
 *   A pointer to the mempool cache.
 * @param mp
 *   A pointer to the mempool.
 * @param n
 *   The number of objects to prefetch into the mempool cache.
 * @return
 *   The pointer to the objects in the mempool cache.
 *   NULL on error
 *   with rte_errno set appropriately.
 */
static __rte_always_inline void *
rte_mempool_cache_get_bulk(struct rte_mempool_cache *cache,
        struct rte_mempool *mp,
        unsigned int n)
{
    unsigned int len;

    if (cache == NULL)
        cache = rte_mempool_default_cache(mp, rte_lcore_id());
    if (cache == NULL) {
        rte_errno = EINVAL;
        goto fail;
    }

    rte_mempool_trace_cache_get_bulk(cache, mp, n);

    len = cache->len;

    if (unlikely(n > len)) {
        unsigned int size;

        if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE)) {
            rte_errno = EINVAL;
            goto fail;
        }

        /* Fill the cache from the backend; fetch size + requested - len 
objects. */
        size = cache->size;

        if (likely(size + n >= 2 * len)) {
            /*
             * No overlap when copying (dst >= len): size + n - len >= len.
             * Move (i.e. copy) the existing objects in the cache to the
             * coming top of the cache, to make room for new objects below.
             */
            rte_memcpy(&cache->objs[size + n - len], &cache->objs[0], len);

            /* Fill the cache below the existing objects in the cache. */
            ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[0], size + n - 
len);
            if (unlikely(ret < 0)) {
                goto constrained;
            } else
                len = size;
        } else {
            /* Fill the cache on top of any objects in it. */
            ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], size + n 
- len);
            if (unlikely(ret < 0)) {

constrained:
                /*
                 * We are buffer constrained.
                 * Do not fill the cache, just satisfy the request.
                 */
                ret = rte_mempool_ops_dequeue_bulk(mp, &cache->objs[len], n - 
len);
                if (unlikely(ret < 0)) {
                    rte_errno = -ret;
                    goto fail;
                }

                len = 0;
            } else
                len = size;
        }
    } else
        len -= n;

    cache->len = len;

    RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_bulk, 1);
    RTE_MEMPOOL_CACHE_STAT_ADD(cache, get_success_objs, n);

    return &cache->objs[len];

fail:

    RTE_MEMPOOL_STAT_ADD(mp, get_fail_bulk, 1);
    RTE_MEMPOOL_STAT_ADD(mp, get_fail_objs, n);

    return NULL;
}


Med venlig hilsen / Kind regards,
-Morten Brørup

Reply via email to