Add resource reclamation APIs to make it simple for applications
and libraries to integrate the rte_rcu library.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljed...@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
---
 lib/librte_rcu/Makefile            |   2 +-
 lib/librte_rcu/meson.build         |   2 +
 lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++
 lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-
 lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |   4 +
 lib/meson.build                    |   6 +-
 7 files changed, 498 insertions(+), 4 deletions(-)
 create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h

diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
index c4bb28d77..95f8a57e2 100644
--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@ LIB = librte_rcu.a
 
 CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
 
 EXPORT_MAP := rte_rcu_version.map
 
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
 if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
        ext_deps += cc.find_library('atomic')
 endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h
new file mode 100644
index 000000000..413f28587
--- /dev/null
+++ b/lib/librte_rcu/rcu_qsbr_pvt.h
@@ -0,0 +1,57 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+#include <rte_ring_elem.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+       struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue. */
+       struct rte_ring *r;     /**< RCU QSBR defer queue. */
+       uint32_t size;
+       /**< Number of elements requested for the defer queue. */
+       uint32_t esize;
+       /**< Size (in bytes) of data, including the token, stored on the
+        *   defer queue.
+        */
+       uint32_t trigger_reclaim_limit;
+       /**< Trigger automatic reclamation after the defer queue
+        *   has at least this many resources waiting.
+        */
+       uint32_t max_reclaim_size;
+       /**< Reclaim at most this many resources during auto
+        *   reclamation.
+        */
+       rte_rcu_qsbr_free_resource_t free_fn;
+       /**< Function to call to free the resource. */
+       void *p;
+       /**< Pointer passed to the free function. Typically, this is the
+        *   pointer to the data structure to which the resource to free
+        *   belongs.
+        */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 2f3fad776..e8c1e386f 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2018-2019 Arm Limited
  */
 
 #include <stdio.h>
@@ -18,8 +18,10 @@
 #include <rte_per_lcore.h>
 #include <rte_lcore.h>
 #include <rte_errno.h>
+#include <rte_ring_elem.h>
 
 #include "rte_rcu_qsbr.h"
+#include "rcu_qsbr_pvt.h"
 
 /* Get the memory size of QSBR variable */
 size_t
@@ -270,6 +272,245 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
        return 0;
 }
 
