Hi Honnappa,

On 01/10/2019 07:29, Honnappa Nagarahalli wrote:
Add resource reclamation APIs to make it simple for applications
and libraries to integrate rte_rcu library.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljed...@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
---
  app/test/test_rcu_qsbr.c           | 291 ++++++++++++++++++++++++++++-
  lib/librte_rcu/meson.build         |   2 +
  lib/librte_rcu/rte_rcu_qsbr.c      | 185 ++++++++++++++++++
  lib/librte_rcu/rte_rcu_qsbr.h      | 169 +++++++++++++++++
  lib/librte_rcu/rte_rcu_qsbr_pvt.h  |  46 +++++
  lib/librte_rcu/rte_rcu_version.map |   4 +
  lib/meson.build                    |   6 +-
  7 files changed, 700 insertions(+), 3 deletions(-)
  create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h

diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
index d1b9e46a2..3a6815243 100644
--- a/app/test/test_rcu_qsbr.c
+++ b/app/test/test_rcu_qsbr.c
@@ -1,8 +1,9 @@
  /* SPDX-License-Identifier: BSD-3-Clause
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2019 Arm Limited
   */
#include <stdio.h>
+#include <string.h>
  #include <rte_pause.h>
  #include <rte_rcu_qsbr.h>
  #include <rte_hash.h>
@@ -33,6 +34,7 @@ static uint32_t *keys;
  #define COUNTER_VALUE 4096
  static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
  static uint8_t writer_done;
+static uint8_t cb_failed;
static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
  struct rte_hash *h[RTE_MAX_LCORE];
@@ -582,6 +584,269 @@ test_rcu_qsbr_thread_offline(void)
        return 0;
  }
