Hi Gage,

Thank you for your contribution on non-blocking data structures. I think they are important for extending DPDK into additional use cases.
I am wondering if it makes sense to decouple the NB stack data structure from the mempool driver (similar to rte_ring)? I see that the stack-based mempool implements the stack data structure in the driver, but an NB stack is not such a trivial data structure, and it might be useful to applications and other use cases as well.

I also suggest that we use the C11 __atomic_xxx APIs for memory operations. The rte_atomic64_xxx APIs use the __sync_xxx built-ins, which do not provide the capability to express memory orderings.

Please find a few comments inline.

>
> This commit adds support for a non-blocking (linked list based) stack
> mempool handler. The stack uses a 128-bit compare-and-swap instruction,
> and thus is limited to x86_64. The 128-bit CAS atomically updates the
> stack top pointer and a modification counter, which protects against
> the ABA problem.
>
> In mempool_perf_autotest the lock-based stack outperforms the
> non-blocking handler*, however:
> - For applications with preemptible pthreads, a lock-based stack's
>   worst-case performance (i.e. one thread being preempted while
>   holding the spinlock) is much worse than the non-blocking stack's.
> - Using per-thread mempool caches will largely mitigate the performance
>   difference.
>
> *Test setup: x86_64 build with default config, dual-socket Xeon E5-2699 v4,
> running on isolcpus cores with a tickless scheduler. The lock-based stack's
> rate_persec was 1x-3.5x the non-blocking stack's.
>
> Signed-off-by: Gage Eads <gage.e...@intel.com>
> Acked-by: Andrew Rybchenko <arybche...@solarflare.com>
> ---
>  MAINTAINERS                                     |   4 +
>  config/common_base                              |   1 +
>  doc/guides/prog_guide/env_abstraction_layer.rst |   5 +
>  drivers/mempool/Makefile                        |   3 +
>  drivers/mempool/meson.build                     |   3 +-
>  drivers/mempool/nb_stack/Makefile               |  23 ++++
>  drivers/mempool/nb_stack/meson.build            |   6 +
>  drivers/mempool/nb_stack/nb_lifo.h              | 147 +++++++++++++++++++++
>  drivers/mempool/nb_stack/rte_mempool_nb_stack.c | 125 ++++++++++++++++++
>  .../nb_stack/rte_mempool_nb_stack_version.map   |   4 +
>  mk/rte.app.mk                                   |   7 +-
>  11 files changed, 325 insertions(+), 3 deletions(-)
>  create mode 100644 drivers/mempool/nb_stack/Makefile
>  create mode 100644 drivers/mempool/nb_stack/meson.build
>  create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
>  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
>  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 470f36b9c..5519d3323 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -416,6 +416,10 @@ M: Artem V. Andreev <artem.andr...@oktetlabs.ru>
>  M: Andrew Rybchenko <arybche...@solarflare.com>
>  F: drivers/mempool/bucket/
>
> +Non-blocking stack memory pool
> +M: Gage Eads <gage.e...@intel.com>
> +F: drivers/mempool/nb_stack/
> +
>
>  Bus Drivers
>  -----------
> diff --git a/config/common_base b/config/common_base
> index 964a6956e..8a51f36b1 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -726,6 +726,7 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
>  #
>  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
>  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
> +CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
>  CONFIG_RTE_DRIVER_MEMPOOL_RING=y
>  CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
>
> diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst
> b/doc/guides/prog_guide/env_abstraction_layer.rst
> index 929d76dba..9497b879c 100644
> --- a/doc/guides/prog_guide/env_abstraction_layer.rst
> +++ b/doc/guides/prog_guide/env_abstraction_layer.rst
> @@ -541,6 +541,11 @@ Known Issues
>
>    5. It MUST not be used by multi-producer/consumer pthreads, whose
>       scheduling policies are SCHED_FIFO or SCHED_RR.
>
> +  Alternatively, x86_64 applications can use the non-blocking stack
> +  mempool handler. When considering this handler, note that:
> +
> +  - it is limited to the x86_64 platform, because it uses an instruction
> +    (16-byte compare-and-swap) that is not available on other platforms.

