Hello David,

Please see my comments inline.

I didn't see the previous versions of the mempool work (well, only very
roughly), so I am probably missing some points... My point of view is that
of a user of the handler API: I need to understand the API to implement a
custom handler for my purposes.

On Thu, 19 May 2016 14:44:59 +0100
David Hunt <david.hunt at intel.com> wrote:

> Until now, the objects stored in mempool mempool were internally stored a

s/mempool mempool/mempool/

stored _in_ a ring?

> ring. This patch introduce the possibility to register external handlers
> replacing the ring.
> 
> The default behavior remains unchanged, but calling the new function
> rte_mempool_set_handler() right after rte_mempool_create_empty() allows to
> change the handler that will be used when populating the mempool.
> 
> v5 changes: rebasing on top of 35 patch set mempool work.
> 
> Signed-off-by: David Hunt <david.hunt at intel.com>
> Signed-off-by: Olivier Matz <olivier.matz at 6wind.com>
> 
> ---
> app/test/test_mempool_perf.c               |   1 -
>  lib/librte_mempool/Makefile                |   2 +
>  lib/librte_mempool/rte_mempool.c           |  73 ++++------
>  lib/librte_mempool/rte_mempool.h           | 212 
> +++++++++++++++++++++++++----
>  lib/librte_mempool/rte_mempool_default.c   | 147 ++++++++++++++++++++
>  lib/librte_mempool/rte_mempool_handler.c   | 139 +++++++++++++++++++
>  lib/librte_mempool/rte_mempool_version.map |   4 +
>  7 files changed, 506 insertions(+), 72 deletions(-)
>  create mode 100644 lib/librte_mempool/rte_mempool_default.c
>  create mode 100644 lib/librte_mempool/rte_mempool_handler.c
> 
> diff --git a/app/test/test_mempool_perf.c b/app/test/test_mempool_perf.c
> index cdc02a0..091c1df 100644
> --- a/app/test/test_mempool_perf.c
> +++ b/app/test/test_mempool_perf.c
> @@ -161,7 +161,6 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
>                                                          n_get_bulk);
>                               if (unlikely(ret < 0)) {
>                                       rte_mempool_dump(stdout, mp);
> -                                     rte_ring_dump(stdout, mp->ring);
>                                       /* in this case, objects are lost... */
>                                       return -1;
>                               }

I think this should be in a separate patch explaining the reason for removing it.

> diff --git a/lib/librte_mempool/Makefile b/lib/librte_mempool/Makefile
> index 43423e0..f19366e 100644
> --- a/lib/librte_mempool/Makefile
> +++ b/lib/librte_mempool/Makefile
> @@ -42,6 +42,8 @@ LIBABIVER := 2
>  
>  # all source are stored in SRCS-y
>  SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
> +SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_handler.c
> +SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_default.c
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_MEMPOOL)-include := rte_mempool.h
>  
> diff --git a/lib/librte_mempool/rte_mempool.c 
> b/lib/librte_mempool/rte_mempool.c
> index 1ab6701..6ec2b3f 100644
> --- a/lib/librte_mempool/rte_mempool.c
> +++ b/lib/librte_mempool/rte_mempool.c
> @@ -148,7 +148,7 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, 
> phys_addr_t physaddr)
>  #endif
>  
>       /* enqueue in ring */
> -     rte_ring_sp_enqueue(mp->ring, obj);
> +     rte_mempool_ext_put_bulk(mp, &obj, 1);

I suppose this is OK; however, replacing "enqueue" with "put" (semantically)
sounds to me like a bug. Enqueue means inserting into a queue; put means
dropping a reference.

>  }
>  
>  /* call obj_cb() for each mempool element */
> @@ -300,40 +300,6 @@ rte_mempool_xmem_usage(__rte_unused void *vaddr, 
> uint32_t elt_num,
>       return (size_t)paddr_idx << pg_shift;
>  }
>  
> -/* create the internal ring */
> -static int
> -rte_mempool_ring_create(struct rte_mempool *mp)
> -{
> -     int rg_flags = 0, ret;
> -     char rg_name[RTE_RING_NAMESIZE];
> -     struct rte_ring *r;
> -
> -     ret = snprintf(rg_name, sizeof(rg_name),
> -             RTE_MEMPOOL_MZ_FORMAT, mp->name);
> -     if (ret < 0 || ret >= (int)sizeof(rg_name))
> -             return -ENAMETOOLONG;
> -
> -     /* ring flags */
> -     if (mp->flags & MEMPOOL_F_SP_PUT)
> -             rg_flags |= RING_F_SP_ENQ;
> -     if (mp->flags & MEMPOOL_F_SC_GET)
> -             rg_flags |= RING_F_SC_DEQ;
> -
> -     /* Allocate the ring that will be used to store objects.
> -      * Ring functions will return appropriate errors if we are
> -      * running as a secondary process etc., so no checks made
> -      * in this function for that condition.
> -      */
> -     r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
> -             mp->socket_id, rg_flags);
> -     if (r == NULL)
> -             return -rte_errno;
> -
> -     mp->ring = r;
> -     mp->flags |= MEMPOOL_F_RING_CREATED;
> -     return 0;
> -}

This is a big change. I suggest (if possible) making it a separate patch
with something like "replace rte_mempool_ring_create by ...". Where is this
code placed now?

> -
>  /* free a memchunk allocated with rte_memzone_reserve() */
>  static void
>  rte_mempool_memchunk_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
> @@ -351,7 +317,7 @@ rte_mempool_free_memchunks(struct rte_mempool *mp)
>       void *elt;
>  
>       while (!STAILQ_EMPTY(&mp->elt_list)) {
> -             rte_ring_sc_dequeue(mp->ring, &elt);
> +             rte_mempool_ext_get_bulk(mp, &elt, 1);

Similar to the put_bulk case above... Replacing "dequeue" with "get"
(semantically) sounds to me like a bug. Dequeue means removing from a queue;
get means obtaining a reference.

>               (void)elt;
>               STAILQ_REMOVE_HEAD(&mp->elt_list, next);
>               mp->populated_size--;
> @@ -380,15 +346,18 @@ rte_mempool_populate_phys(struct rte_mempool *mp, char 
> *vaddr,
>       unsigned i = 0;
>       size_t off;
>       struct rte_mempool_memhdr *memhdr;
> -     int ret;
>  
>       /* create the internal ring if not already done */
>       if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
> -             ret = rte_mempool_ring_create(mp);
> -             if (ret < 0)
> -                     return ret;
> +             rte_errno = 0;
> +             mp->pool = rte_mempool_ext_alloc(mp);
> +             if (mp->pool == NULL) {
> +                     if (rte_errno == 0)
> +                             return -EINVAL;
> +                     else
> +                             return -rte_errno;
> +             }
>       }
> -

Is this a whitespace change?

>       /* mempool is already populated */
>       if (mp->populated_size >= mp->size)
>               return -ENOSPC;
> @@ -700,7 +669,7 @@ rte_mempool_free(struct rte_mempool *mp)
>       rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
>  
>       rte_mempool_free_memchunks(mp);
> -     rte_ring_free(mp->ring);
> +     rte_mempool_ext_free(mp);
>       rte_memzone_free(mp->mz);
>  }
>  
> @@ -812,6 +781,20 @@ rte_mempool_create_empty(const char *name, unsigned n, 
> unsigned elt_size,
>               RTE_PTR_ADD(mp, MEMPOOL_HEADER_SIZE(mp, 0));
>  
>       te->data = mp;
> +
> +     /*
> +      * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
> +      * set the correct index into the handler table.
> +      */
> +     if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
> +             rte_mempool_set_handler(mp, "ring_sp_sc");
> +     else if (flags & MEMPOOL_F_SP_PUT)
> +             rte_mempool_set_handler(mp, "ring_sp_mc");
> +     else if (flags & MEMPOOL_F_SC_GET)
> +             rte_mempool_set_handler(mp, "ring_mp_sc");
> +     else
> +             rte_mempool_set_handler(mp, "ring_mp_mc");
> +

Do I understand correctly that this code is meant to preserve the behaviour
of the previous API? Otherwise it looks strange: if I read the first
condition right, it matches when *either* flag is set, so a mempool created
with only MEMPOOL_F_SP_PUT (or only MEMPOOL_F_SC_GET) ends up with the
"ring_sp_sc" handler.
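If the intention is to pick "ring_sp_sc" only when both flags are set, I
would have expected something like this (just a sketch of what I mean, not
tested):

	if ((flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET)) ==
			(MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
		rte_mempool_set_handler(mp, "ring_sp_sc");
	else if (flags & MEMPOOL_F_SP_PUT)
		rte_mempool_set_handler(mp, "ring_sp_mc");
	else if (flags & MEMPOOL_F_SC_GET)
		rte_mempool_set_handler(mp, "ring_mp_sc");
	else
		rte_mempool_set_handler(mp, "ring_mp_mc");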

>       rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
>       TAILQ_INSERT_TAIL(mempool_list, te, next);
>       rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
> @@ -927,7 +910,7 @@ rte_mempool_count(const struct rte_mempool *mp)
>       unsigned count;
>       unsigned lcore_id;
>  
> -     count = rte_ring_count(mp->ring);
> +     count = rte_mempool_ext_get_count(mp);
>  
>       if (mp->cache_size == 0)
>               return count;
> @@ -1120,7 +1103,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
>  
>       fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
>       fprintf(f, "  flags=%x\n", mp->flags);
> -     fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
> +     fprintf(f, "  pool=%p\n", mp->pool);
>       fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->mz->phys_addr);
>       fprintf(f, "  nb_mem_chunks=%u\n", mp->nb_mem_chunks);
>       fprintf(f, "  size=%"PRIu32"\n", mp->size);
> @@ -1141,7 +1124,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
>       }
>  
>       cache_count = rte_mempool_dump_cache(f, mp);
> -     common_count = rte_ring_count(mp->ring);
> +     common_count = rte_mempool_ext_get_count(mp);
>       if ((cache_count + common_count) > mp->size)
>               common_count = mp->size - cache_count;
>       fprintf(f, "  common_pool_count=%u\n", common_count);
> diff --git a/lib/librte_mempool/rte_mempool.h 
> b/lib/librte_mempool/rte_mempool.h
> index 60339bd..ed2c110 100644
> --- a/lib/librte_mempool/rte_mempool.h
> +++ b/lib/librte_mempool/rte_mempool.h
> @@ -67,6 +67,7 @@
>  #include <inttypes.h>
>  #include <sys/queue.h>
>  
> +#include <rte_spinlock.h>
>  #include <rte_log.h>
>  #include <rte_debug.h>
>  #include <rte_lcore.h>
> @@ -203,7 +204,15 @@ struct rte_mempool_memhdr {
>   */
>  struct rte_mempool {
>       char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
> -     struct rte_ring *ring;           /**< Ring to store objects. */
> +     void *pool;                      /**< Ring or ext-pool to store 
> objects. */
> +     /**
> +      * Index into the array of structs containing callback fn pointers.
> +      * We're using an index here rather than pointers to the callbacks
> +      * to facilitate any secondary processes that may want to use
> +      * this mempool. Any function pointers stored in the mempool
> +      * directly would not be valid for secondary processes.
> +      */

I think this comment should go to the rte_mempool_handler_table definition,
leaving only a short note about it here.

> +     int32_t handler_idx;
>       const struct rte_memzone *mz;    /**< Memzone where pool is allocated */
>       int flags;                       /**< Flags of the mempool. */
>       int socket_id;                   /**< Socket id passed at mempool 
> creation. */
> @@ -325,6 +334,175 @@ void rte_mempool_check_cookies(const struct rte_mempool 
> *mp,
>  #define __mempool_check_cookies(mp, obj_table_const, n, free) do {} while(0)
>  #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
>  
> +#define RTE_MEMPOOL_HANDLER_NAMESIZE 32 /**< Max length of handler name. */
> +
> +/** Allocate the external pool. */

What is the purpose of this callback?
What exactly does it allocate?
Some rte_mempool internals?
Or the memory?
What does it return?

> +typedef void *(*rte_mempool_alloc_t)(struct rte_mempool *mp);
> +
> +/** Free the external pool. */

Why does this *_free callback not accept the rte_mempool param?

> +typedef void (*rte_mempool_free_t)(void *p);
> +
> +/** Put an object in the external pool. */

What is the *p pointer?
What is the obj_table?
Why is it void *?
Why is it const?

> +typedef int (*rte_mempool_put_t)(void *p, void * const *obj_table, unsigned 
> n);

Probably, "unsigned int n" is better.

> +
> +/** Get an object from the external pool. */
> +typedef int (*rte_mempool_get_t)(void *p, void **obj_table, unsigned n);

Probably, "unsigned int n" is better.

> +
> +/** Return the number of available objects in the external pool. */

What is the purpose of the *_get_count callback? I guess it can introduce
race conditions...

> +typedef unsigned (*rte_mempool_get_count)(void *p);

unsigned int

> +
> +/** Structure defining a mempool handler. */

Later in the text, I suggest renaming rte_mempool_handler to
rte_mempool_ops. I believe that name explains the purpose of this struct
better. It would also improve consistency in function names (the *_ext_*
mark is very strange and inconsistent).

> +struct rte_mempool_handler {
> +     char name[RTE_MEMPOOL_HANDLER_NAMESIZE]; /**< Name of mempool handler */
> +     rte_mempool_alloc_t alloc;       /**< Allocate the external pool. */
> +     rte_mempool_free_t free;         /**< Free the external pool. */
> +     rte_mempool_put_t put;           /**< Put an object. */
> +     rte_mempool_get_t get;           /**< Get an object. */
> +     rte_mempool_get_count get_count; /**< Get the number of available objs. 
> */
> +} __rte_cache_aligned;
> +
> +#define RTE_MEMPOOL_MAX_HANDLER_IDX 16  /**< Max number of registered 
> handlers */
> +
> +/** Structure storing the table of registered handlers. */
> +struct rte_mempool_handler_table {
> +     rte_spinlock_t sl;     /**< Spinlock for add/delete. */
> +     uint32_t num_handlers; /**< Number of handlers in the table. */
> +     /** Storage for all possible handlers. */
> +     struct rte_mempool_handler handler[RTE_MEMPOOL_MAX_HANDLER_IDX];
> +};

The handlers are implemented as an array due to multi-process access.
Is that correct? I'd expect a note about it here.

> +
> +/** Array of registered handlers */
> +extern struct rte_mempool_handler_table rte_mempool_handler_table;
> +
> +/**
> + * @internal Get the mempool handler from its index.
> + *
> + * @param handler_idx
> + *   The index of the handler in the handler table. It must be a valid
> + *   index: (0 <= idx < num_handlers).
> + * @return
> + *   The pointer to the handler in the table.
> + */
> +static struct rte_mempool_handler *
> +rte_mempool_handler_get(int handler_idx)
> +{
> +     return &rte_mempool_handler_table.handler[handler_idx];

Is this always safe? Can we believe that handler_idx is within bounds?
At least some RTE_VERIFY would be nice here...
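Something along these lines is what I have in mind (just a sketch):

static struct rte_mempool_handler *
rte_mempool_handler_get(int handler_idx)
{
	RTE_VERIFY(handler_idx >= 0 &&
		(unsigned int)handler_idx <
			rte_mempool_handler_table.num_handlers);
	return &rte_mempool_handler_table.handler[handler_idx];
}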

> +}
> +
> +/**
> + * @internal wrapper for external mempool manager alloc callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @return
> + *   The opaque pointer to the external pool.
> + */
> +void *
> +rte_mempool_ext_alloc(struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager get callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *   Number of objects to get.
> + * @return
> + *   - 0: Success; got n objects.
> + *   - <0: Error; code of handler get function.

Should this doc be more specific about the possible failures?
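For example, with the default ring-based handlers the failure (if I read
rte_ring correctly) is -ENOENT when fewer than n objects are available; for
an external handler it is whatever its get callback returns. Something like
this would already help (a sketch):

 *   - 0: Success; got n objects.
 *   - <0: Error returned by the handler's get callback (e.g. -ENOENT
 *     from the default ring handler when fewer than n objects are
 *     available).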

> + */
> +static inline int
> +rte_mempool_ext_get_bulk(struct rte_mempool *mp, void **obj_table, unsigned 
> n)
> +{
> +     struct rte_mempool_handler *handler;
> +
> +     handler = rte_mempool_handler_get(mp->handler_idx);
> +     return handler->get(mp->pool, obj_table, n);
> +}
> +
> +/**
> + * @internal wrapper for external mempool manager put callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param obj_table
> + *   Pointer to a table of void * pointers (objects).
> + * @param n
> + *   Number of objects to put.
> + * @return
> + *   - 0: Success; n objects supplied.
> + *   - <0: Error; code of handler put function.

Should this doc be more specific about the possible failures?

> + */
> +static inline int
> +rte_mempool_ext_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> +             unsigned n)
> +{
> +     struct rte_mempool_handler *handler;
> +
> +     handler = rte_mempool_handler_get(mp->handler_idx);
> +     return handler->put(mp->pool, obj_table, n);
> +}
> +
> +/**
> + * @internal wrapper for external mempool manager get_count callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @return
> + *   The number of available objects in the external pool.
> + */
> +unsigned

unsigned int

> +rte_mempool_ext_get_count(const struct rte_mempool *mp);
> +
> +/**
> + * @internal wrapper for external mempool manager free callback.
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + */
> +void
> +rte_mempool_ext_free(struct rte_mempool *mp);
> +
> +/**
> + * Set the handler of a mempool
> + *
> + * This can only be done on a mempool that is not populated, i.e. just after
> + * a call to rte_mempool_create_empty().
> + *
> + * @param mp
> + *   Pointer to the memory pool.
> + * @param name
> + *   Name of the handler.
> + * @return
> + *   - 0: Sucess; the new handler is configured.
> + *   - <0: Error (errno)

Should this doc be more specific about the possible failures?

The body of rte_mempool_set_handler does not set errno at all.
It returns e.g. -EEXIST.
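Judging from the function body, the doc could simply enumerate the actual
return values (a sketch):

 *   - 0: Success; the new handler is configured.
 *   - -EINVAL: no handler registered under the given name.
 *   - -EEXIST: the mempool is already populated; too late to change
 *     the handler.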

> + */
> +int
> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name);
> +
> +/**
> + * Register an external pool handler.
> + *
> + * @param h
> + *   Pointer to the external pool handler
> + * @return
> + *   - >=0: Sucess; return the index of the handler in the table.
> + *   - <0: Error (errno)

Should this doc be more specific about the possible failures?

> + */
> +int rte_mempool_handler_register(struct rte_mempool_handler *h);
> +
> +/**
> + * Macro to statically register an external pool handler.
> + */
> +#define MEMPOOL_REGISTER_HANDLER(h)                                  \
> +     void mp_hdlr_init_##h(void);                                    \
> +     void __attribute__((constructor, used)) mp_hdlr_init_##h(void)  \
> +     {                                                               \
> +             rte_mempool_handler_register(&h);                       \
> +     }
> +

There might be a little catch. If there is no more room for handlers,
calling rte_mempool_handler_register would fail silently, as error
reporting does not work when called from a constructor (or at least, that
is my experience).

Not a big deal, but...
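One rough idea to at least make such a failure observable: keep the
returned index so it can be checked later (after EAL init). The
mp_hdlr_idx_##h variable below is hypothetical, not part of the patch:

#define MEMPOOL_REGISTER_HANDLER(h)                                     \
	int mp_hdlr_idx_##h = -1;                                       \
	void mp_hdlr_init_##h(void);                                    \
	void __attribute__((constructor, used)) mp_hdlr_init_##h(void)  \
	{                                                               \
		mp_hdlr_idx_##h = rte_mempool_handler_register(&h);    \
	}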

>  /**
>   * An object callback function for mempool.
>   *
> @@ -736,7 +914,7 @@ void rte_mempool_dump(FILE *f, struct rte_mempool *mp);
>   */
>  static inline void __attribute__((always_inline))
>  __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
> -                 unsigned n, int is_mp)
> +                 unsigned n, __rte_unused int is_mp)
>  {
>       struct rte_mempool_cache *cache;
>       uint32_t index;
> @@ -774,7 +952,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const 
> *obj_table,
>       cache->len += n;
>  
>       if (cache->len >= flushthresh) {
> -             rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
> +             rte_mempool_ext_put_bulk(mp, &cache->objs[cache_size],
>                               cache->len - cache_size);
>               cache->len = cache_size;
>       }
> @@ -782,26 +960,10 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const 
> *obj_table,
>       return;
>  
>  ring_enqueue:
> -
>       /* push remaining objects in ring */
> -#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
> -     if (is_mp) {
> -             if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -                     rte_panic("cannot put objects in mempool\n");
> -     }
> -     else {
> -             if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
> -                     rte_panic("cannot put objects in mempool\n");
> -     }
> -#else
> -     if (is_mp)
> -             rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
> -     else
> -             rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
> -#endif
> +     rte_mempool_ext_put_bulk(mp, obj_table, n);

This is a big change. Does it drop the RTE_LIBRTE_MEMPOOL_DEBUG behaviour
here (the rte_panic on a failed enqueue) entirely? If so, I suggest doing
that first in a separate patch and then replacing the original
*_enqueue_bulk by your *_ext_put_bulk (or better *_ops_put_bulk, as I
explain below).

>  }
>  
> -
>  /**
>   * Put several objects back in the mempool (multi-producers safe).
>   *
> @@ -922,7 +1084,7 @@ rte_mempool_put(struct rte_mempool *mp, void *obj)
>   */
>  static inline int __attribute__((always_inline))
>  __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
> -                unsigned n, int is_mc)
> +                unsigned n, __rte_unused int is_mc)
>  {
>       int ret;
>       struct rte_mempool_cache *cache;
> @@ -945,7 +1107,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
> **obj_table,
>               uint32_t req = n + (cache_size - cache->len);
>  
>               /* How many do we require i.e. number to fill the cache + the 
> request */
> -             ret = rte_ring_mc_dequeue_bulk(mp->ring, 
> &cache->objs[cache->len], req);
> +             ret = rte_mempool_ext_get_bulk(mp,
> +                     &cache->objs[cache->len], req);
>               if (unlikely(ret < 0)) {
>                       /*
>                        * In the offchance that we are buffer constrained,
> @@ -972,10 +1135,7 @@ __mempool_get_bulk(struct rte_mempool *mp, void 
> **obj_table,
>  ring_dequeue:
>  
>       /* get remaining objects from ring */
> -     if (is_mc)
> -             ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
> -     else
> -             ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
> +     ret = rte_mempool_ext_get_bulk(mp, obj_table, n);
>  
>       if (ret < 0)
>               __MEMPOOL_STAT_ADD(mp, get_fail, n);
> diff --git a/lib/librte_mempool/rte_mempool_default.c 
> b/lib/librte_mempool/rte_mempool_default.c
> new file mode 100644
> index 0000000..a6ac65a
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_default.c
> @@ -0,0 +1,147 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +
> +#include <rte_errno.h>
> +#include <rte_ring.h>
> +#include <rte_mempool.h>
> +
> +static int
> +common_ring_mp_put(void *p, void * const *obj_table, unsigned n)
> +{
> +     return rte_ring_mp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_sp_put(void *p, void * const *obj_table, unsigned n)
> +{
> +     return rte_ring_sp_enqueue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_mc_get(void *p, void **obj_table, unsigned n)
> +{
> +     return rte_ring_mc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static int
> +common_ring_sc_get(void *p, void **obj_table, unsigned n)
> +{
> +     return rte_ring_sc_dequeue_bulk((struct rte_ring *)p, obj_table, n);
> +}
> +
> +static unsigned
> +common_ring_get_count(void *p)
> +{
> +     return rte_ring_count((struct rte_ring *)p);
> +}
> +
> +
> +static void *
> +common_ring_alloc(struct rte_mempool *mp)
> +{
> +     int rg_flags = 0, ret;
> +     char rg_name[RTE_RING_NAMESIZE];
> +     struct rte_ring *r;
> +
> +     ret = snprintf(rg_name, sizeof(rg_name),
> +             RTE_MEMPOOL_MZ_FORMAT, mp->name);
> +     if (ret < 0 || ret >= (int)sizeof(rg_name)) {
> +             rte_errno = ENAMETOOLONG;
> +             return NULL;
> +     }
> +
> +     /* ring flags */
> +     if (mp->flags & MEMPOOL_F_SP_PUT)
> +             rg_flags |= RING_F_SP_ENQ;
> +     if (mp->flags & MEMPOOL_F_SC_GET)
> +             rg_flags |= RING_F_SC_DEQ;
> +
> +     /* Allocate the ring that will be used to store objects.
> +      * Ring functions will return appropriate errors if we are
> +      * running as a secondary process etc., so no checks made
> +      * in this function for that condition. */
> +     r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
> +             mp->socket_id, rg_flags);
> +
> +     return r;
> +}
> +
> +static void
> +common_ring_free(void *p)
> +{
> +     rte_ring_free((struct rte_ring *)p);
> +}
> +
> +static struct rte_mempool_handler handler_mp_mc = {
> +     .name = "ring_mp_mc",
> +     .alloc = common_ring_alloc,
> +     .free = common_ring_free,
> +     .put = common_ring_mp_put,
> +     .get = common_ring_mc_get,
> +     .get_count = common_ring_get_count,
> +};
> +
> +static struct rte_mempool_handler handler_sp_sc = {
> +     .name = "ring_sp_sc",
> +     .alloc = common_ring_alloc,
> +     .free = common_ring_free,
> +     .put = common_ring_sp_put,
> +     .get = common_ring_sc_get,
> +     .get_count = common_ring_get_count,
> +};
> +
> +static struct rte_mempool_handler handler_mp_sc = {
> +     .name = "ring_mp_sc",
> +     .alloc = common_ring_alloc,
> +     .free = common_ring_free,
> +     .put = common_ring_mp_put,
> +     .get = common_ring_sc_get,
> +     .get_count = common_ring_get_count,
> +};
> +
> +static struct rte_mempool_handler handler_sp_mc = {
> +     .name = "ring_sp_mc",
> +     .alloc = common_ring_alloc,
> +     .free = common_ring_free,
> +     .put = common_ring_sp_put,
> +     .get = common_ring_mc_get,
> +     .get_count = common_ring_get_count,
> +};
> +

Introducing those handlers could go in a separate patch. IMHO, that would
simplify the review process a lot: first introduce the mechanism, then add
something inside.

I'd also add a note that those handlers are always available, and document
what kind of memory they use...

> +MEMPOOL_REGISTER_HANDLER(handler_mp_mc);
> +MEMPOOL_REGISTER_HANDLER(handler_sp_sc);
> +MEMPOOL_REGISTER_HANDLER(handler_mp_sc);
> +MEMPOOL_REGISTER_HANDLER(handler_sp_mc);
> diff --git a/lib/librte_mempool/rte_mempool_handler.c 
> b/lib/librte_mempool/rte_mempool_handler.c
> new file mode 100644
> index 0000000..78611f8
> --- /dev/null
> +++ b/lib/librte_mempool/rte_mempool_handler.c
> @@ -0,0 +1,139 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2016 Intel Corporation. All rights reserved.
> + *   Copyright(c) 2016 6WIND S.A.
> + *   All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of Intel Corporation nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +
> +#include <rte_mempool.h>
> +
> +/* indirect jump table to support external memory pools */
> +struct rte_mempool_handler_table rte_mempool_handler_table = {
> +     .sl =  RTE_SPINLOCK_INITIALIZER ,
> +     .num_handlers = 0
> +};
> +
> +/* add a new handler in rte_mempool_handler_table, return its index */

It seems to me that there is no way to pass an opaque pointer into the
handler. In such a case, I would expect to be able to do something like:

struct my_handler {
        struct rte_mempool_handler h;
        ...
} handler;

rte_mempool_handler_register(&handler.h);

But I cannot, because you copy the contents of the handler. By the way,
this should be documented.

How can I pass an opaque pointer here? The only way I see is through
rte_mempool.pool (see the sketch at the end of this comment). In that case,
what about renaming rte_mempool_handler to rte_mempool_ops? Semantically,
it is not a handler; it just holds the operations.

This would improve some namings:

rte_mempool_ext_alloc -> rte_mempool_ops_alloc
rte_mempool_ext_free -> rte_mempool_ops_free
rte_mempool_ext_get_count -> rte_mempool_ops_get_count
rte_mempool_handler_register -> rte_mempool_ops_register

which seems more readable to me. The *_ext_* mark does not say anything
valuable; it just scares a bit :).
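Back to the opaque-pointer question: the only workaround I see would look
roughly like the following. The my_pool struct and my_private_data field
are made up here, just to illustrate hanging per-handler context off the
pointer returned by the alloc callback (i.e. rte_mempool.pool):

struct my_pool {
	struct rte_ring *ring;		/* or any other backing store */
	void *my_private_data;		/* hypothetical per-pool context */
};

static int
my_put(void *p, void * const *obj_table, unsigned int n)
{
	struct my_pool *pool = p;	/* p is rte_mempool.pool */

	return rte_ring_mp_enqueue_bulk(pool->ring, obj_table, n);
}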

> +int
> +rte_mempool_handler_register(struct rte_mempool_handler *h)
> +{
> +     struct rte_mempool_handler *handler;
> +     int16_t handler_idx;
> +
> +     rte_spinlock_lock(&rte_mempool_handler_table.sl);
> +
> +     if (rte_mempool_handler_table.num_handlers >= 
> RTE_MEMPOOL_MAX_HANDLER_IDX) {
> +             rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +             RTE_LOG(ERR, MEMPOOL,
> +                     "Maximum number of mempool handlers exceeded\n");
> +             return -ENOSPC;
> +     }
> +
> +     if (h->put == NULL || h->get == NULL || h->get_count == NULL) {
> +             rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +             RTE_LOG(ERR, MEMPOOL,
> +                     "Missing callback while registering mempool handler\n");
> +             return -EINVAL;
> +     }
> +
> +     handler_idx = rte_mempool_handler_table.num_handlers++;
> +     handler = &rte_mempool_handler_table.handler[handler_idx];
> +     snprintf(handler->name, sizeof(handler->name), "%s", h->name);
> +     handler->alloc = h->alloc;
> +     handler->put = h->put;
> +     handler->get = h->get;
> +     handler->get_count = h->get_count;
> +
> +     rte_spinlock_unlock(&rte_mempool_handler_table.sl);
> +
> +     return handler_idx;
> +}
> +
> +/* wrapper to allocate an external pool handler */
> +void *
> +rte_mempool_ext_alloc(struct rte_mempool *mp)
> +{
> +     struct rte_mempool_handler *handler;
> +
> +     handler = rte_mempool_handler_get(mp->handler_idx);
> +     if (handler->alloc == NULL)
> +             return NULL;
> +     return handler->alloc(mp);
> +}
> +
> +/* wrapper to free an external pool handler */
> +void
> +rte_mempool_ext_free(struct rte_mempool *mp)
> +{
> +     struct rte_mempool_handler *handler;
> +
> +     handler = rte_mempool_handler_get(mp->handler_idx);
> +     if (handler->free == NULL)
> +             return;
> +     return handler->free(mp);
> +}
> +
> +/* wrapper to get available objects in an external pool handler */
> +unsigned
> +rte_mempool_ext_get_count(const struct rte_mempool *mp)
> +{
> +     struct rte_mempool_handler *handler;
> +
> +     handler = rte_mempool_handler_get(mp->handler_idx);
> +     return handler->get_count(mp->pool);
> +}
> +
> +/* set the handler of a mempool */

The doc comment should say "this sets a handler previously registered by
the rte_mempool_handler_register function ...". I was confused and didn't
understand how the handlers are inserted into the table.

> +int
> +rte_mempool_set_handler(struct rte_mempool *mp, const char *name)
> +{
> +     struct rte_mempool_handler *handler = NULL;
> +     unsigned i;
> +
> +     /* too late, the mempool is already populated */
> +     if (mp->flags & MEMPOOL_F_RING_CREATED)
> +             return -EEXIST;
> +
> +     for (i = 0; i < rte_mempool_handler_table.num_handlers; i++) {
> +             if (!strcmp(name, rte_mempool_handler_table.handler[i].name)) {
> +                     handler = &rte_mempool_handler_table.handler[i];
> +                     break;
> +             }
> +     }
> +
> +     if (handler == NULL)
> +             return -EINVAL;
> +
> +     mp->handler_idx = i;
> +     return 0;
> +}
> diff --git a/lib/librte_mempool/rte_mempool_version.map 
> b/lib/librte_mempool/rte_mempool_version.map
> index f63461b..a0e9aed 100644
> --- a/lib/librte_mempool/rte_mempool_version.map
> +++ b/lib/librte_mempool/rte_mempool_version.map
> @@ -19,6 +19,8 @@ DPDK_2.0 {
>  DPDK_16.7 {
>       global:
>  
> +     rte_mempool_handler_table;
> +
>       rte_mempool_check_cookies;
>       rte_mempool_obj_iter;
>       rte_mempool_mem_iter;
> @@ -29,6 +31,8 @@ DPDK_16.7 {
>       rte_mempool_populate_default;
>       rte_mempool_populate_anon;
>       rte_mempool_free;
> +     rte_mempool_set_handler;
> +     rte_mempool_handler_register;
>  
>       local: *;
>  } DPDK_2.0;

Regards
Jan