+/* Create a 'defer queue': a queue that stores data structure elements
+ * to be freed later. Returns NULL on invalid input or allocation failure.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+       struct rte_rcu_qsbr_dq *dq;
+       uint32_t qs_fifo_size;
+       unsigned int flags;
+
+       if (params == NULL || params->free_fn == NULL ||
+               params->v == NULL || params->name == NULL ||
+               params->size == 0 || params->esize == 0 ||
+               (params->esize % 4 != 0)) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return NULL;
+       }
+       /* If auto reclamation is configured, i.e. the trigger limit does
+        * not exceed the queue size, max_reclaim_size must be non-zero.
+        */
+       if ((params->trigger_reclaim_limit <= params->size) &&
+           (params->max_reclaim_size == 0)) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
+                       __func__, params->size, params->trigger_reclaim_limit,
+                       params->max_reclaim_size);
+               rte_errno = EINVAL;
+
+               return NULL;
+       }
+
+       dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
+                        RTE_CACHE_LINE_SIZE);
+       if (dq == NULL) {
+               rte_errno = ENOMEM;
+
+               return NULL;
+       }
+
+       /* Decide the flags for the ring.
+        * If MT safety is requested, use RTS for ring enqueue as most
+        * use cases involve dq-enqueue happening on the control plane.
+        * Ring dequeue is always HTS due to the possibility of revert.
+        */
+       flags = RING_F_MP_RTS_ENQ;
+       if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
+               flags = RING_F_SP_ENQ;
+       flags |= RING_F_MC_HTS_DEQ;
+       /* Round qs_fifo_size up to the next power of two that is greater
+        * than params->size.
+        */
+       qs_fifo_size = rte_align32pow2(params->size + 1);
+       /* Add token size to ring element size */
+       dq->r = rte_ring_create_elem(params->name,
+                       __RTE_QSBR_TOKEN_SIZE + params->esize,
+                       qs_fifo_size, SOCKET_ID_ANY, flags);
+       if (dq->r == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): defer queue create failed\n", __func__);
+               rte_free(dq);
+               return NULL;
+       }
+
+       dq->v = params->v;
+       dq->size = params->size;
+       dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
+       dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
+       dq->max_reclaim_size = params->max_reclaim_size;
+       dq->free_fn = params->free_fn;
+       dq->p = params->p;
+
+       return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+       uint64_t token;
+       uint32_t cur_size, free_size;
+
+       if (dq == NULL || e == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       /* Start the grace period */
+       token = rte_rcu_qsbr_start(dq->v);
+
+       /* Reclaim resources once the queue holds more than
+        * trigger_reclaim_limit entries. This keeps the queue from growing
+        * too large and gives readers time to report quiescent state.
+        */
+       cur_size = rte_ring_count(dq->r);
+       if (cur_size > dq->trigger_reclaim_limit) {
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Triggering reclamation\n", __func__);
+               rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
+       }
+
+       /* Check if there is space for at least 1 resource */
+       free_size = rte_ring_free_count(dq->r);
+       if (!free_size) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Defer queue is full\n", __func__);
+               /* Note that the token generated above is not used.
+                * Other than wasting tokens, it should not cause any
+                * other issues.
+                */
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Skipped enqueuing token = %"PRIu64"\n",
+                       __func__, token);
+
+               rte_errno = ENOSPC;
+               return 1;
+       }
+
+       /* Enqueue the token and resource. Generating the token
+        * and enqueuing (token + resource) on the queue is not an
+        * atomic operation. This might result in tokens enqueued
+        * out of order on the queue. So, some tokens might wait
+        * longer than they are required to be reclaimed.
+        */
+       char data[dq->esize];
+       memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
+       memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
+               dq->esize - __RTE_QSBR_TOKEN_SIZE);
+       /* Check the status as enqueue might fail since another thread
+        * might have used up the freed space.
+        * Enqueue uses the configured flags when the DQ was created.
+        */
+       if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Enqueue failed\n", __func__);
+               /* Note that the token generated above is not used.
+                * Other than wasting tokens, it should not cause any
+                * other issues.
+                */
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Skipped enqueuing token = %"PRIu64"\n",
+                       __func__, token);
+
+               rte_errno = ENOSPC;
+               return 1;
+       }
+
+       rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+               "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
+
+       return 0;
+}
+
+/* Reclaim up to n resources whose grace period is over from the queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+                               unsigned int *freed, unsigned int *pending)
+{
+       uint32_t cnt;
+       uint64_t token;
+
+       if (dq == NULL || n == 0) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       cnt = 0;
+
+       char e[dq->esize];
+       /* Check reader threads quiescent state and reclaim resources */
+       while ((cnt < n) &&
+               (rte_ring_dequeue_bulk_elem_start(dq->r, e,
+                                       dq->esize, 1, NULL) != 0)) {
+               memcpy(&token, e, sizeof(uint64_t));
+
+               /* Stop if this token's grace period is not over yet */
+               if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
+                       rte_ring_dequeue_finish(dq->r, 0);
+                       break;
+               }
+               rte_ring_dequeue_finish(dq->r, 1);
+
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Reclaimed token = %"PRIu64"\n",
+                       __func__, token);
+
+               dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
+
+               cnt++;
+       }
+
+       rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+               "%s(): Reclaimed %u resources\n", __func__, cnt);
+
+       if (freed != NULL)
+               *freed = cnt;
+       if (pending != NULL)
+               *pending = rte_ring_count(dq->r);
+
+       return 0;
+}
+
+/* Delete a defer queue; fails with EAGAIN while entries are pending. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+       unsigned int pending;
+
+       if (dq == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       /* Try to reclaim everything; ~0 sets the largest possible limit */
+       rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
+       if (pending != 0) {
+               rte_errno = EAGAIN;
+
+               return 1;
+       }
+
+       rte_ring_free(dq->r);
+       rte_free(dq);
+
+       return 0;
+}
+
 int rte_rcu_log_type;
 
 RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 0b5585925..213f9b029 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@ extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 #include <rte_atomic.h>
+#include <rte_ring.h>
 
 extern int rte_rcu_log_type;
 
@@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
+#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -114,6 +116,84 @@ struct rte_rcu_qsbr {
         */
 } __rte_cache_aligned;
 