The Arm architecture supports similar instructions. I suggest simplifying this statement to indicate that the nb_stack feature is currently available only on x86_64 platforms.

> +  - it has worse average-case performance than the non-preemptive
> +    rte_ring, but software caching (e.g. the mempool cache) can mitigate
> +    this by reducing the number of handler operations.
> +
>  + rte_timer
>
>    Running ``rte_timer_manage()`` on a non-EAL pthread is not allowed.
>    However, resetting/stopping the timer from a non-EAL pthread is allowed.
> diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
> index 28c2e8360..895cf8a34 100644
> --- a/drivers/mempool/Makefile
> +++ b/drivers/mempool/Makefile
> @@ -10,6 +10,9 @@ endif
>  ifeq ($(CONFIG_RTE_EAL_VFIO)$(CONFIG_RTE_LIBRTE_FSLMC_BUS),yy)
>  DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2
>  endif
> +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> +DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
> +endif
>  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
>  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
>  DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
> diff --git a/drivers/mempool/meson.build b/drivers/mempool/meson.build
> index 4527d9806..220cfaf63 100644
> --- a/drivers/mempool/meson.build
> +++ b/drivers/mempool/meson.build
> @@ -1,7 +1,8 @@
>  # SPDX-License-Identifier: BSD-3-Clause
>  # Copyright(c) 2017 Intel Corporation
>
> -drivers = ['bucket', 'dpaa', 'dpaa2', 'octeontx', 'ring', 'stack']
> +drivers = ['bucket', 'dpaa', 'dpaa2', 'nb_stack', 'octeontx', 'ring',
> +'stack']
> +
>  std_deps = ['mempool']
>  config_flag_fmt = 'RTE_LIBRTE_@0@_MEMPOOL'
>  driver_name_fmt = 'rte_mempool_@0@'
> diff --git a/drivers/mempool/nb_stack/Makefile
> b/drivers/mempool/nb_stack/Makefile
> new file mode 100644
> index 000000000..318b18283
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/Makefile
> @@ -0,0 +1,23 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2019 Intel Corporation
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +#
> +# library name
> +#
> +LIB = librte_mempool_nb_stack.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# Headers
> +LDLIBS += -lrte_eal -lrte_mempool
> +
> +EXPORT_MAP := rte_mempool_nb_stack_version.map
> +
> +LIBABIVER := 1
> +
> +SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/drivers/mempool/nb_stack/meson.build
> b/drivers/mempool/nb_stack/meson.build
> new file mode 100644
> index 000000000..7dec72242
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/meson.build
> @@ -0,0 +1,6 @@
> +# SPDX-License-Identifier: BSD-3-Clause
> +# Copyright(c) 2019 Intel Corporation
> +
> +build = dpdk_conf.has('RTE_ARCH_X86_64')
> +
> +sources = files('rte_mempool_nb_stack.c')
> diff --git a/drivers/mempool/nb_stack/nb_lifo.h
> b/drivers/mempool/nb_stack/nb_lifo.h
> new file mode 100644
> index 000000000..ad4a3401f
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/nb_lifo.h
> @@ -0,0 +1,147 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _NB_LIFO_H_
> +#define _NB_LIFO_H_
> +
> +struct nb_lifo_elem {
> +	void *data;
> +	struct nb_lifo_elem *next;
> +};
> +
> +struct nb_lifo_head {
> +	struct nb_lifo_elem *top; /**< Stack top */
> +	uint64_t cnt; /**< Modification counter */
> +};

Minor comment: mentioning the ABA problem in the comment for 'cnt' would be helpful.