+static void
+rte_rcu_qsbr_test_free_resource(void *p, void *e)
+{
+       if (p != NULL && e != NULL) {
+               printf("%s: Test failed\n", __func__);
+               cb_failed = 1;
+       }
+}
+
+/*
+ * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
+ * elements that can be freed later. This queue is referred to as 'defer 
queue'.
+ */
+static int
+test_rcu_qsbr_dq_create(void)
+{
+       char rcu_dq_name[RTE_RING_NAMESIZE];
+       struct rte_rcu_qsbr_dq_parameters params;
+       struct rte_rcu_qsbr_dq *dq;
+
+       printf("\nTest rte_rcu_qsbr_dq_create()\n");
+
+       /* Pass invalid parameters */
+       dq = rte_rcu_qsbr_dq_create(NULL);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+       params.name = rcu_dq_name;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       params.f = rte_rcu_qsbr_test_free_resource;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+       params.v = t[0];
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       params.size = 1;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       params.esize = 3;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+       /* Pass all valid parameters */
+       params.esize = 16;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+       rte_rcu_qsbr_dq_delete(dq);
+
+       return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
+ * to be freed later after atleast one grace period is over.
+ */
+static int
+test_rcu_qsbr_dq_enqueue(void)
+{
+       int ret;
+       uint64_t r;
+       char rcu_dq_name[RTE_RING_NAMESIZE];
+       struct rte_rcu_qsbr_dq_parameters params;
+       struct rte_rcu_qsbr_dq *dq;
+
+       printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
+
+       /* Create a queue with simple parameters */
+       memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+       snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+       params.name = rcu_dq_name;
+       params.f = rte_rcu_qsbr_test_free_resource;
+       rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+       params.v = t[0];
+       params.size = 1;
+       params.esize = 16;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+       /* Pass invalid parameters */
+       ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+       ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+       ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+       ret = rte_rcu_qsbr_dq_delete(dq);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
+
+       return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
+ */
+static int
+test_rcu_qsbr_dq_reclaim(void)
+{
+       int ret;
+
+       printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
+
+       /* Pass invalid parameters */
+       ret = rte_rcu_qsbr_dq_reclaim(NULL);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
+
+       return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_delete: Delete a defer queue.
+ */
+static int
+test_rcu_qsbr_dq_delete(void)
+{
+       int ret;
+       char rcu_dq_name[RTE_RING_NAMESIZE];
+       struct rte_rcu_qsbr_dq_parameters params;
+       struct rte_rcu_qsbr_dq *dq;
+
+       printf("\nTest rte_rcu_qsbr_dq_delete()\n");
+
+       /* Pass invalid parameters */
+       ret = rte_rcu_qsbr_dq_delete(NULL);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq delete invalid params");
+
+       memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+       snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+       params.name = rcu_dq_name;
+       params.f = rte_rcu_qsbr_test_free_resource;
+       rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+       params.v = t[0];
+       params.size = 1;
+       params.esize = 16;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+       ret = rte_rcu_qsbr_dq_delete(dq);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+       return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
+ * to be freed later after atleast one grace period is over.
+ */
+static int
+test_rcu_qsbr_dq_functional(int32_t size, int32_t esize)
+{
+       int i, j, ret;
+       char rcu_dq_name[RTE_RING_NAMESIZE];
+       struct rte_rcu_qsbr_dq_parameters params;
+       struct rte_rcu_qsbr_dq *dq;
+       uint64_t *e;
+       uint64_t sc = 200;
+       int max_entries;
+
+       printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
+       printf("Size = %d, esize = %d\n", size, esize);
+
+       e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
+       if (e == NULL)
+               return 0;
+       cb_failed = 0;
+
+       /* Initialize the RCU variable. No threads are registered */
+       rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+
+       /* Create a queue with simple parameters */
+       memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+       snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+       params.name = rcu_dq_name;
+       params.f = rte_rcu_qsbr_test_free_resource;
+       params.v = t[0];
+       params.size = size;
+       params.esize = esize;
+       dq = rte_rcu_qsbr_dq_create(&params);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+       /* Given the size and esize, calculate the maximum number of entries
+        * that can be stored on the defer queue (look at the logic used
+        * in capacity calculation of rte_ring).
+        */
+       max_entries = rte_align32pow2(((esize/8 + 1) * size) + 1);
+       max_entries = (max_entries - 1)/(esize/8 + 1);
+
+       /* Enqueue few counters starting with the value 'sc' */
+       /* The queue size will be rounded up to 2. The enqueue API also
+        * reclaims if the queue size is above certain limit. Since, there
+        * are no threads registered, reclamation succedes. Hence, it should
+        * be possible to enqueue more than the provided queue size.
+        */
+       for (i = 0; i < 10; i++) {
+               ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+               TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+                       "dq enqueue functional");
+               for (j = 0; j < esize/8; j++)
+                       e[j] = sc++;
+       }
+
+       /* Register a thread on the RCU QSBR variable. Reclamation will not
+        * succeed. It should not be possible to enqueue more than the size
+        * number of resources.
+        */
+       rte_rcu_qsbr_thread_register(t[0], 1);
+       rte_rcu_qsbr_thread_online(t[0], 1);
+
+       for (i = 0; i < max_entries; i++) {
+               ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+               TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+                       "dq enqueue functional");
+               for (j = 0; j < esize/8; j++)
+                       e[j] = sc++;
+       }
+
+       /* Enqueue fails as queue is full */
+       ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+       /* Delete should fail as there are elements in defer queue which
+        * cannot be reclaimed.
+        */
+       ret = rte_rcu_qsbr_dq_delete(dq);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
+
+       /* Report quiescent state, enqueue should succeed */
+       rte_rcu_qsbr_quiescent(t[0], 1);
+       for (i = 0; i < max_entries; i++) {
+               ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+               TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+                       "dq enqueue functional");
+               for (j = 0; j < esize/8; j++)
+                       e[j] = sc++;
+       }
+
+       /* Queue is full */
+       ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+       /* Report quiescent state, delete should succeed */
+       rte_rcu_qsbr_quiescent(t[0], 1);
+       ret = rte_rcu_qsbr_dq_delete(dq);
+       TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+       /* Validate that call back function did not return any error */
+       TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
+
+       rte_free(e);
+       return 0;
+}
+
  /*
   * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
   */
@@ -1025,6 +1290,18 @@ test_rcu_qsbr_main(void)
        if (test_rcu_qsbr_thread_offline() < 0)
                goto test_fail;
+ if (test_rcu_qsbr_dq_create() < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_reclaim() < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_delete() < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_enqueue() < 0)
+               goto test_fail;
+
        printf("\nFunctional tests\n");
if (test_rcu_qsbr_sw_sv_3qs() < 0)
@@ -1033,6 +1310,18 @@ test_rcu_qsbr_main(void)
        if (test_rcu_qsbr_mw_mv_mqs() < 0)
                goto test_fail;
+ if (test_rcu_qsbr_dq_functional(1, 8) < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_functional(2, 8) < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_functional(303, 16) < 0)
+               goto test_fail;
+
+       if (test_rcu_qsbr_dq_functional(7, 128) < 0)
+               goto test_fail;
+
        free_rcu();
printf("\n");
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
  if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
        ext_deps += cc.find_library('atomic')
  endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index ce7f93dd3..76814f50b 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,6 +21,7 @@
  #include <rte_errno.h>
#include "rte_rcu_qsbr.h"
+#include "rte_rcu_qsbr_pvt.h"
/* Get the memory size of QSBR variable */
  size_t
@@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
        return 0;
  }
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+       struct rte_rcu_qsbr_dq *dq;
+       uint32_t qs_fifo_size;
+
+       if (params == NULL || params->f == NULL ||
+               params->v == NULL || params->name == NULL ||
+               params->size == 0 || params->esize == 0 ||
+               (params->esize % 8 != 0)) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return NULL;
+       }
+
+       dq = rte_zmalloc(NULL,
+               (sizeof(struct rte_rcu_qsbr_dq) + params->esize),
+               RTE_CACHE_LINE_SIZE);
+       if (dq == NULL) {
+               rte_errno = ENOMEM;
+
+               return NULL;
+       }
+
+       /* round up qs_fifo_size to next power of two that is not less than
+        * max_size.
+        */
+       qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
+                                       * params->size) + 1);
+       dq->r = rte_ring_create(params->name, qs_fifo_size,
+                                       SOCKET_ID_ANY, 0);
+       if (dq->r == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): defer queue create failed\n", __func__);
+               rte_free(dq);
+               return NULL;
+       }
+
+       dq->v = params->v;
+       dq->size = params->size;
+       dq->esize = params->esize;
+       dq->f = params->f;
+       dq->p = params->p;
+
+       return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+       uint64_t token;
+       uint64_t *tmp;
+       uint32_t i;
+       uint32_t cur_size, free_size;
+
+       if (dq == NULL || e == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       /* Start the grace period */
+       token = rte_rcu_qsbr_start(dq->v);
+
+       /* Reclaim resources if the queue is 1/8th full. This helps
+        * the queue from growing too large and allows time for reader
+        * threads to report their quiescent state.
+        */
+       cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);
+       if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Triggering reclamation\n", __func__);
+               rte_rcu_qsbr_dq_reclaim(dq);
+       }

