From: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>

Add a multi-thread unsafe double ended queue data structure. This
library provides a simple and efficient alternative to multi-thread
safe ring when multi-thread safety is not required.

Signed-off-by: Aditya Ambadipudi <aditya.ambadip...@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
---
v3:
  * Removed stdio.h from two files where it is not needed.

v2:
  * Addressed the spell check warning issue with the word "Deque"
  * Tried to rename all objects that are named deque to avoid collision with
    std::deque
  * Added the deque library to msvc section in meson.build
  * Renamed api functions to explicitly state if the function inserts at head
    or tail.
 .mailmap                   |   1 +
 devtools/build-dict.sh     |   1 +
 lib/deque/meson.build      |  11 +
 lib/deque/rte_deque.c      | 193 +++++++++++++
 lib/deque/rte_deque.h      | 533 ++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_core.h |  81 ++++++
 lib/deque/rte_deque_pvt.h  | 538 +++++++++++++++++++++++++++++++++++++
 lib/deque/rte_deque_zc.h   | 430 +++++++++++++++++++++++++++++
 lib/deque/version.map      |  14 +
 lib/meson.build            |   2 +
 10 files changed, 1804 insertions(+)
 create mode 100644 lib/deque/meson.build
 create mode 100644 lib/deque/rte_deque.c
 create mode 100644 lib/deque/rte_deque.h
 create mode 100644 lib/deque/rte_deque_core.h
 create mode 100644 lib/deque/rte_deque_pvt.h
 create mode 100644 lib/deque/rte_deque_zc.h
 create mode 100644 lib/deque/version.map

diff --git a/.mailmap b/.mailmap
index 3843868716..8e705ab6ab 100644
--- a/.mailmap
+++ b/.mailmap
@@ -17,6 +17,7 @@ Adam Bynes <adamby...@outlook.com>
 Adam Dybkowski <adamx.dybkow...@intel.com>
 Adam Ludkiewicz <adam.ludkiew...@intel.com>
 Adham Masarwah <ad...@nvidia.com> <ad...@mellanox.com>
+Aditya Ambadipudi <aditya.ambadip...@arm.com>
 Adrian Moreno <amore...@redhat.com>
 Adrian Podlawski <adrian.podlaw...@intel.com>
 Adrien Mazarguil <adrien.mazarg...@6wind.com>
diff --git a/devtools/build-dict.sh b/devtools/build-dict.sh
index a8cac49029..595d8f9277 100755
--- a/devtools/build-dict.sh
+++ b/devtools/build-dict.sh
@@ -17,6 +17,7 @@ sed '/^..->/d' |
 sed '/^uint->/d' |
 sed "/^doesn'->/d" |
 sed '/^wasn->/d' |
+sed '/^deque.*->/d' |
 
 # print to stdout
 cat
diff --git a/lib/deque/meson.build b/lib/deque/meson.build
new file mode 100644
index 0000000000..1ff45fc39f
--- /dev/null
+++ b/lib/deque/meson.build
@@ -0,0 +1,11 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024 Arm Limited
+
+sources = files('rte_deque.c')
+headers = files('rte_deque.h')
+# most sub-headers are not for direct inclusion
+indirect_headers += files (
+        'rte_deque_core.h',
+        'rte_deque_pvt.h',
+        'rte_deque_zc.h'
+)
diff --git a/lib/deque/rte_deque.c b/lib/deque/rte_deque.c
new file mode 100644
index 0000000000..b83a6c43c4
--- /dev/null
+++ b/lib/deque/rte_deque.c
@@ -0,0 +1,193 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#include <stdalign.h>
+#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+#include <sys/queue.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memzone.h>
+#include <rte_malloc.h>
+#include <rte_eal_memconfig.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+
+#include "rte_deque.h"
+
+/* mask of all valid flag values to deque_create() */
+#define __RTE_DEQUE_F_MASK (RTE_DEQUE_F_EXACT_SZ)
+/*
+ * Compute the number of bytes needed for a deque header plus count
+ * elements of esize bytes each, rounded up to a cache line multiple.
+ * Returns -EINVAL when esize or count is not acceptable.
+ */
+ssize_t
+rte_deque_get_memsize_elem(unsigned int esize, unsigned int count)
+{
+       ssize_t bytes;
+       int count_ok;
+
+       /* the element size has to be a multiple of 4 bytes */
+       if ((esize & 3) != 0) {
+               rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                       "%s(): element size is not a multiple of 4\n",
+                       __func__);
+
+               return -EINVAL;
+       }
+
+       /* count has to be a power of 2 that stays within the size limit */
+       count_ok = RTE_IS_POWER_OF_2(count) && (count <= RTE_DEQUE_SZ_MASK);
+       if (!count_ok) {
+               rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                       "%s(): Requested number of elements is invalid,"
+                       "must be power of 2, and not exceed %u\n",
+                       __func__, RTE_DEQUE_SZ_MASK);
+
+               return -EINVAL;
+       }
+
+       /* header followed by the element storage, padded to a cache line */
+       bytes = sizeof(struct rte_deque) + (ssize_t)count * esize;
+       return RTE_ALIGN(bytes, RTE_CACHE_LINE_SIZE);
+}
+
+/* Return the deque to the empty state by zeroing both indices. */
+void
+rte_deque_reset(struct rte_deque *d)
+{
+       /* head == tail is the empty condition; stored objects are not cleared */
+       d->head = 0;
+       d->tail = 0;
+}
+
+/*
+ * Initialize a deque structure in caller-provided memory.
+ *
+ * Validates flags and name, then derives size, mask and capacity from
+ * count. Returns 0 on success, -EINVAL for unsupported flags or an
+ * invalid count, and -ENAMETOOLONG if name does not fit in d->name.
+ */
+int
+rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+       unsigned int flags)
+{
+       size_t name_len;
+       uint32_t size;
+
+       /* compilation-time checks */
+       RTE_BUILD_BUG_ON((sizeof(struct rte_deque) &
+                         RTE_CACHE_LINE_MASK) != 0);
+
+       /* future proof flags, only allow supported values */
+       if (flags & ~__RTE_DEQUE_F_MASK) {
+               rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                       "%s(): Unsupported flags requested %#x\n",
+                       __func__, flags);
+               return -EINVAL;
+       }
+
+       /* init the deque structure */
+       memset(d, 0, sizeof(*d));
+       /* a result >= the buffer size means the name was truncated */
+       name_len = strlcpy(d->name, name, sizeof(d->name));
+       if (name_len >= sizeof(d->name))
+               return -ENAMETOOLONG;
+       d->flags = flags;
+
+       if (flags & RTE_DEQUE_F_EXACT_SZ) {
+               /* usable space is size - 1, so round count + 1 up */
+               size = rte_align32pow2(count + 1);
+               /*
+                * Reject a wrapped (zero) or oversized result, matching the
+                * limit enforced by rte_deque_get_memsize_elem().
+                */
+               if (size == 0 || size > RTE_DEQUE_SZ_MASK) {
+                       rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                               "%s(): Requested size is invalid, must"
+                               " not exceed the size limit %u\n",
+                               __func__, RTE_DEQUE_SZ_MASK);
+                       return -EINVAL;
+               }
+               d->size = size;
+               d->mask = size - 1;
+               d->capacity = count;
+       } else {
+               if ((!RTE_IS_POWER_OF_2(count)) ||
+                   (count > RTE_DEQUE_SZ_MASK)) {
+                       rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                               "%s(): Requested size is invalid, must be power"
+                               " of 2, and not exceed the size limit %u\n",
+                               __func__, RTE_DEQUE_SZ_MASK);
+                       return -EINVAL;
+               }
+               d->size = count;
+               d->mask = count - 1;
+               /* one slot is kept unused */
+               d->capacity = d->mask;
+       }
+
+       return 0;
+}
+
+/*
+ * Reserve a memzone large enough for the requested deque and initialize
+ * a deque inside it. Returns the new deque, or NULL on failure.
+ */
+struct rte_deque *
+rte_deque_create(const char *name, unsigned int esize, unsigned int count,
+               int socket_id, unsigned int flags)
+{
+       const unsigned int user_count = count;
+       const int mz_flags = 0;
+       char zone_name[RTE_MEMZONE_NAMESIZE];
+       const struct rte_memzone *zone;
+       struct rte_deque *dq;
+       ssize_t bytes;
+       int len;
+
+       /* an exact size deque stores count + 1 slots, rounded to a power of two */
+       if (flags & RTE_DEQUE_F_EXACT_SZ)
+               count = rte_align32pow2(count + 1);
+
+       bytes = rte_deque_get_memsize_elem(esize, count);
+       if (bytes < 0) {
+               rte_errno = -bytes;
+               return NULL;
+       }
+
+       len = snprintf(zone_name, sizeof(zone_name), "%s%s",
+               RTE_DEQUE_MZ_PREFIX, name);
+       if (len < 0 || len >= (int)sizeof(zone_name)) {
+               rte_errno = ENAMETOOLONG;
+               return NULL;
+       }
+
+       /*
+        * Reserve the backing memory. rte_errno is not set here on failure:
+        * the memzone reserve call is relied upon to set it (for example
+        * when rte_config is unavailable or in a secondary process).
+        */
+       zone = rte_memzone_reserve_aligned(zone_name, bytes, socket_id,
+                                        mz_flags, alignof(struct rte_deque));
+       if (zone == NULL) {
+               rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                       "%s(): Cannot reserve memory\n", __func__);
+               return NULL;
+       }
+
+       dq = zone->addr;
+       /* arguments were validated above, so the result is not checked */
+       rte_deque_init(dq, name, user_count, flags);
+       dq->memzone = zone;
+       return dq;
+}
+
+/* Release the memzone backing a deque; a NULL deque is ignored. */
+void
+rte_deque_free(struct rte_deque *d)
+{
+       const struct rte_memzone *mz;
+
+       if (d == NULL)
+               return;
+
+       mz = d->memzone;
+       if (mz == NULL) {
+               /*
+                * No memzone is recorded, so the deque did not come from
+                * rte_deque_create() and there is nothing to release.
+                */
+               rte_log(RTE_LOG_ERR, rte_deque_log_type,
+                       "%s(): Cannot free deque, not created "
+                       "with rte_deque_create()\n", __func__);
+               return;
+       }
+
+       if (rte_memzone_free(mz) == 0)
+               return;
+
+       rte_log(RTE_LOG_ERR, rte_deque_log_type,
+               "%s(): Cannot free memory\n", __func__);
+}
+
+/* Write a human-readable summary of the deque state to the file f. */
+void
+rte_deque_dump(FILE *f, const struct rte_deque *d)
+{
+       /* %p expects a void pointer and %x an unsigned value, hence the casts */
+       fprintf(f, "deque <%s>@%p\n", d->name, (const void *)d);
+       fprintf(f, "  flags=%x\n", (unsigned int)d->flags);
+       fprintf(f, "  size=%"PRIu32"\n", d->size);
+       fprintf(f, "  capacity=%"PRIu32"\n", d->capacity);
+       fprintf(f, "  head=%"PRIu32"\n", d->head);
+       fprintf(f, "  tail=%"PRIu32"\n", d->tail);
+       fprintf(f, "  used=%u\n", rte_deque_count(d));
+       fprintf(f, "  avail=%u\n", rte_deque_free_count(d));
+}
+
+RTE_LOG_REGISTER_DEFAULT(rte_deque_log_type, ERR);
diff --git a/lib/deque/rte_deque.h b/lib/deque/rte_deque.h
new file mode 100644
index 0000000000..6633eab377
--- /dev/null
+++ b/lib/deque/rte_deque.h
@@ -0,0 +1,533 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_H_
+#define _RTE_DEQUE_H_
+
+/**
+ * @file
+ * RTE double ended queue (Deque)
+ *
+ * This fixed-size queue does not provide concurrent access by
+ * multiple threads. If required, the application should use locks
+ * to protect the deque from concurrent access.
+ *
+ * - Double ended queue
+ * - Maximum size is fixed
+ * - Store objects of any size
+ * - Single/bulk/burst dequeue at tail or head
+ * - Single/bulk/burst enqueue at head or tail
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_deque_core.h>
+#include <rte_deque_pvt.h>
+#include <rte_deque_zc.h>
+
+/**
+ * Calculate the memory size needed for a deque
+ *
+ * This function returns the number of bytes needed for a deque, given
+ * the number of objects and the object size. This value is the sum of
+ * the size of the structure rte_deque and the size of the memory needed
+ * by the objects. The value is aligned to a cache line size.
+ *
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2).
+ * @return
+ *   - The memory size needed for the deque on success.
+ *   - -EINVAL if esize is not a multiple of 4, or if count is not a power
+ *     of 2 or exceeds RTE_DEQUE_SZ_MASK.
+ */
+__rte_experimental
+ssize_t rte_deque_get_memsize_elem(unsigned int esize, unsigned int count);
+
+/**
+ * Initialize a deque structure.
+ *
+ * Initialize a deque structure in memory pointed by "d". The size of the
+ * memory area must be large enough to store the deque structure and the
+ * object table. It is advised to use rte_deque_get_memsize_elem() to get
+ * the appropriate size.
+ *
+ * The deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param d
+ *   The pointer to the deque structure followed by the objects table.
+ * @param name
+ *   The name of the deque.
+ * @param count
+ *   The number of objects in the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
+ *     exactly the requested number of objects, and the requested size
+ *     will be rounded up to the next power of two, but the usable space
+ *     will be exactly that requested. Worst case, if a power-of-2 size is
+ *     requested, half the deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   0 on success, or a negative value on error.
+ */
+__rte_experimental
+int rte_deque_init(struct rte_deque *d, const char *name, unsigned int count,
+               unsigned int flags);
+
+/**
+ * Create a new deque named *name* in memory.
+ *
+ * This function uses ``memzone_reserve()`` to allocate memory. Then it
+ * calls rte_deque_init() to initialize an empty deque.
+ *
+ * The new deque size is set to *count*, which must be a power of two.
+ * The real usable deque size is *count-1* instead of *count* to
+ * differentiate a full deque from an empty deque.
+ *
+ * @param name
+ *   The name of the deque.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ * @param count
+ *   The size of the deque (must be a power of 2,
+ *   unless RTE_DEQUE_F_EXACT_SZ is set in flags).
+ * @param socket_id
+ *   The *socket_id* argument is the socket identifier in case of
+ *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
+ *   constraint for the reserved zone.
+ * @param flags
+ *   - RTE_DEQUE_F_EXACT_SZ: If this flag is set, the deque will hold
+ *     exactly the requested number of entries, and the requested size will
+ *     be rounded up to the next power of two, but the usable space will be
+ *     exactly that requested. Worst case, if a power-of-2 size is requested,
+ *     half the deque space will be wasted.
+ *     Without this flag set, the deque size requested must be a power of 2,
+ *     and the usable space will be that size - 1.
+ * @return
+ *   On success, the pointer to the new allocated deque. NULL on error with
+ *    rte_errno set appropriately. Possible errno values include:
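+ *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config
+ *      structure
+ *    - EINVAL - esize is not a multiple of 4, or count provided is not a
+ *      power of 2 or exceeds the size limit
+ *    - ENAMETOOLONG - the memzone name built from name is too long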
+ *    - ENOSPC - the maximum number of memzones has already been allocated
+ *    - EEXIST - a memzone with the same name already exists
+ *    - ENOMEM - no appropriate memory area found in which to create memzone
+ */
+__rte_experimental
+struct rte_deque *rte_deque_create(const char *name, unsigned int esize,
+                               unsigned int count, int socket_id,
+                               unsigned int flags);
+
+/**
+ * De-allocate all memory used by the deque.
+ *
+ * @param d
+ *   Deque to free.
+ *   If NULL then, the function does nothing.
+ */
+__rte_experimental
+void rte_deque_free(struct rte_deque *d);
+
+/**
+ * Dump the status of the deque to a file.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_dump(FILE *f, const struct rte_deque *d);
+
+/**
+ * Return the number of entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of entries in the deque.
+ */
+static inline unsigned int
+rte_deque_count(const struct rte_deque *d)
+{
+       /* unsigned index difference, reduced by the deque mask */
+       const uint32_t distance = d->head - d->tail;
+
+       return distance & d->mask;
+}
+
+/**
+ * Return the number of free entries in a deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The number of free entries in the deque.
+ */
+static inline unsigned int
+rte_deque_free_count(const struct rte_deque *d)
+{
+       const unsigned int used = rte_deque_count(d);
+
+       /* free entries are whatever the usable capacity has left */
+       return d->capacity - used;
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_bulk_elem(struct rte_deque *d,
+                       const void *obj_table,
+                       unsigned int esize,
+                       unsigned int n,
+                       unsigned int *free_space)
+{
+       const unsigned int room = rte_deque_free_count(d);
+
+       /* all or nothing: report the current room when n does not fit */
+       if (unlikely(n > room)) {
+               *free_space = room;
+               return 0;
+       }
+
+       *free_space = room - n;
+       return __rte_deque_enqueue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the head.
+ *
+ * This function copies the objects at the head of the deque and
+ * moves the head index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The maximum number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_burst_elem(struct rte_deque *d, const void *obj_table,
+                       unsigned int esize, unsigned int n,
+                       unsigned int *free_space)
+{
+       unsigned int avail_space = rte_deque_free_count(d);
+       unsigned int to_be_enqueued = (n <= avail_space ? n : avail_space);
+       /* subtract what is actually enqueued; using n would wrap when n > avail_space */
+       *free_space = avail_space - to_be_enqueued;
+       return __rte_deque_enqueue_at_head(d, obj_table, esize, to_be_enqueued);
+}
+
+/**
+ * Enqueue fixed number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   The number of objects enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_bulk_elem(struct rte_deque *d,
+                                const void *obj_table, unsigned int esize,
+                                unsigned int n, unsigned int *free_space)
+{
+       const unsigned int room = rte_deque_free_count(d);
+
+       /* all or nothing: report the current room when n does not fit */
+       if (unlikely(n > room)) {
+               *free_space = room;
+               return 0;
+       }
+
+       *free_space = room - n;
+       return __rte_deque_enqueue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Enqueue up to a maximum number of objects on a deque at the tail.
+ *
+ * This function copies the objects at the tail of the deque and
+ * moves the tail index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to add in the deque from the obj_table.
+ * @param free_space
+ *   Returns the amount of space in the deque after the enqueue operation
+ *   has finished.
+ * @return
+ *   - n: Actual number of objects enqueued.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_burst_elem(struct rte_deque *d,
+                               const void *obj_table, unsigned int esize,
+                               unsigned int n, unsigned int *free_space)
+{
+       const unsigned int room = rte_deque_free_count(d);
+       unsigned int nb_enq = n;
+
+       /* clamp the request to the space that is available */
+       if (nb_enq > room)
+               nb_enq = room;
+
+       *free_space = room - nb_enq;
+       return __rte_deque_enqueue_at_tail(d, obj_table, esize, nb_enq);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+                       unsigned int esize, unsigned int n,
+                       unsigned int *available)
+{
+       const unsigned int stored = rte_deque_count(d);
+
+       /* all or nothing: report the current count when n is not available */
+       if (unlikely(n > stored)) {
+               *available = stored;
+               return 0;
+       }
+
+       *available = stored - n;
+       return __rte_deque_dequeue_at_tail(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque at tail.
+ *
+ * This function copies the objects from the tail of the deque and
+ * moves the tail index.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+                       unsigned int esize, unsigned int n,
+                       unsigned int *available)
+{
+       const unsigned int stored = rte_deque_count(d);
+       unsigned int nb_deq = n;
+
+       /* clamp the request to the number of stored objects */
+       if (nb_deq > stored)
+               nb_deq = stored;
+
+       *available = stored - nb_deq;
+       return __rte_deque_dequeue_at_tail(d, obj_table, esize, nb_deq);
+}
+
+/**
+ * Dequeue a fixed number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   The number of objects dequeued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_bulk_elem(struct rte_deque *d, void *obj_table,
+                       unsigned int esize, unsigned int n,
+                       unsigned int *available)
+{
+       const unsigned int stored = rte_deque_count(d);
+
+       /* all or nothing: report the current count when n is not available */
+       if (unlikely(n > stored)) {
+               *available = stored;
+               return 0;
+       }
+
+       *available = stored - n;
+       return __rte_deque_dequeue_at_head(d, obj_table, esize, n);
+}
+
+/**
+ * Dequeue up to a maximum number of objects from a deque from the head.
+ *
+ * This function copies the objects from the head of the deque and
+ * moves the head index (backwards).
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param obj_table
+ *   A pointer to a table of objects that will be filled.
+ * @param esize
+ *   The size of deque object, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the deque. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to dequeue from the deque to the obj_table.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue
+ *   has finished.
+ * @return
+ *   - Number of objects dequeued
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_burst_elem(struct rte_deque *d, void *obj_table,
+                       unsigned int esize, unsigned int n,
+                       unsigned int *available)
+{
+       const unsigned int stored = rte_deque_count(d);
+       unsigned int nb_deq = n;
+
+       /* clamp the request to the number of stored objects */
+       if (nb_deq > stored)
+               nb_deq = stored;
+
+       *available = stored - nb_deq;
+       return __rte_deque_dequeue_at_head(d, obj_table, esize, nb_deq);
+}
+
+/**
+ * Flush a deque.
+ *
+ * This function flushes all the objects in a deque by resetting its head
+ * and tail indices.
+ *
+ * @warning
+ * Make sure the deque is not in use while calling this function.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ */
+__rte_experimental
+void rte_deque_reset(struct rte_deque *d);
+
+/**
+ * Test if a deque is full.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is full.
+ *   - 0: The deque is not full.
+ */
+static inline int
+rte_deque_full(const struct rte_deque *d)
+{
+       const unsigned int room = rte_deque_free_count(d);
+
+       /* full means no free entry is left */
+       return room == 0;
+}
+
+/**
+ * Test if a deque is empty.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   - 1: The deque is empty.
+ *   - 0: The deque is not empty.
+ */
+static inline int
+rte_deque_empty(const struct rte_deque *d)
+{
+       const uint32_t tail_idx = d->tail;
+       const uint32_t head_idx = d->head;
+
+       /* equal indices mean nothing is stored */
+       return tail_idx == head_idx;
+}
+
+/**
+ * Return the size of the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The size of the data store used by the deque.
+ *   NOTE: this is not the same as the usable space in the deque. To query that
+ *   use ``rte_deque_get_capacity()``.
+ */
+static inline unsigned int
+rte_deque_get_size(const struct rte_deque *d)
+{
+       /* size of the data store, not the usable capacity */
+       const unsigned int store_size = d->size;
+
+       return store_size;
+}
+
+/**
+ * Return the number of objects which can be stored in the deque.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @return
+ *   The usable size of the deque.
+ */
+static inline unsigned int
+rte_deque_get_capacity(const struct rte_deque *d)
+{
+       /* number of objects the deque can hold */
+       const unsigned int usable = d->capacity;
+
+       return usable;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_H_ */
diff --git a/lib/deque/rte_deque_core.h b/lib/deque/rte_deque_core.h
new file mode 100644
index 0000000000..0bb8695c8a
--- /dev/null
+++ b/lib/deque/rte_deque_core.h
@@ -0,0 +1,81 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_CORE_H_
+#define _RTE_DEQUE_CORE_H_
+
+/**
+ * @file
+ * This file contains definition of RTE deque structure, init flags and
+ * some related macros. This file should not be included directly,
+ * include rte_deque.h instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <string.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+extern int rte_deque_log_type;
+
+#define RTE_DEQUE_MZ_PREFIX "DEQUE_"
+/** The maximum length of a deque name. */
+#define RTE_DEQUE_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+                          sizeof(RTE_DEQUE_MZ_PREFIX) + 1)
+
+/**
+ * Double ended queue (deque) structure.
+ *
+ * The deque has a head and a tail index. These indices are not between 0
+ * and size(deque)-1. These indices are between 0 and 2^32 -1. Their value
+ * is masked while accessing the objects in deque. These indices are
+ * unsigned 32bits. Hence the result of the subtraction is always a modulo
+ * of 2^32 and, once masked, it is between 0 and capacity.
+ */
+struct rte_deque {
+       alignas(RTE_CACHE_LINE_SIZE) char name[RTE_DEQUE_NAMESIZE];
+       /**< Name of the deque */
+       int flags;
+       /**< Flags supplied at creation. */
+       const struct rte_memzone *memzone;
+       /**< Memzone, if any, containing the rte_deque */
+
+       alignas(RTE_CACHE_LINE_SIZE) char pad0; /**< starts a new cache line */
+
+       uint32_t size;           /**< Size of deque. */
+       uint32_t mask;           /**< Mask (size-1) of deque. */
+       uint32_t capacity;       /**< Usable size of deque */
+       /** Deque head and tail indices. */
+       volatile uint32_t head;
+       volatile uint32_t tail;
+};
+
+/**
+ * Deque is to hold exactly the requested number of entries.
+ * Without this flag set, the deque size requested must be a power of 2, and
+ * the usable space will be that size - 1. With the flag, the requested size
+ * will be rounded up to the next power of two, but the usable space will be
+ * exactly that requested. Worst case, if a power-of-2 size is requested,
+ * half the deque space will be wasted.
+ */
+#define RTE_DEQUE_F_EXACT_SZ 0x0004
+#define RTE_DEQUE_SZ_MASK  (0x7fffffffU) /**< Deque size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_CORE_H_ */
diff --git a/lib/deque/rte_deque_pvt.h b/lib/deque/rte_deque_pvt.h
new file mode 100644
index 0000000000..931bbd4d19
--- /dev/null
+++ b/lib/deque/rte_deque_pvt.h
@@ -0,0 +1,538 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+
+#ifndef _RTE_DEQUE_PVT_H_
+#define _RTE_DEQUE_PVT_H_
+
+/*
+ * Number of elements currently stored in the deque pointed to by d:
+ * the masked distance between head and tail.
+ * The argument is parenthesized so that any pointer expression
+ * (e.g. "p + 1" or "&obj") expands correctly; it is evaluated more than once.
+ */
+#define __RTE_DEQUE_COUNT(d) (((d)->head - (d)->tail) & (d)->mask)
+/* Number of elements that can still be enqueued: capacity minus the count. */
+#define __RTE_DEQUE_FREE_SPACE(d) ((d)->capacity - __RTE_DEQUE_COUNT(d))
+
+/*
+ * Copy n 32-bit words from obj_table into the deque storage, starting at
+ * word index idx and moving towards higher indices. When fewer than n words
+ * fit before word index `size`, the copy continues from word index 0.
+ * The head index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_32(struct rte_deque *d,
+                               const unsigned int size,
+                               uint32_t idx,
+                               const void *obj_table,
+                               unsigned int n)
+{
+       uint32_t *slots = (uint32_t *)&d[1];
+       const uint32_t *src = (const uint32_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(idx + n <= size)) {
+               /* Contiguous region: eight words per iteration ... */
+               while (k < (n & ~0x7)) {
+                       slots[idx] = src[k];
+                       slots[idx + 1] = src[k + 1];
+                       slots[idx + 2] = src[k + 2];
+                       slots[idx + 3] = src[k + 3];
+                       slots[idx + 4] = src[k + 4];
+                       slots[idx + 5] = src[k + 5];
+                       slots[idx + 6] = src[k + 6];
+                       slots[idx + 7] = src[k + 7];
+                       k += 8;
+                       idx += 8;
+               }
+               /* ... then the remaining 0 to 7 words. */
+               while (k < n)
+                       slots[idx++] = src[k++];
+       } else {
+               /* Fill up to the end of the storage */
+               while (idx < size)
+                       slots[idx++] = src[k++];
+               /* and carry on from the first slot */
+               for (idx = 0; k < n; k++, idx++)
+                       slots[idx] = src[k];
+       }
+}
+
+/*
+ * Copy n 64-bit objects from obj_table into the deque storage, starting at
+ * the slot designated by the masked head index and moving towards higher
+ * indices, wrapping to slot 0 after slot d->size - 1.
+ * The source is read through an unaligned 64-bit type. The head index of
+ * the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_64(struct rte_deque *d,
+                               const void *obj_table,
+                               unsigned int n)
+{
+       const uint32_t nb_slots = d->size;
+       uint32_t pos = (d->head & d->mask);
+       uint64_t *slots = (uint64_t *)&d[1];
+       const unaligned_uint64_t *src = (const unaligned_uint64_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(pos + n <= nb_slots)) {
+               /* Contiguous region: four objects per iteration ... */
+               while (k < (n & ~0x3)) {
+                       slots[pos] = src[k];
+                       slots[pos + 1] = src[k + 1];
+                       slots[pos + 2] = src[k + 2];
+                       slots[pos + 3] = src[k + 3];
+                       k += 4;
+                       pos += 4;
+               }
+               /* ... then the remaining 0 to 3 objects. */
+               while (k < n)
+                       slots[pos++] = src[k++];
+       } else {
+               /* Fill up to the end of the storage */
+               while (pos < nb_slots)
+                       slots[pos++] = src[k++];
+               /* and carry on from the first slot */
+               for (pos = 0; k < n; k++, pos++)
+                       slots[pos] = src[k];
+       }
+}
+
+/*
+ * Copy n 128-bit objects from obj_table into the deque storage, starting at
+ * the slot designated by the masked head index and moving towards higher
+ * indices, wrapping to slot 0 after slot d->size - 1.
+ * All copies go through memcpy(). The head index is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_head_128(struct rte_deque *d,
+                               const void *obj_table,
+                               unsigned int n)
+{
+       const uint32_t nb_slots = d->size;
+       uint32_t pos = (d->head & d->mask);
+       rte_int128_t *slots = (rte_int128_t *)&d[1];
+       const rte_int128_t *src = (const rte_int128_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(pos + n <= nb_slots)) {
+               /* Contiguous region: two objects (32 bytes) at a time ... */
+               while (k < (n & ~0x1)) {
+                       memcpy((void *)(slots + pos),
+                               (const void *)(src + k), 32);
+                       k += 2;
+                       pos += 2;
+               }
+               /* ... plus the odd one, if any. */
+               if (n & 0x1)
+                       memcpy((void *)(slots + pos),
+                               (const void *)(src + k), 16);
+       } else {
+               /* Fill up to the end of the storage */
+               for (; pos < nb_slots; k++, pos++)
+                       memcpy((void *)(slots + pos),
+                               (const void *)(src + k), 16);
+               /* and carry on from the first slot */
+               for (pos = 0; k < n; k++, pos++)
+                       memcpy((void *)(slots + pos),
+                               (const void *)(src + k), 16);
+       }
+}
+
+/*
+ * Copy n objects of esize bytes from obj_table to the head of the deque and
+ * advance the head index by n (stored masked). No free-space check is done
+ * here. Always returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_head(struct rte_deque *d,
+                       const void *obj_table,
+                       unsigned int esize,
+                       unsigned int n)
+{
+       /* 8-byte and 16-byte objects get dedicated copy routines because some
+        * platforms can move them directly with 64 bit and 128 bit registers.
+        */
+       switch (esize) {
+       case 8:
+               __rte_deque_enqueue_elems_head_64(d, obj_table, n);
+               break;
+       case 16:
+               __rte_deque_enqueue_elems_head_128(d, obj_table, n);
+               break;
+       default: {
+               /* Any other size is handled as a run of 32-bit words. */
+               const uint32_t words = esize / sizeof(uint32_t);
+               const uint32_t first = (d->head & d->mask) * words;
+
+               __rte_deque_enqueue_elems_head_32(d, d->size * words, first,
+                                               obj_table, n * words);
+               break;
+       }
+       }
+
+       d->head = (d->head + n) & d->mask;
+       return n;
+}
+
+/*
+ * Copy elements from obj_table into the deque storage, moving towards lower
+ * indices. All positions are expressed in 32-bit words:
+ *   idx       - word index of the slot receiving the first element,
+ *   n         - total number of words to copy,
+ *   scale     - number of words per element,
+ *   elem_size - number of bytes copied per element,
+ *   mask      - word index at which the copy resumes after passing word 0.
+ * The tail index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_32(struct rte_deque *d,
+                               const unsigned int mask,
+                               uint32_t idx,
+                               const void *obj_table,
+                               unsigned int n,
+                               const unsigned int scale,
+                               const unsigned int elem_size)
+{
+       uint32_t *slots = (uint32_t *)&d[1];
+       const uint32_t *src = (const uint32_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(idx >= n)) {
+               /* Everything fits at or below idx. */
+               while (k < n) {
+                       memcpy(&slots[idx], &src[k], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+       } else {
+               /* Go down until idx steps below word 0, which shows up as a
+                * negative value once reinterpreted as signed.
+                */
+               while ((int32_t)idx >= 0) {
+                       memcpy(&slots[idx], &src[k], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+
+               /* Resume from the far end of the storage */
+               idx = mask;
+               while (k < n) {
+                       memcpy(&slots[idx], &src[k], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+       }
+}
+
+/*
+ * Copy n 64-bit objects from obj_table into the deque storage, starting at
+ * the slot designated by the masked tail index and moving towards lower
+ * indices; after slot 0 the copy resumes at slot d->mask.
+ * The tail index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_64(struct rte_deque *d,
+                               const void *obj_table,
+                               unsigned int n)
+{
+       uint32_t pos = (d->tail & d->mask);
+       uint64_t *slots = (uint64_t *)&d[1];
+       const unaligned_uint64_t *src = (const unaligned_uint64_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely((int32_t)(pos - n) >= 0)) {
+               /* No wrap: four objects per iteration ... */
+               while (k < (n & ~0x3)) {
+                       slots[pos] = src[k];
+                       slots[pos - 1] = src[k + 1];
+                       slots[pos - 2] = src[k + 2];
+                       slots[pos - 3] = src[k + 3];
+                       k += 4;
+                       pos -= 4;
+               }
+               /* ... then the remaining 0 to 3 objects. */
+               while (k < n)
+                       slots[pos--] = src[k++];
+       } else {
+               /* Go down to slot 0; stepping below it makes pos negative
+                * once reinterpreted as signed.
+                */
+               for (; (int32_t)pos >= 0; k++, pos--)
+                       slots[pos] = src[k];
+               /* Resume from the last slot */
+               for (pos = d->mask; k < n; k++, pos--)
+                       slots[pos] = src[k];
+       }
+}
+
+/*
+ * Copy n 128-bit objects from obj_table into the deque storage, starting at
+ * the slot designated by the masked tail index and moving towards lower
+ * indices; after slot 0 the copy resumes at slot d->mask.
+ * The tail index of the deque is not modified here.
+ *
+ * Every copy goes through memcpy(): obj_table is caller memory whose
+ * alignment is not checked here, so it must not be read through a direct
+ * rte_int128_t assignment. This matches the other 128-bit copy helpers.
+ */
+static __rte_always_inline void
+__rte_deque_enqueue_elems_tail_128(struct rte_deque *d,
+                               const void *obj_table,
+                               unsigned int n)
+{
+       unsigned int i;
+       uint32_t idx = (d->tail & d->mask);
+       rte_int128_t *deque = (rte_int128_t *)&d[1];
+       const rte_int128_t *obj = (const rte_int128_t *)obj_table;
+       if (likely((int32_t)(idx - n) >= 0)) {
+               /* No wrap: objects land at idx, idx - 1, ... */
+               for (i = 0; i < n; i++, idx--)
+                       memcpy((void *)(deque + idx),
+                               (const void *)(obj + i), 16);
+       } else {
+               /* Go down to slot 0; stepping below it makes idx negative
+                * once reinterpreted as signed.
+                */
+               for (i = 0; (int32_t)idx >= 0; i++, idx--)
+                       memcpy((void *)(deque + idx),
+                               (const void *)(obj + i), 16);
+               /* Start at the ending */
+               for (idx = d->mask; i < n; i++, idx--)
+                       memcpy((void *)(deque + idx),
+                               (const void *)(obj + i), 16);
+       }
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_enqueue_at_tail(struct rte_deque *d,
+                       const void *obj_table,
+                       unsigned int esize,
+                       unsigned int n)
+{
+       /* Copies n objects of esize bytes from obj_table to the tail end of
+        * the deque and always returns n; no free-space check is done here.
+        *
+        * The tail index designates the first element at the tail end, so
+        * step it back by one to designate the free cell that receives the
+        * first new object. The field itself is adjusted because the 8B and
+        * 16B copy helpers read d->tail on their own.
+        */
+       d->tail--;
+
+       /* 8B and 16B copies implemented individually because on some platforms
+        * there are 64 bit and 128 bit registers available for direct copying.
+        */
+       if (esize == 8)
+               __rte_deque_enqueue_elems_tail_64(d, obj_table, n);
+       else if (esize == 16)
+               __rte_deque_enqueue_elems_tail_128(d, obj_table, n);
+       else {
+               uint32_t idx, scale, nd_idx, nd_num, nd_mask;
+
+               /* Normalize to uint32_t */
+               scale = esize / sizeof(uint32_t);
+               nd_num = n * scale;
+               idx = d->tail & d->mask;
+               nd_idx = idx * scale;
+               nd_mask = d->mask * scale;
+               __rte_deque_enqueue_elems_tail_32(d, nd_mask, nd_idx, obj_table,
+                                               nd_num, scale, esize);
+       }
+
+       /* The +1 undoes the decrement above: the tail ends up n below its
+        * original value (stored masked), i.e. on the last object written,
+        * which is a non-empty cell.
+        */
+       d->tail = (d->tail - n + 1) & d->mask;
+       return n;
+}
+
+/*
+ * Copy n 32-bit words out of the deque storage into obj_table, starting at
+ * word index idx and moving towards higher indices. When fewer than n words
+ * are available before word index `size`, the copy continues from word
+ * index 0. The tail index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_32(struct rte_deque *d,
+                       const unsigned int size,
+                       uint32_t idx,
+                       void *obj_table,
+                       unsigned int n)
+{
+       const uint32_t *slots = (const uint32_t *)&d[1];
+       uint32_t *dst = (uint32_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(idx + n <= size)) {
+               /* Contiguous region: eight words per iteration ... */
+               while (k < (n & ~0x7)) {
+                       dst[k] = slots[idx];
+                       dst[k + 1] = slots[idx + 1];
+                       dst[k + 2] = slots[idx + 2];
+                       dst[k + 3] = slots[idx + 3];
+                       dst[k + 4] = slots[idx + 4];
+                       dst[k + 5] = slots[idx + 5];
+                       dst[k + 6] = slots[idx + 6];
+                       dst[k + 7] = slots[idx + 7];
+                       k += 8;
+                       idx += 8;
+               }
+               /* ... then the remaining 0 to 7 words. */
+               while (k < n)
+                       dst[k++] = slots[idx++];
+       } else {
+               /* Read up to the end of the storage */
+               while (idx < size)
+                       dst[k++] = slots[idx++];
+               /* and carry on from the first slot */
+               for (idx = 0; k < n; k++, idx++)
+                       dst[k] = slots[idx];
+       }
+}
+
+/*
+ * Copy n 64-bit objects out of the deque storage into obj_table, starting at
+ * the slot designated by the masked tail index and moving towards higher
+ * indices, wrapping to slot 0 after slot d->size - 1.
+ * The destination is written through an unaligned 64-bit type. The tail
+ * index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_64(struct rte_deque *d, void *obj_table,
+                       unsigned int n)
+{
+       const uint32_t nb_slots = d->size;
+       uint32_t pos = (d->tail & d->mask);
+       const uint64_t *slots = (const uint64_t *)&d[1];
+       unaligned_uint64_t *dst = (unaligned_uint64_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(pos + n <= nb_slots)) {
+               /* Contiguous region: four objects per iteration ... */
+               while (k < (n & ~0x3)) {
+                       dst[k] = slots[pos];
+                       dst[k + 1] = slots[pos + 1];
+                       dst[k + 2] = slots[pos + 2];
+                       dst[k + 3] = slots[pos + 3];
+                       k += 4;
+                       pos += 4;
+               }
+               /* ... then the remaining 0 to 3 objects. */
+               while (k < n)
+                       dst[k++] = slots[pos++];
+       } else {
+               /* Read up to the end of the storage */
+               while (pos < nb_slots)
+                       dst[k++] = slots[pos++];
+               /* and carry on from the first slot */
+               for (pos = 0; k < n; k++, pos++)
+                       dst[k] = slots[pos];
+       }
+}
+
+/*
+ * Copy n 128-bit objects out of the deque storage into obj_table, starting
+ * at the slot designated by the masked tail index and moving towards higher
+ * indices, wrapping to slot 0 after slot d->size - 1.
+ * All copies go through memcpy(). The tail index is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_128(struct rte_deque *d,
+                       void *obj_table,
+                       unsigned int n)
+{
+       const uint32_t nb_slots = d->size;
+       uint32_t pos = (d->tail & d->mask);
+       const rte_int128_t *slots = (const rte_int128_t *)&d[1];
+       rte_int128_t *dst = (rte_int128_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(pos + n <= nb_slots)) {
+               /* Contiguous region: two objects (32 bytes) at a time ... */
+               while (k < (n & ~0x1)) {
+                       memcpy((void *)(dst + k),
+                               (const void *)(slots + pos), 32);
+                       k += 2;
+                       pos += 2;
+               }
+               /* ... plus the odd one, if any. */
+               if (n & 0x1)
+                       memcpy((void *)(dst + k),
+                               (const void *)(slots + pos), 16);
+       } else {
+               /* Read up to the end of the storage */
+               for (; pos < nb_slots; k++, pos++)
+                       memcpy((void *)(dst + k),
+                               (const void *)(slots + pos), 16);
+               /* and carry on from the first slot */
+               for (pos = 0; k < n; k++, pos++)
+                       memcpy((void *)(dst + k),
+                               (const void *)(slots + pos), 16);
+       }
+}
+
+/*
+ * Copy n objects of esize bytes from the tail of the deque into obj_table
+ * and advance the tail index by n (stored masked). No check on the number
+ * of stored objects is done here. Always returns n.
+ */
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_tail(struct rte_deque *d,
+                       void *obj_table,
+                       unsigned int esize,
+                       unsigned int n)
+{
+       /* 8-byte and 16-byte objects get dedicated copy routines because some
+        * platforms can move them directly with 64 bit and 128 bit registers.
+        */
+       switch (esize) {
+       case 8:
+               __rte_deque_dequeue_elems_64(d, obj_table, n);
+               break;
+       case 16:
+               __rte_deque_dequeue_elems_128(d, obj_table, n);
+               break;
+       default: {
+               /* Any other size is handled as a run of 32-bit words. */
+               const uint32_t words = esize / sizeof(uint32_t);
+               const uint32_t first = (d->tail & d->mask) * words;
+
+               __rte_deque_dequeue_elems_32(d, d->size * words, first,
+                                       obj_table, n * words);
+               break;
+       }
+       }
+
+       d->tail = (d->tail + n) & d->mask;
+       return n;
+}
+
+/*
+ * Copy elements out of the deque storage into obj_table, moving towards
+ * lower indices. All positions are expressed in 32-bit words:
+ *   idx       - word index of the slot holding the first element to copy,
+ *   n         - total number of words to copy,
+ *   scale     - number of words per element,
+ *   elem_size - number of bytes copied per element,
+ *   mask      - word index at which the copy resumes after passing word 0.
+ * The head index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_32(struct rte_deque *d,
+                               const unsigned int mask,
+                               uint32_t idx,
+                               void *obj_table,
+                               unsigned int n,
+                               const unsigned int scale,
+                               const unsigned int elem_size)
+{
+       const uint32_t *slots = (uint32_t *)&d[1];
+       uint32_t *dst = (uint32_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely(idx >= n)) {
+               /* Everything sits at or below idx. */
+               while (k < n) {
+                       memcpy(&dst[k], &slots[idx], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+       } else {
+               /* Go down until idx steps below word 0, which shows up as a
+                * negative value once reinterpreted as signed.
+                */
+               while ((int32_t)idx >= 0) {
+                       memcpy(&dst[k], &slots[idx], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+               /* Resume from the far end of the storage */
+               idx = mask;
+               while (k < n) {
+                       memcpy(&dst[k], &slots[idx], elem_size);
+                       idx -= scale;
+                       k += scale;
+               }
+       }
+}
+
+/*
+ * Copy n 64-bit objects out of the deque storage into obj_table, starting at
+ * the slot designated by the masked head index and moving towards lower
+ * indices; after slot 0 the copy resumes at slot d->mask.
+ * The head index of the deque is not modified here.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_64(struct rte_deque *d,
+                               void *obj_table,
+                               unsigned int n)
+{
+       uint32_t pos = (d->head & d->mask);
+       const uint64_t *slots = (uint64_t *)&d[1];
+       unaligned_uint64_t *dst = (unaligned_uint64_t *)obj_table;
+       unsigned int k = 0;
+
+       if (likely((int32_t)(pos - n) >= 0)) {
+               /* No wrap: four objects per iteration ... */
+               while (k < (n & ~0x3)) {
+                       dst[k] = slots[pos];
+                       dst[k + 1] = slots[pos - 1];
+                       dst[k + 2] = slots[pos - 2];
+                       dst[k + 3] = slots[pos - 3];
+                       k += 4;
+                       pos -= 4;
+               }
+               /* ... then the remaining 0 to 3 objects. */
+               while (k < n)
+                       dst[k++] = slots[pos--];
+       } else {
+               /* Go down to slot 0; stepping below it makes pos negative
+                * once reinterpreted as signed.
+                */
+               for (; (int32_t)pos >= 0; k++, pos--)
+                       dst[k] = slots[pos];
+               /* Resume from the last slot */
+               for (pos = d->mask; k < n; k++, pos--)
+                       dst[k] = slots[pos];
+       }
+}
+
+/*
+ * Copy n 128-bit objects out of the deque storage into obj_table, starting
+ * at the slot designated by the masked head index and moving towards lower
+ * indices; after slot 0 the copy resumes at slot d->mask.
+ * The head index of the deque is not modified here.
+ *
+ * Every copy goes through memcpy(): obj_table is caller memory whose
+ * alignment is not checked here, so it must not be written through a direct
+ * rte_int128_t assignment. This matches the other 128-bit copy helpers.
+ */
+static __rte_always_inline void
+__rte_deque_dequeue_elems_head_128(struct rte_deque *d,
+                               void *obj_table,
+                               unsigned int n)
+{
+       unsigned int i;
+       uint32_t idx = (d->head & d->mask);
+       const rte_int128_t *deque = (rte_int128_t *)&d[1];
+       rte_int128_t *obj = (rte_int128_t *)obj_table;
+       if (likely((int32_t)(idx - n) >= 0)) {
+               /* No wrap: objects come from idx, idx - 1, ... */
+               for (i = 0; i < n; i++, idx--)
+                       memcpy((void *)(obj + i),
+                               (const void *)(deque + idx), 16);
+       } else {
+               /* Go down to slot 0; stepping below it makes idx negative
+                * once reinterpreted as signed.
+                */
+               for (i = 0; (int32_t)idx >= 0; i++, idx--)
+                       memcpy((void *)(obj + i),
+                               (const void *)(deque + idx), 16);
+               /* Start at the ending */
+               for (idx = d->mask; i < n; i++, idx--)
+                       memcpy((void *)(obj + i),
+                               (const void *)(deque + idx), 16);
+       }
+}
+
+static __rte_always_inline unsigned int
+__rte_deque_dequeue_at_head(struct rte_deque *d,
+                       void *obj_table,
+                       unsigned int esize,
+                       unsigned int n)
+{
+       /* Copies n objects of esize bytes from the head end of the deque
+        * into obj_table and always returns n; no check on the number of
+        * stored objects is done here.
+        *
+        * The head index designates the cell that enqueue-at-head writes
+        * next, so step it back by one to designate the object that is
+        * copied out first. The field itself is adjusted because the 8B and
+        * 16B copy helpers read d->head on their own.
+        */
+       d->head--;
+
+       /* 8B and 16B copies implemented individually because on some platforms
+        * there are 64 bit and 128 bit registers available for direct copying.
+        */
+       if (esize == 8)
+               __rte_deque_dequeue_elems_head_64(d, obj_table, n);
+       else if (esize == 16)
+               __rte_deque_dequeue_elems_head_128(d, obj_table, n);
+       else {
+               uint32_t idx, scale, nd_idx, nd_num, nd_mask;
+
+               /* Normalize to uint32_t */
+               scale = esize / sizeof(uint32_t);
+               nd_num = n * scale;
+               idx = d->head & d->mask;
+               nd_idx = idx * scale;
+               nd_mask = d->mask * scale;
+               __rte_deque_dequeue_elems_head_32(d, nd_mask, nd_idx, obj_table,
+                                               nd_num, scale, esize);
+       }
+
+       /* The +1 undoes the decrement above: the head ends up n below its
+        * original value (stored masked), i.e. on the cell of the last object
+        * copied out, which is now an empty cell.
+        */
+       d->head = (d->head - n + 1) & d->mask;
+       return n;
+}
+#endif /* _RTE_DEQUE_PVT_H_ */
diff --git a/lib/deque/rte_deque_zc.h b/lib/deque/rte_deque_zc.h
new file mode 100644
index 0000000000..6d7167e158
--- /dev/null
+++ b/lib/deque/rte_deque_zc.h
@@ -0,0 +1,430 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2024 Arm Limited
+ */
+#ifndef _RTE_DEQUE_ZC_H_
+#define _RTE_DEQUE_ZC_H_
+
+/**
+ * @file
+ * This file should not be included directly, include rte_deque.h instead.
+ *
+ * Deque Zero Copy APIs
+ * These APIs make it possible to split public enqueue/dequeue API
+ * into 3 parts:
+ * - enqueue/dequeue start
+ * - copy data to/from the deque
+ * - enqueue/dequeue finish
+ * These APIs make it possible to avoid copying the data to a temporary area.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Deque zero-copy information structure.
+ *
+ * This structure contains the pointers and length of the space
+ * reserved on the deque storage. It is filled in by the zero-copy
+ * "start" functions.
+ */
+struct __rte_cache_aligned rte_deque_zc_data {
+       /* Pointer to the first space in the deque, i.e. the slot of the
+        * first element of the reservation.
+        */
+       void *ptr1;
+       /* Pointer to the second space in the deque if there is wrap-around.
+        * Only meaningful when n1 is smaller than the number of elements
+        * requested.
+        */
+       void *ptr2;
+       /* Number of elements reachable through the first pointer. If this
+        * is equal to the number of elements requested, the whole
+        * reservation is reachable through ptr1 and ptr2 is not needed.
+        * Otherwise, subtracting n1 from number of elements requested
+        * will give the number of elements available at ptr2.
+        */
+       unsigned int n1;
+};
+
+/*
+ * Compute the address(es) of num consecutive elements of esize bytes,
+ * starting at deque position pos (masked here).
+ *
+ * low_to_high == true: the elements occupy increasing slot indices; on
+ * wrap-around the second region starts at slot 0.
+ * low_to_high == false: the elements occupy decreasing slot indices; on
+ * wrap-around the second region starts at slot d->mask (the last slot) and
+ * also runs downwards.
+ *
+ * Outputs:
+ *   *dst1 - address of the slot at pos,
+ *   *n1   - number of elements in the first region (num when no wrap),
+ *   *dst2 - address of the first slot of the second region, or NULL when
+ *           all num elements fit in the first region.
+ */
+static __rte_always_inline void
+__rte_deque_get_elem_addr(struct rte_deque *d, uint32_t pos,
+       uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2,
+       bool low_to_high)
+{
+       uint32_t idx, scale, nr_idx;
+       uint32_t *deque_ptr = (uint32_t *)&d[1];
+
+       /* Normalize to uint32_t */
+       scale = esize / sizeof(uint32_t);
+       idx = pos & d->mask;
+       nr_idx = idx * scale;
+
+       *dst1 = deque_ptr + nr_idx;
+       *n1 = num;
+
+       if (low_to_high) {
+               /* d->size - idx slots are available from idx upwards. */
+               if (idx + num > d->size) {
+                       *n1 = d->size - idx;
+                       *dst2 = deque_ptr;
+               } else
+                       *dst2 = NULL;
+       } else {
+               /* idx + 1 slots are available from idx down to slot 0, so a
+                * second region is needed only when num exceeds that.
+                */
+               if (num > idx + 1) {
+                       *n1 = idx + 1;
+                       *dst2 = (void *)&deque_ptr[d->mask * scale];
+               } else
+                       *dst2 = NULL;
+       }
+}
+
+/**
+ * Reserve space to enqueue exactly @p n objects at the head of the deque.
+ *
+ * No object is copied and the head index is not moved by this function; it
+ * only reports where the objects have to be written. The caller copies the
+ * objects through the returned pointers and then calls
+ * rte_deque_head_enqueue_zc_elem_finish() to complete the enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure receiving the pointers and length of the space reserved on
+ *   the deque storage. Left untouched when fewer than @p n slots are free.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished. When the reservation is refused, it holds the current
+ *   free space.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_bulk_elem_start(struct rte_deque *d,
+       unsigned int esize, unsigned int n, struct rte_deque_zc_data *zcd,
+       unsigned int *free_space)
+{
+       const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+       *free_space = room;
+       if (unlikely(n > room))
+               return 0;
+
+       /* Slots are handed out from the head towards higher indices. */
+       __rte_deque_get_elem_addr(d, d->head, esize, n, &zcd->ptr1,
+                               &zcd->n1, &zcd->ptr2, true);
+
+       *free_space -= n;
+       return n;
+}
+
+/**
+ * Complete a zero-copy enqueue at the head of the deque by advancing the
+ * head index over the objects the caller has written.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects added to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+       const uint32_t advanced = d->head + n;
+
+       d->head = advanced & d->mask;
+}
+
+/**
+ * Reserve space to enqueue up to @p n objects at the head of the deque.
+ *
+ * The request is first limited to the free space currently available and
+ * then handled like rte_deque_head_enqueue_zc_bulk_elem_start(). No object
+ * is copied by this function: the caller copies the objects through the
+ * returned pointers and then calls rte_deque_head_enqueue_zc_elem_finish()
+ * to complete the enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued: @p n limited to the free
+ *   space of the deque.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_enqueue_zc_burst_elem_start(struct rte_deque *d,
+       unsigned int esize, unsigned int n, struct rte_deque_zc_data *zcd,
+       unsigned int *free_space)
+{
+       const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+       *free_space = room;
+       if (n > room)
+               n = room;
+
+       return rte_deque_head_enqueue_zc_bulk_elem_start(d, esize, n, zcd,
+                                                       free_space);
+}
+
+/**
+ * Reserve space to enqueue exactly @p n objects at the tail of the deque.
+ *
+ * No object is copied and the tail index is not moved by this function; it
+ * only reports where the objects have to be written. The caller copies the
+ * objects through the returned pointers and then calls
+ * rte_deque_tail_enqueue_zc_elem_finish() to complete the enqueue operation.
+ *
+ * The reserved slots run towards lower indices: ptr1 designates the slot
+ * just below the tail and, on wrap-around, ptr2 designates the last slot of
+ * the storage.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to add in the deque.
+ * @param zcd
+ *   Structure receiving the pointers and length of the space reserved on
+ *   the deque storage. Left untouched when fewer than @p n slots are free.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished. When the reservation is refused, it holds the current
+ *   free space.
+ * @return
+ *   The number of objects that can be enqueued, either 0 or n
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_bulk_elem_start(struct rte_deque *d,
+       unsigned int esize, unsigned int n, struct rte_deque_zc_data *zcd,
+       unsigned int *free_space)
+{
+       const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+       *free_space = room;
+       if (unlikely(n > room))
+               return 0;
+
+       /* The first free slot at the tail end is the one just below tail. */
+       __rte_deque_get_elem_addr(d, d->tail - 1, esize, n, &zcd->ptr1,
+                               &zcd->n1, &zcd->ptr2, false);
+
+       *free_space -= n;
+       return n;
+}
+
+/**
+ * Complete a zero-copy enqueue at the tail of the deque by moving the tail
+ * index back over the objects the caller has written.
+ * Note that number of objects to enqueue should not exceed previous
+ * enqueue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects added to the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_enqueue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+       const uint32_t moved_back = d->tail - n;
+
+       d->tail = moved_back & d->mask;
+}
+
+/**
+ * Reserve space to enqueue up to @p n objects at the tail of the deque.
+ *
+ * The request is first limited to the free space currently available and
+ * then handled like rte_deque_tail_enqueue_zc_bulk_elem_start(). No object
+ * is copied by this function: the caller copies the objects through the
+ * returned pointers and then calls rte_deque_tail_enqueue_zc_elem_finish()
+ * to complete the enqueue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to add in the deque.
+ * @param zcd
+ *   Structure containing the pointers and length of the space
+ *   reserved on the deque storage.
+ * @param free_space
+ *   Returns the amount of space in the deque after the reservation operation
+ *   has finished.
+ * @return
+ *   The number of objects that can be enqueued: @p n limited to the free
+ *   space of the deque.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_enqueue_zc_burst_elem_start(struct rte_deque *d,
+       unsigned int esize, unsigned int n, struct rte_deque_zc_data *zcd,
+       unsigned int *free_space)
+{
+       const unsigned int room = __RTE_DEQUE_FREE_SPACE(d);
+
+       *free_space = room;
+       if (n > room)
+               n = room;
+
+       return rte_deque_tail_enqueue_zc_bulk_elem_start(d, esize, n, zcd,
+                                                       free_space);
+}
+
+/**
+ * Start to dequeue exactly @p n objects from the tail of the deque.
+ *
+ * No object is copied and the tail index is not moved by this function; it
+ * only reports where the objects are stored. The caller copies the objects
+ * out through the returned pointers and then calls
+ * rte_deque_tail_dequeue_zc_elem_finish() to complete the dequeue operation.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure receiving the pointers and length of the space occupied by
+ *   the objects on the deque storage. Left untouched when fewer than @p n
+ *   objects are stored.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished. When the request is refused, it holds the current number of
+ *   entries.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_bulk_elem_start(struct rte_deque *d,
+       unsigned int esize, unsigned int n, struct rte_deque_zc_data *zcd,
+       unsigned int *available)
+{
+       const unsigned int stored = __RTE_DEQUE_COUNT(d);
+
+       *available = stored;
+       if (unlikely(n > stored))
+               return 0;
+
+       /* Objects are read from the tail towards higher indices. */
+       __rte_deque_get_elem_addr(d, d->tail, esize, n, &zcd->ptr1,
+                               &zcd->n1, &zcd->ptr2, true);
+
+       *available -= n;
+       return n;
+}
+
+/**
+ * Complete a zero-copy dequeue at the tail of the deque by advancing the
+ * tail index over the objects the caller has consumed.
+ * Note that the number of objects dequeued should not exceed previous
+ * dequeue_start return value.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_tail_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+       const uint32_t advanced = d->tail + n;
+
+       d->tail = advanced & d->mask;
+}
+
+/**
+ * Start a zero-copy burst dequeue of up to n objects from the tail of the
+ * deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_tail_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * Unlike rte_deque_tail_dequeue_zc_bulk_elem_start, the request is first
+ * clamped to the entry count reported by __RTE_DEQUE_COUNT(), so fewer than
+ * n objects may be returned. The clamped request is then forwarded to the
+ * bulk routine, which is called even when the clamped request is 0.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to remove from the deque.
+ * @param zcd
+ *   Structure that receives the pointers and length describing where the
+ *   objects to dequeue are located in the deque storage; it is passed
+ *   unchanged to rte_deque_tail_dequeue_zc_bulk_elem_start.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished (the reported count minus the returned value).
+ * @return
+ *   The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_tail_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int 
esize,
+       unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+       *available = __RTE_DEQUE_COUNT(d);
+       n = n > *available ? *available : n; /* clamp request to the reported count */
+       return rte_deque_tail_dequeue_zc_bulk_elem_start(d, esize, n, zcd, 
available);
+}
+
+/**
+ * Start a zero-copy bulk dequeue of several objects from the head of the deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation; d->head is not assigned in this function.
+ *
+ * This is the all-or-nothing variant: when the deque reports fewer than n
+ * entries, 0 is returned and zcd is not written.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The number of objects to remove from the deque.
+ * @param zcd
+ *   Structure whose ptr1, n1 and ptr2 members are filled in by
+ *   __rte_deque_get_elem_addr() for the n objects starting at position
+ *   d->head - 1.
+ *   NOTE(review): d->head - 1 is passed without applying d->mask here;
+ *   verify that __rte_deque_get_elem_addr() wraps the position when d->head
+ *   is 0. The trailing 'false' argument presumably selects traversal toward
+ *   decreasing indexes - confirm against the helper.
+ * @param available
+ *   On success, set to the entry count reported by __RTE_DEQUE_COUNT() minus
+ *   n, i.e. the number of entries remaining after the dequeue has finished.
+ *   When 0 is returned because fewer than n entries were reported, set to
+ *   that unmodified count.
+ * @return
+ *   The number of objects that can be dequeued, either 0 or n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_bulk_elem_start(struct rte_deque *d, unsigned int 
esize,
+       unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+       *available = __RTE_DEQUE_COUNT(d);
+       if (unlikely(*available < n))
+               return 0; /* bulk semantics: all n objects or none */
+       __rte_deque_get_elem_addr(d, d->head - 1, esize, n, &zcd->ptr1,
+                                                       &zcd->n1, &zcd->ptr2, 
false);
+
+       *available -= n; /* report what is left once these n are consumed */
+       return n;
+}
+
+/**
+ * Complete dequeuing several objects from the head of the deque.
+ * Note that the number of objects to be dequeued should not exceed the value
+ * returned by the preceding rte_deque_head_dequeue_zc_bulk_elem_start or
+ * rte_deque_head_dequeue_zc_burst_elem_start call. n is not validated here.
+ *
+ * @param d
+ *   A pointer to the deque structure. Only d->head is written; d->mask is
+ *   read to wrap the new index.
+ * @param n
+ *   The number of objects to remove from the deque.
+ */
+__rte_experimental
+static __rte_always_inline void
+rte_deque_head_dequeue_zc_elem_finish(struct rte_deque *d, unsigned int n)
+{
+       d->head = (d->head - n) & d->mask; /* step head back by n, wrapped with the mask */
+}
+
+/**
+ * Start a zero-copy burst dequeue of up to n objects from the head of the
+ * deque.
+ * Note that no actual objects are copied from the queue by this function.
+ * User has to copy objects from the queue using the returned pointers.
+ * User should call rte_deque_head_dequeue_zc_elem_finish to complete the
+ * dequeue operation.
+ *
+ * Unlike rte_deque_head_dequeue_zc_bulk_elem_start, the request is first
+ * clamped to the entry count reported by __RTE_DEQUE_COUNT(), so fewer than
+ * n objects may be returned. The clamped request is then forwarded to the
+ * bulk routine, which is called even when the clamped request is 0.
+ *
+ * @param d
+ *   A pointer to the deque structure.
+ * @param esize
+ *   The size of deque element, in bytes. It must be a multiple of 4.
+ * @param n
+ *   The maximum number of objects to remove from the deque.
+ * @param zcd
+ *   Structure that receives the pointers and length describing where the
+ *   objects to dequeue are located in the deque storage; it is passed
+ *   unchanged to rte_deque_head_dequeue_zc_bulk_elem_start.
+ * @param available
+ *   Returns the number of remaining deque entries after the dequeue has
+ *   finished (the reported count minus the returned value).
+ * @return
+ *   The number of objects that can be dequeued, between 0 and n.
+ */
+__rte_experimental
+static __rte_always_inline unsigned int
+rte_deque_head_dequeue_zc_burst_elem_start(struct rte_deque *d, unsigned int 
esize,
+       unsigned int n, struct rte_deque_zc_data *zcd, unsigned int *available)
+{
+       *available = __RTE_DEQUE_COUNT(d);
+       n = n > *available ? *available : n; /* clamp request to the reported count */
+       return rte_deque_head_dequeue_zc_bulk_elem_start(d, esize, n, zcd, 
available);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_DEQUE_ZC_H_ */
diff --git a/lib/deque/version.map b/lib/deque/version.map
new file mode 100644
index 0000000000..103fd3b512
--- /dev/null
+++ b/lib/deque/version.map
@@ -0,0 +1,14 @@
+EXPERIMENTAL {
+       global:
+
+       # added in 24.07
+       rte_deque_log_type;
+       rte_deque_create;
+       rte_deque_dump;
+       rte_deque_free;
+       rte_deque_get_memsize_elem;
+       rte_deque_init;
+       rte_deque_reset;
+
+       local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 179a272932..127e4dc68c 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -14,6 +14,7 @@ libraries = [
         'argparse',
         'telemetry', # basic info querying
         'eal', # everything depends on eal
+        'deque',
         'ring',
         'rcu', # rcu depends on ring
         'mempool',
@@ -74,6 +75,7 @@ if is_ms_compiler
             'kvargs',
             'telemetry',
             'eal',
+            'deque',
             'ring',
     ]
 endif
-- 
2.25.1

Reply via email to