+/**
+ * Callback function invoked to free a resource taken off the defer queue.
+ *
+ * @param p
+ *   Pointer provided while creating the defer queue
+ * @param e
+ *   Pointer to the resource data stored on the defer queue (token excluded)
+ *
+ * @return
+ *   None
+ */
+typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ * Various flags supported.
+ */
+/** Enqueue and reclaim operations are multi-thread safe by default.
+ *   The callback functions registered to free the resources are
+ *   assumed to be multi-thread safe.
+ *   Set this flag if multi-thread safety is not required.
+ */
+#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+       const char *name;
+       /**< Name of the queue. */
+       uint32_t flags;
+       /**< Flags to control API behaviors */
+       uint32_t size;
+       /**< Number of entries in queue. Typically, this will be
+        *   the same as the maximum number of entries supported in the
+        *   lock free data structure.
+        *   Data structures with an unbounded number of entries are not
+        *   supported currently.
+        */
+       uint32_t esize;
+       /**< Size (in bytes) of each element in the defer queue.
+        *   This has to be a non-zero multiple of 4B.
+        */
+       uint32_t trigger_reclaim_limit;
+       /**< Trigger automatic reclamation after the defer queue
+        *   has at least this many resources waiting. This auto
+        *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
+        *   call.
+        *   If this is greater than 'size', auto reclamation is
+        *   not triggered.
+        *   If this is set to 0, auto reclamation is triggered
+        *   in every call to rte_rcu_qsbr_dq_enqueue API.
+        */
+       uint32_t max_reclaim_size;
+       /**< When automatic reclamation is enabled, reclaim at most
+        *   this many resources. This must be non-zero if
+        *   auto reclamation is on. Setting this to 'size' or greater will
+        *   reclaim all possible resources currently on the defer queue.
+        */
+       rte_rcu_qsbr_free_resource_t free_fn;
+       /**< Function to call to free the resource. */
+       void *p;
+       /**< Pointer passed to the free function. Typically, this is the
+        *   pointer to the data structure to which the resource to free
+        *   belongs. This can be NULL.
+        */
+       struct rte_rcu_qsbr *v;
+       /**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure (opaque; defined privately by the library).
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -692,6 +772,114 @@ __rte_experimental
 int
 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ *   Parameters to create a defer queue.
+ * @return
+ *   On success - Valid pointer to defer queue
+ *   On error - NULL
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL or otherwise invalid parameters are passed
+ *   - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to avoid
+ * the defer queue from growing too big.
+ *
+ * Multi-thread safety is provided as per the defer queue configuration.
+ * When multi-thread safety is requested, it is possible that the
+ * resources are not stored in their order of deletion. This results
+ * in resources being held in the defer queue longer than they should.
+ *
+ * @param dq
+ *   Defer queue to allocate an entry from.
+ * @param e
+ *   Pointer to resource data to copy to the defer queue. The size of
+ *   the data to copy is equal to the element size provided when the
+ *   defer queue was created.
+ * @return
+ *   On success - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOSPC - Defer queue is full. This condition can not happen
+ *             if the defer queue size is equal (or larger) than the
+ *             number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Free queued resources from the defer queue.
+ *
+ * This API is multi-thread safe.
+ *
+ * @param dq
+ *   Defer queue to free an entry from.
+ * @param n
+ *   Maximum number of resources to free. Must be non-zero.
+ * @param freed
+ *   Number of resources that were freed. Can be NULL.
+ * @param pending
+ *   Number of resources pending on the defer queue. This number might not
+ *   be accurate if multi-thread safety is configured. Can be NULL.
+ * @return
+ *   On success - 0 (also when no resource could be reclaimed yet)
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - dq is NULL or n is 0
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+                               unsigned int *freed, unsigned int *pending);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ *   Defer queue to delete.
+ * @return
+ *   On success - 0
+ *   On error - 1
+ *   Possible rte_errno codes are:
+ *   - EINVAL - dq is NULL
+ *   - EAGAIN - Some of the resources have not completed at least 1 grace
+ *             period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@ EXPERIMENTAL {
        rte_rcu_qsbr_synchronize;
        rte_rcu_qsbr_thread_register;
        rte_rcu_qsbr_thread_unregister;
+       rte_rcu_qsbr_dq_create;
+       rte_rcu_qsbr_dq_enqueue;
+       rte_rcu_qsbr_dq_reclaim;
+       rte_rcu_qsbr_dq_delete;
 
        local: *;
 };
diff --git a/lib/meson.build b/lib/meson.build
index 9c3cc55d5..15e91a303 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@
 libraries = [
        'kvargs', # eal depends on kvargs
        'eal', # everything depends on eal
-       'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+       'ring',
+       'rcu', # rcu depends on ring
+       'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
        'cmdline',
        'metrics', # bitrate/latency stats depends on this
        'hash',    # efd depends on this
@@ -22,7 +24,7 @@ libraries = [
        'gro', 'gso', 'ip_frag', 'jobstats',
        'kni', 'latencystats', 'lpm', 'member',
        'power', 'pdump', 'rawdev',
-       'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
+       'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
        # ipsec lib depends on net, crypto and security
        'ipsec',
        #fib lib depends on rib
-- 
2.17.1

Reply via email to