There are two problems I see:

1. rte_rcu_qsbr_dq_reclaim() reclaims only 1/16 of the defer queue while it triggers on 1/8. This means that there will always be 1/16 of non reclaimed entries in the queue.

2. Number of entries to reclaim depend on dq->size. So, rte_rcu_qsbr_dq_reclaim() could take a lot of cycles. For LPM library this means that rte_lpm_delete() sometimes takes a long time.

So, my suggestions here would be

- trigger rte_rcu_qsbr_dq_reclaim() with every enqueue

- reclaim small amount of entries (could be configurable of creation time)

- provide API to trigger reclaim from the application manually.

+
+       /* Check if there is space for atleast for 1 resource */
+       free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
+       if (!free_size) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Defer queue is full\n", __func__);
+               rte_errno = ENOSPC;
+               return 1;
+       }
+
+       /* Enqueue the resource */
+       rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
+
+       /* The resource to enqueue needs to be a multiple of 64b
+        * due to the limitation of the rte_ring implementation.
+        */
+       for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
+               rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);
+
+       return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
+{
+       uint32_t max_cnt;
+       uint32_t cnt;
+       void *token;
+       uint64_t *tmp;
+       uint32_t i;
+
+       if (dq == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       /* Anything to reclaim? */
+       if (rte_ring_count(dq->r) == 0)
+               return 0;
+
+       /* Reclaim at the max 1/16th the total number of entries. */
+       max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
+       max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
+       cnt = 0;
+
+       /* Check reader threads quiescent state and reclaim resources */
+       while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
+               (rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
+                       == 1)) {
+               (void)rte_ring_sc_dequeue(dq->r, &token);
+               /* The resource to dequeue needs to be a multiple of 64b
+                * due to the limitation of the rte_ring implementation.
+                */
+               for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
+                       i++, tmp++)
+                       (void)rte_ring_sc_dequeue(dq->r,
+                                       (void *)(uintptr_t)tmp);
+               dq->f(dq->p, dq->e);
+
+               cnt++;
+       }
+
+       rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+               "%s(): Reclaimed %u resources\n", __func__, cnt);
+
+       if (cnt == 0) {
+               /* No resources were reclaimed */
+               rte_errno = EAGAIN;
+               return 1;
+       }
+
+       return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+       if (dq == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       /* Reclaim all the resources */
+       if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
+               /* Error number is already set by the reclaim API */
+               return 1;
+
+       rte_ring_free(dq->r);
+       rte_free(dq);
+
+       return 0;
+}
+
  int rte_rcu_log_type;
RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index c80f15c00..185d4b50a 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@ extern "C" {
  #include <rte_lcore.h>
  #include <rte_debug.h>
  #include <rte_atomic.h>
+#include <rte_ring.h>
extern int rte_rcu_log_type; @@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
         */
  } __rte_cache_aligned;
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ *   Pointer provided while creating the defer queue
+ * @param e
+ *   Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ *   None
+ */
+typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ *  Trigger automatic reclamation after 1/8th the defer queue is full.
+ */
+#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
+
+/**
+ *  Reclaim at the max 1/16th the total number of resources.
+ */
+#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+       const char *name;
+       /**< Name of the queue. */
+       uint32_t size;
+       /**< Number of entries in queue. Typically, this will be
+        *   the same as the maximum number of entries supported in the
+        *   lock free data structure.
+        *   Data structures with unbounded number of entries is not
+        *   supported currently.
+        */
+       uint32_t esize;
+       /**< Size (in bytes) of each element in the defer queue.
+        *   This has to be multiple of 8B as the rte_ring APIs
+        *   support 8B element sizes only.
+        */
+       rte_rcu_qsbr_free_resource f;
+       /**< Function to call to free the resource. */
+       void *p;
+       /**< Pointer passed to the free function. Typically, this is the
+        *   pointer to the data structure to which the resource to free
+        *   belongs. This can be NULL.
+        */
+       struct rte_rcu_qsbr *v;
+       /**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
  /**
   * @warning
   * @b EXPERIMENTAL: this API may change without prior notice
@@ -648,6 +710,113 @@ __rte_experimental
  int
  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ *   Parameters to create a defer queue.
+ * @return
+ *   On success - Valid pointer to defer queue
+ *   On error - NULL
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to avoid
+ * the defer queue from growing too big.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ *   Defer queue to allocate an entry from.
+ * @param e
+ *   Pointer to resource data to copy to the defer queue. The size of
+ *   the data to copy is equal to the element size provided when the
+ *   defer queue was created.
+ * @return
+ *   On success - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOSPC - Defer queue is full. This condition can not happen
+ *             if the defer queue size is equal (or larger) than the
+ *             number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Reclaim resources from the defer queue.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ *   Defer queue to reclaim an entry from.
+ * @return
+ *   On successful reclamation of at least 1 resource - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - None of the resources have completed at least 1 grace period,
+ *             try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ *   Defer queue to delete.
+ * @return
+ *   On success - 0
+ *   On error - 1
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - Some of the resources have not completed at least 1 grace
+ *             period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
  #ifdef __cplusplus
  }
  #endif
diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h 
b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
new file mode 100644
index 000000000..2122bc36a
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
@@ -0,0 +1,46 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+       struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+       struct rte_ring *r;     /**< RCU QSBR defer queue. */
+       uint32_t size;
+       /**< Number of elements in the defer queue */
+       uint32_t esize;
+       /**< Size (in bytes) of data stored on the defer queue */
+       rte_rcu_qsbr_free_resource f;
+       /**< Function to call to free the resource. */
+       void *p;
+       /**< Pointer passed to the free function. Typically, this is the
+        *   pointer to the data structure to which the resource to free
+        *   belongs.
+        */
+       char e[0];
+       /**< Temporary storage to copy the defer queue element. */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map 
b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@ EXPERIMENTAL {
        rte_rcu_qsbr_synchronize;
        rte_rcu_qsbr_thread_register;
        rte_rcu_qsbr_thread_unregister;
+       rte_rcu_qsbr_dq_create;
+       rte_rcu_qsbr_dq_enqueue;
+       rte_rcu_qsbr_dq_reclaim;
+       rte_rcu_qsbr_dq_delete;
local: *;
  };
diff --git a/lib/meson.build b/lib/meson.build
index e5ff83893..0e1be8407 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@
  libraries = [
        'kvargs', # eal depends on kvargs
        'eal', # everything depends on eal
-       'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+       'ring',
+       'rcu', # rcu depends on ring
+       'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
        'cmdline',
        'metrics', # bitrate/latency stats depends on this
        'hash',    # efd depends on this
@@ -22,7 +24,7 @@ libraries = [
        'gro', 'gso', 'ip_frag', 'jobstats',
        'kni', 'latencystats', 'lpm', 'member',
        'power', 'pdump', 'rawdev',
-       'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
+       'reorder', 'sched', 'security', 'stack', 'vhost',
        # ipsec lib depends on net, crypto and security
        'ipsec',
        # add pkt framework libs which use other libs from above

--
Regards,
Vladimir

Reply via email to