> +
> +struct nb_lifo {
> +	volatile struct nb_lifo_head head __rte_aligned(16);
> +	rte_atomic64_t len;
> +} __rte_cache_aligned;
> +
> +static __rte_always_inline void
> +nb_lifo_init(struct nb_lifo *lifo)
> +{
> +	memset(lifo, 0, sizeof(*lifo));
> +	rte_atomic64_set(&lifo->len, 0);
> +}
> +
> +static __rte_always_inline unsigned int
> +nb_lifo_len(struct nb_lifo *lifo)
> +{
> +	/* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
> +	 * and lifo->len atomically, which can cause the list to appear
> +	 * shorter than it actually is if this function is called while other
> +	 * threads are modifying the list.
> +	 *
> +	 * However, given the inherently approximate nature of the get_count
> +	 * callback -- even if the list and its size were updated atomically,
> +	 * the size could change between when get_count executes and when the
> +	 * value is returned to the caller -- this is acceptable.
> +	 *
> +	 * The lifo->len updates are placed such that the list may appear to
> +	 * have fewer elements than it does, but will never appear to have
> +	 * more elements.
> +	 * If the mempool is near-empty to the point that this is a concern,
> +	 * the user should consider increasing the mempool size.
> +	 */
> +	return (unsigned int)rte_atomic64_read(&lifo->len);
> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push(struct nb_lifo *lifo,
> +	     struct nb_lifo_elem *first,
> +	     struct nb_lifo_elem *last,
> +	     unsigned int num)
> +{
> +	while (1) {
> +		struct nb_lifo_head old_head, new_head;
> +
> +		old_head = lifo->head;
> +
> +		/* Swing the top pointer to the first element in the list and
> +		 * make the last element point to the old top.
> +		 */
> +		new_head.top = first;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		last->next = old_head.top;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}

Minor comment: this can be a do-while loop (for example, similar to the one in __rte_ring_move_prod_head).

> +
> +	rte_atomic64_add(&lifo->len, num);
> +}
> +
> +static __rte_always_inline void
> +nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
> +{
> +	nb_lifo_push(lifo, elem, elem, 1);
> +}
> +
> +static __rte_always_inline struct nb_lifo_elem *
> +nb_lifo_pop(struct nb_lifo *lifo,
> +	    unsigned int num,
> +	    void **obj_table,
> +	    struct nb_lifo_elem **last)
> +{
> +	struct nb_lifo_head old_head;
> +
> +	/* Reserve num elements, if available */
> +	while (1) {
> +		uint64_t len = rte_atomic64_read(&lifo->len);
> +
> +		/* Does the list contain enough elements? */
> +		if (len < num)
> +			return NULL;
> +
> +		if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> +					len, len - num))
> +			break;
> +	}
> +
> +	/* Pop num elements */
> +	while (1) {
> +		struct nb_lifo_head new_head;
> +		struct nb_lifo_elem *tmp;
> +		unsigned int i;
> +
> +		old_head = lifo->head;
> +
> +		tmp = old_head.top;
> +
> +		/* Traverse the list to find the new head.
> +		 * A next pointer will either point to another element or
> +		 * NULL; if a thread encounters a pointer that has already
> +		 * been popped, the CAS will fail.
> +		 */
> +		for (i = 0; i < num && tmp != NULL; i++) {
> +			if (obj_table)
> +				obj_table[i] = tmp->data;
> +			if (last)
> +				*last = tmp;
> +			tmp = tmp->next;
> +		}

This 'if' check can be moved outside the for loop. Maybe use RTE_ASSERT at the beginning of the function?

> +
> +		/* If NULL was encountered, the list was modified while
> +		 * traversing it. Retry.
> +		 */
> +		if (i != num)
> +			continue;
> +
> +		new_head.top = tmp;
> +		new_head.cnt = old_head.cnt + 1;
> +
> +		if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> +					 (uint64_t *)&old_head,
> +					 (uint64_t *)&new_head))
> +			break;
> +	}
> +
> +	return old_head.top;
> +}
> +
> +#endif /* _NB_LIFO_H_ */
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> new file mode 100644
> index 000000000..1818a2cfa
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> @@ -0,0 +1,125 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#include <stdio.h>
> +#include <rte_mempool.h>
> +#include <rte_malloc.h>
> +
> +#include "nb_lifo.h"
> +
> +struct rte_mempool_nb_stack {
> +	uint64_t size;
> +	struct nb_lifo used_lifo; /**< LIFO containing mempool pointers */
> +	struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
> +};
> +
> +static int
> +nb_stack_alloc(struct rte_mempool *mp)
> +{
> +	struct rte_mempool_nb_stack *s;
> +	struct nb_lifo_elem *elems;
> +	unsigned int n = mp->size;
> +	unsigned int size, i;
> +
> +	size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);

