> -----Original Message-----
> From: Honnappa Nagarahalli [mailto:honnappa.nagaraha...@arm.com]
> Sent: Thursday, January 17, 2019 11:05 PM
> To: Eads, Gage <gage.e...@intel.com>; dev@dpdk.org
> Cc: olivier.m...@6wind.com; arybche...@solarflare.com; Richardson, Bruce
> <bruce.richard...@intel.com>; Ananyev, Konstantin
> <konstantin.anan...@intel.com>; Gavin Hu (Arm Technology China)
> <gavin...@arm.com>; nd <n...@arm.com>; Honnappa Nagarahalli
> <honnappa.nagaraha...@arm.com>; nd <n...@arm.com>
> Subject: RE: [dpdk-dev] [PATCH v4 2/2] mempool/nb_stack: add non-blocking
> stack mempool
> 
> Hi Gage,
>      Thank you for your contribution of non-blocking data structures. I think
> they are important for extending DPDK into additional use cases.
> 

Glad to hear it. Be sure to check out my non-blocking ring patchset as well, if 
you haven't already: http://mails.dpdk.org/archives/dev/2019-January/123774.html

> I am wondering if it makes sense to decouple the NB stack data structure from
> the mempool driver (similar to rte_ring)? I see that the stack-based mempool
> implements the stack data structure in the driver. But the NB stack might not be
> such a trivial data structure, and it might be useful for applications or other
> use cases as well.
> 

I agree -- and you're not the first to suggest this :).

I'm going to defer that work to a later patchset; creating a new lib/ directory 
requires tech board approval (IIRC), which would unnecessarily delay getting this 
mempool handler merged.

> I also suggest that we use the C11 __atomic_xxx APIs for memory operations. The
> rte_atomic64_xxx APIs use the __sync_xxx built-ins, which do not provide the
> capability to express memory orderings.
> 

Ok, I will add those (dependent on RTE_USE_C11_MEM_MODEL).
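
For the lifo->len accesses, I'm picturing something roughly like this (just a 
sketch -- the exact memory orderings still need thought, and the C11 path reads 
rte_atomic64_t's 'cnt' field directly):

static __rte_always_inline unsigned int
nb_lifo_len(struct nb_lifo *lifo)
{
#ifdef RTE_USE_C11_MEM_MODEL
	/* Relaxed is likely sufficient here, since the count is approximate
	 * by design (see the comment in the v4 code).
	 */
	return (unsigned int)__atomic_load_n(&lifo->len.cnt, __ATOMIC_RELAXED);
#else
	return (unsigned int)rte_atomic64_read(&lifo->len);
#endif
}

with the push-side update similarly becoming
__atomic_fetch_add(&lifo->len.cnt, num, __ATOMIC_RELEASE) when
RTE_USE_C11_MEM_MODEL is set.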

> Please find a few comments inline.
> 
> >
> > This commit adds support for a non-blocking (linked list based) stack
> > mempool handler. The stack uses a 128-bit compare-and-swap
> > instruction, and thus is limited to x86_64. The 128-bit CAS atomically
> > updates the stack top pointer and a modification counter, which
> > protects against the ABA problem.
> >
> > In mempool_perf_autotest the lock-based stack outperforms the non-
> > blocking handler*, however:
> > - For applications with preemptible pthreads, a lock-based stack's
> >   worst-case performance (i.e. one thread being preempted while
> >   holding the spinlock) is much worse than the non-blocking stack's.
> > - Using per-thread mempool caches will largely mitigate the performance
> >   difference.
> >
> > *Test setup: x86_64 build with default config, dual-socket Xeon
> > E5-2699 v4, running on isolcpus cores with a tickless scheduler. The
> > lock-based stack's rate_persec was 1x-3.5x the non-blocking stack's.
> >
> > Signed-off-by: Gage Eads <gage.e...@intel.com>
> > Acked-by: Andrew Rybchenko <arybche...@solarflare.com>
> > ---
> >  MAINTAINERS                                        |   4 +
> >  config/common_base                                 |   1 +
> >  doc/guides/prog_guide/env_abstraction_layer.rst    |   5 +
> >  drivers/mempool/Makefile                           |   3 +
> >  drivers/mempool/meson.build                        |   3 +-
> >  drivers/mempool/nb_stack/Makefile                  |  23 ++++
> >  drivers/mempool/nb_stack/meson.build               |   6 +
> >  drivers/mempool/nb_stack/nb_lifo.h                 | 147 +++++++++++++++++++++
> >  drivers/mempool/nb_stack/rte_mempool_nb_stack.c    | 125 ++++++++++++++++++
> >  .../nb_stack/rte_mempool_nb_stack_version.map      |   4 +
> >  mk/rte.app.mk                                      |   7 +-
> >  11 files changed, 325 insertions(+), 3 deletions(-)
> >  create mode 100644 drivers/mempool/nb_stack/Makefile
> >  create mode 100644 drivers/mempool/nb_stack/meson.build
> >  create mode 100644 drivers/mempool/nb_stack/nb_lifo.h
> >  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> >  create mode 100644 drivers/mempool/nb_stack/rte_mempool_nb_stack_version.map
> >
> > diff --git a/MAINTAINERS b/MAINTAINERS
> > index 470f36b9c..5519d3323 100644
> > --- a/MAINTAINERS
> > +++ b/MAINTAINERS
> > @@ -416,6 +416,10 @@ M: Artem V. Andreev <artem.andr...@oktetlabs.ru>
> >  M: Andrew Rybchenko <arybche...@solarflare.com>
> >  F: drivers/mempool/bucket/
> >
> > +Non-blocking stack memory pool
> > +M: Gage Eads <gage.e...@intel.com>
> > +F: drivers/mempool/nb_stack/
> > +
> >
> >  Bus Drivers
> >  -----------
> > diff --git a/config/common_base b/config/common_base
> > index 964a6956e..8a51f36b1 100644
> > --- a/config/common_base
> > +++ b/config/common_base
> > @@ -726,6 +726,7 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
> >  #
> >  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
> >  CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
> > +CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK=y
> >  CONFIG_RTE_DRIVER_MEMPOOL_RING=y
> >  CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
> >
> > diff --git a/doc/guides/prog_guide/env_abstraction_layer.rst b/doc/guides/prog_guide/env_abstraction_layer.rst
> > index 929d76dba..9497b879c 100644
> > --- a/doc/guides/prog_guide/env_abstraction_layer.rst
> > +++ b/doc/guides/prog_guide/env_abstraction_layer.rst
> > @@ -541,6 +541,11 @@ Known Issues
> >
> >    5. It MUST not be used by multi-producer/consumer pthreads, whose
> > scheduling policies are SCHED_FIFO or SCHED_RR.
> >
> > +  Alternatively, x86_64 applications can use the non-blocking stack
> > +  mempool handler. When considering this handler, note that:
> > +
> > +  - it is limited to the x86_64 platform, because it uses an instruction
> > +    (16-byte compare-and-swap) that is not available on other platforms.
> The Arm architecture supports similar instructions, so "not available on other
> platforms" is too strong. I suggest simplifying this statement to indicate that
> the nb_stack feature is currently available only for x86_64 platforms.
> 

Will do.

> > +  - it has worse average-case performance than the non-preemptive rte_ring,
> > +    but software caching (e.g. the mempool cache) can mitigate this by
> > +    reducing the number of handler operations.
> > +
> >  + rte_timer
> >
> >    Running  ``rte_timer_manage()`` on a non-EAL pthread is not allowed.
> > However, resetting/stopping the timer from a non-EAL pthread is allowed.
> > diff --git a/drivers/mempool/Makefile b/drivers/mempool/Makefile
> > index 28c2e8360..895cf8a34 100644
> > --- a/drivers/mempool/Makefile
> > +++ b/drivers/mempool/Makefile
> > @@ -10,6 +10,9 @@ endif
> >  ifeq ($(CONFIG_RTE_EAL_VFIO)$(CONFIG_RTE_LIBRTE_FSLMC_BUS),yy)
> >  DIRS-$(CONFIG_RTE_LIBRTE_DPAA2_MEMPOOL) += dpaa2
> >  endif
> > +ifeq ($(CONFIG_RTE_ARCH_X86_64),y)
> > +DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += nb_stack
> > +endif
> >  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING) += ring
> >  DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK) += stack
> >  DIRS-$(CONFIG_RTE_LIBRTE_OCTEONTX_MEMPOOL) += octeontx
> > diff --git a/drivers/mempool/meson.build b/drivers/mempool/meson.build
> > index 4527d9806..220cfaf63 100644
> > --- a/drivers/mempool/meson.build
> > +++ b/drivers/mempool/meson.build
> > @@ -1,7 +1,8 @@
> >  # SPDX-License-Identifier: BSD-3-Clause
> >  # Copyright(c) 2017 Intel Corporation
> >
> > -drivers = ['bucket', 'dpaa', 'dpaa2', 'octeontx', 'ring', 'stack']
> > +drivers = ['bucket', 'dpaa', 'dpaa2', 'nb_stack', 'octeontx', 'ring', 'stack']
> > +
> >  std_deps = ['mempool']
> >  config_flag_fmt = 'RTE_LIBRTE_@0@_MEMPOOL'
> >  driver_name_fmt = 'rte_mempool_@0@'
> > diff --git a/drivers/mempool/nb_stack/Makefile b/drivers/mempool/nb_stack/Makefile
> > new file mode 100644
> > index 000000000..318b18283
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/Makefile
> > @@ -0,0 +1,23 @@
> > +# SPDX-License-Identifier: BSD-3-Clause
> > +# Copyright(c) 2019 Intel Corporation
> > +
> > +include $(RTE_SDK)/mk/rte.vars.mk
> > +
> > +#
> > +# library name
> > +#
> > +LIB = librte_mempool_nb_stack.a
> > +
> > +CFLAGS += -O3
> > +CFLAGS += $(WERROR_FLAGS)
> > +
> > +# Headers
> > +LDLIBS += -lrte_eal -lrte_mempool
> > +
> > +EXPORT_MAP := rte_mempool_nb_stack_version.map
> > +
> > +LIBABIVER := 1
> > +
> > +SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_NB_STACK) += rte_mempool_nb_stack.c
> > +
> > +include $(RTE_SDK)/mk/rte.lib.mk
> > diff --git a/drivers/mempool/nb_stack/meson.build b/drivers/mempool/nb_stack/meson.build
> > new file mode 100644
> > index 000000000..7dec72242
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/meson.build
> > @@ -0,0 +1,6 @@
> > +# SPDX-License-Identifier: BSD-3-Clause
> > +# Copyright(c) 2019 Intel Corporation
> > +
> > +build = dpdk_conf.has('RTE_ARCH_X86_64')
> > +
> > +sources = files('rte_mempool_nb_stack.c')
> > diff --git a/drivers/mempool/nb_stack/nb_lifo.h b/drivers/mempool/nb_stack/nb_lifo.h
> > new file mode 100644
> > index 000000000..ad4a3401f
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/nb_lifo.h
> > @@ -0,0 +1,147 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2019 Intel Corporation
> > + */
> > +
> > +#ifndef _NB_LIFO_H_
> > +#define _NB_LIFO_H_
> > +
> > +struct nb_lifo_elem {
> > +   void *data;
> > +   struct nb_lifo_elem *next;
> > +};
> > +
> > +struct nb_lifo_head {
> > +   struct nb_lifo_elem *top; /**< Stack top */
> > +   uint64_t cnt; /**< Modification counter */
> > +};
> Minor comment: mentioning the ABA problem in the comment for 'cnt' would be
> helpful.
> 

Sure.
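
Something like this, I take it:

struct nb_lifo_head {
	struct nb_lifo_elem *top; /**< Stack top */
	uint64_t cnt; /**< Modification counter, to protect against the ABA problem */
};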

> > +
> > +struct nb_lifo {
> > +   volatile struct nb_lifo_head head __rte_aligned(16);
> > +   rte_atomic64_t len;
> > +} __rte_cache_aligned;
> > +
> > +static __rte_always_inline void
> > +nb_lifo_init(struct nb_lifo *lifo)
> > +{
> > +   memset(lifo, 0, sizeof(*lifo));
> > +   rte_atomic64_set(&lifo->len, 0);
> > +}
> > +
> > +static __rte_always_inline unsigned int
> > +nb_lifo_len(struct nb_lifo *lifo)
> > +{
> > +   /* nb_lifo_push() and nb_lifo_pop() do not update the list's contents
> > +    * and lifo->len atomically, which can cause the list to appear shorter
> > +    * than it actually is if this function is called while other threads
> > +    * are modifying the list.
> > +    *
> > +    * However, given the inherently approximate nature of the get_count
> > +    * callback -- even if the list and its size were updated atomically,
> > +    * the size could change between when get_count executes and when the
> > +    * value is returned to the caller -- this is acceptable.
> > +    *
> > +    * The lifo->len updates are placed such that the list may appear to
> > +    * have fewer elements than it does, but will never appear to have more
> > +    * elements. If the mempool is near-empty to the point that this is a
> > +    * concern, the user should consider increasing the mempool size.
> > +    */
> > +   return (unsigned int)rte_atomic64_read(&lifo->len);
> > +}
> > +
> > +static __rte_always_inline void
> > +nb_lifo_push(struct nb_lifo *lifo,
> > +        struct nb_lifo_elem *first,
> > +        struct nb_lifo_elem *last,
> > +        unsigned int num)
> > +{
> > +   while (1) {
> > +           struct nb_lifo_head old_head, new_head;
> > +
> > +           old_head = lifo->head;
> > +
> > +           /* Swing the top pointer to the first element in the list and
> > +            * make the last element point to the old top.
> > +            */
> > +           new_head.top = first;
> > +           new_head.cnt = old_head.cnt + 1;
> > +
> > +           last->next = old_head.top;
> > +
> > +           if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > +                                    (uint64_t *)&old_head,
> > +                                    (uint64_t *)&new_head))
> > +                   break;
> > +   }
> Minor comment: this could be a do-while loop (for example, similar to the one
> in __rte_ring_move_prod_head).
> 

Sure.
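
For reference, nb_lifo_push() would then look roughly like this (untested, same
logic as the v4 code, just restructured into a do-while):

static __rte_always_inline void
nb_lifo_push(struct nb_lifo *lifo,
	     struct nb_lifo_elem *first,
	     struct nb_lifo_elem *last,
	     unsigned int num)
{
	struct nb_lifo_head old_head, new_head;

	do {
		old_head = lifo->head;

		/* Swing the top pointer to the first element in the list and
		 * make the last element point to the old top.
		 */
		new_head.top = first;
		new_head.cnt = old_head.cnt + 1;

		last->next = old_head.top;
	} while (!rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
				       (uint64_t *)&old_head,
				       (uint64_t *)&new_head));

	rte_atomic64_add(&lifo->len, num);
}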

> > +
> > +   rte_atomic64_add(&lifo->len, num);
> > +}
> > +
> > +static __rte_always_inline void
> > +nb_lifo_push_single(struct nb_lifo *lifo, struct nb_lifo_elem *elem)
> > +{
> > +   nb_lifo_push(lifo, elem, elem, 1);
> > +}
> > +
> > +static __rte_always_inline struct nb_lifo_elem *
> > +nb_lifo_pop(struct nb_lifo *lifo,
> > +       unsigned int num,
> > +       void **obj_table,
> > +       struct nb_lifo_elem **last)
> > +{
> > +   struct nb_lifo_head old_head;
> > +
> > +   /* Reserve num elements, if available */
> > +   while (1) {
> > +           uint64_t len = rte_atomic64_read(&lifo->len);
> > +
> > +           /* Does the list contain enough elements? */
> > +           if (len < num)
> > +                   return NULL;
> > +
> > +           if (rte_atomic64_cmpset((volatile uint64_t *)&lifo->len,
> > +                                   len, len - num))
> > +                   break;
> > +   }
> > +
> > +   /* Pop num elements */
> > +   while (1) {
> > +           struct nb_lifo_head new_head;
> > +           struct nb_lifo_elem *tmp;
> > +           unsigned int i;
> > +
> > +           old_head = lifo->head;
> > +
> > +           tmp = old_head.top;
> > +
> > +           /* Traverse the list to find the new head. A next pointer will
> > +            * either point to another element or NULL; if a thread
> > +            * encounters a pointer that has already been popped, the CAS
> > +            * will fail.
> > +            */
> > +           for (i = 0; i < num && tmp != NULL; i++) {
> > +                   if (obj_table)
> This 'if' check can be moved outside the for loop. Maybe use RTE_ASSERT at the
> beginning of the function?
> 

A NULL obj_table pointer isn't an error -- nb_stack_enqueue() calls this 
function with NULL because it doesn't need the popped elements added to a 
table. When the compiler inlines this function into nb_stack_enqueue(), it can 
use constant propagation to optimize away the if-statement.

I don't think that's possible for the other caller, nb_stack_dequeue, though, 
unless we add a NULL pointer check to the beginning of that function. Then it 
would be guaranteed that obj_table is non-NULL, and the compiler can optimize 
away the if-statement. I'll add that.

> > +                           obj_table[i] = tmp->data;
> > +                   if (last)
> > +                           *last = tmp;
> > +                   tmp = tmp->next;
> > +           }
> > +
> > +           /* If NULL was encountered, the list was modified while
> > +            * traversing it. Retry.
> > +            */
> > +           if (i != num)
> > +                   continue;
> > +
> > +           new_head.top = tmp;
> > +           new_head.cnt = old_head.cnt + 1;
> > +
> > +           if (rte_atomic128_cmpset((volatile uint64_t *)&lifo->head,
> > +                                    (uint64_t *)&old_head,
> > +                                    (uint64_t *)&new_head))
> > +                   break;
> > +   }
> > +
> > +   return old_head.top;
> > +}
> > +
> > +#endif /* _NB_LIFO_H_ */
> > diff --git a/drivers/mempool/nb_stack/rte_mempool_nb_stack.c b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > new file mode 100644
> > index 000000000..1818a2cfa
> > --- /dev/null
> > +++ b/drivers/mempool/nb_stack/rte_mempool_nb_stack.c
> > @@ -0,0 +1,125 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright(c) 2019 Intel Corporation
> > + */
> > +
> > +#include <stdio.h>
> > +#include <rte_mempool.h>
> > +#include <rte_malloc.h>
> > +
> > +#include "nb_lifo.h"
> > +
> > +struct rte_mempool_nb_stack {
> > +   uint64_t size;
> > +   struct nb_lifo used_lifo; /**< LIFO containing mempool pointers  */
> > +   struct nb_lifo free_lifo; /**< LIFO containing unused LIFO elements */
> > +};
> > +
> > +static int
> > +nb_stack_alloc(struct rte_mempool *mp)
> > +{
> > +   struct rte_mempool_nb_stack *s;
> > +   struct nb_lifo_elem *elems;
> > +   unsigned int n = mp->size;
> > +   unsigned int size, i;
> > +
> > +   size = sizeof(*s) + n * sizeof(struct nb_lifo_elem);
> IMO, the allocation of the stack elements can be moved into the nb_lifo_init
> API; it would make the nb_stack code modular.
> 

(see below)

> > +
> > +   /* Allocate our local memory structure */
> > +   s = rte_zmalloc_socket("mempool-nb_stack",
> > +                          size,
> > +                          RTE_CACHE_LINE_SIZE,
> > +                          mp->socket_id);
> > +   if (s == NULL) {
> > +           RTE_LOG(ERR, MEMPOOL, "Cannot allocate nb_stack!\n");
> > +           return -ENOMEM;
> > +   }
> > +
> > +   s->size = n;
> > +
> > +   nb_lifo_init(&s->used_lifo);
> > +   nb_lifo_init(&s->free_lifo);
> > +
> > +   elems = (struct nb_lifo_elem *)&s[1];
> > +   for (i = 0; i < n; i++)
> > +           nb_lifo_push_single(&s->free_lifo, &elems[i]);
> This could also be moved into the nb_lifo_init API.
> 

Sure, good suggestions. I'll address this.
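
As a first step, I'm thinking nb_lifo_init() could take the backing element
array and count -- exact signature TBD, and the function would need to sit
below nb_lifo_push_single() in the header. (The allocation itself could move in
as well, but that would also require a socket_id parameter.)

static __rte_always_inline void
nb_lifo_init(struct nb_lifo *lifo, struct nb_lifo_elem *elems, unsigned int n)
{
	unsigned int i;

	memset(lifo, 0, sizeof(*lifo));
	rte_atomic64_set(&lifo->len, 0);

	/* Seed the LIFO with the caller-provided elements, if any */
	for (i = 0; i < n; i++)
		nb_lifo_push_single(lifo, &elems[i]);
}

The driver would then call nb_lifo_init(&s->free_lifo, elems, n) and
nb_lifo_init(&s->used_lifo, NULL, 0).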

Appreciate the feedback!

Thanks,
Gage