IMO, the allocation of the stack elements can be moved under the nb_lifo_init API; it would make the NB stack code modular.
> +
> +	/* Allocate our local memory structure */
> +	s = rte_zmalloc_socket("mempool-nb_stack",
> +			       size,
> +			       RTE_CACHE_LINE_SIZE,
> +			       mp->socket_id);
> +	if (s == NULL) {
> +		RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> +		return -ENOMEM;
> +	}
> +
> +	s->size = n;
> +
> +	nb_lifo_init(&s->used_lifo);
> +	nb_lifo_init(&s->free_lifo);
> +
> +	elems = (struct nb_lifo_elem *)&s[1];
> +	for (i = 0; i < n; i++)
> +		nb_lifo_push_single(&s->free_lifo, &elems[i]);

This could also be moved into the nb_lifo_init API.

> +
> +	mp->pool_data = s;
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_enqueue(struct rte_mempool *mp, void * const *obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last, *tmp;
> +	unsigned int i;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n free elements */
> +	first = nb_lifo_pop(&s->free_lifo, n, NULL, NULL);
> +	if (unlikely(first == NULL))
> +		return -ENOBUFS;
> +
> +	/* Prepare the list elements */
> +	tmp = first;
> +	for (i = 0; i < n; i++) {
> +		tmp->data = obj_table[i];
> +		last = tmp;
> +		tmp = tmp->next;
> +	}
> +
> +	/* Enqueue them to the used list */
> +	nb_lifo_push(&s->used_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static int
> +nb_stack_dequeue(struct rte_mempool *mp, void **obj_table,
> +		 unsigned int n)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +	struct nb_lifo_elem *first, *last;
> +
> +	if (unlikely(n == 0))
> +		return 0;
> +
> +	/* Pop n used elements */
> +	first = nb_lifo_pop(&s->used_lifo, n, obj_table, &last);
> +	if (unlikely(first == NULL))
> +		return -ENOENT;
> +
> +	/* Enqueue the list elements to the free list */
> +	nb_lifo_push(&s->free_lifo, first, last, n);
> +
> +	return 0;
> +}
> +
> +static unsigned
> +nb_stack_get_count(const struct rte_mempool *mp)
> +{
> +	struct rte_mempool_nb_stack *s = mp->pool_data;
> +
> +	return nb_lifo_len(&s->used_lifo);
> +}
> +
> +static void
> +nb_stack_free(struct rte_mempool *mp)
> +{
> +	rte_free(mp->pool_data);
> +}
> +
> +static struct rte_mempool_ops ops_nb_stack = {
> +	.name = "nb_stack",
> +	.alloc = nb_stack_alloc,
> +	.free = nb_stack_free,
> +	.enqueue = nb_stack_enqueue,
> +	.dequeue = nb_stack_dequeue,
> +	.get_count = nb_stack_get_count
> +};
> +
> +MEMPOOL_REGISTER_OPS(ops_nb_stack);
> diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> new file mode 100644
> index 000000000..fc8c95e91
> --- /dev/null
> +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> @@ -0,0 +1,4 @@
> +DPDK_19.05 {
> +
> +	local: *;
> +};
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> index 02e8b6f05..d4b4aaaf6 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -131,8 +131,11 @@ endif
>  ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
>  # plugins (link only if static libraries)
>
> -_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
> -_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += -lrte_mempool_stack
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
> +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += -lrte_mempool_nb_stack
> +endif
> +_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += -lrte_mempool_stack
>  ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
>  _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL) += -lrte_mempool_dpaa
>  endif
> --
> 2.13.6