Hi Honnappa,
On 01/10/2019 07:29, Honnappa Nagarahalli wrote:
Add resource reclamation APIs to make it simple for applications
and libraries to integrate rte_rcu library.
Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Ola Liljedhal <ola.liljed...@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
---
app/test/test_rcu_qsbr.c | 291 ++++++++++++++++++++++++++++-
lib/librte_rcu/meson.build | 2 +
lib/librte_rcu/rte_rcu_qsbr.c | 185 ++++++++++++++++++
lib/librte_rcu/rte_rcu_qsbr.h | 169 +++++++++++++++++
lib/librte_rcu/rte_rcu_qsbr_pvt.h | 46 +++++
lib/librte_rcu/rte_rcu_version.map | 4 +
lib/meson.build | 6 +-
7 files changed, 700 insertions(+), 3 deletions(-)
create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h
There are compilation errors when building DPDK as a shared library.
I think you need something like:
--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@ LIB = librte_rcu.a
CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
index d1b9e46a2..3a6815243 100644
--- a/app/test/test_rcu_qsbr.c
+++ b/app/test/test_rcu_qsbr.c
I think it's better to split the unit test patches from the library patches.
@@ -1,8 +1,9 @@
/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2019 Arm Limited
*/
#include <stdio.h>
+#include <string.h>
#include <rte_pause.h>
#include <rte_rcu_qsbr.h>
#include <rte_hash.h>
@@ -33,6 +34,7 @@ static uint32_t *keys;
#define COUNTER_VALUE 4096
static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
static uint8_t writer_done;
+static uint8_t cb_failed;
static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
struct rte_hash *h[RTE_MAX_LCORE];
@@ -582,6 +584,269 @@ test_rcu_qsbr_thread_offline(void)
return 0;
}
+static void
+rte_rcu_qsbr_test_free_resource(void *p, void *e)
This function is not part of the DPDK API, so it would be better to name it
something like test_rcu_qsbr_free_resource().
+{
+ if (p != NULL && e != NULL) {
+ printf("%s: Test failed\n", __func__);
+ cb_failed = 1;
+ }
+}
+
+/*
+ * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
+ * elements that can be freed later. This queue is referred to as 'defer queue'.
+ */
+static int
+test_rcu_qsbr_dq_create(void)
+{
+ char rcu_dq_name[RTE_RING_NAMESIZE];
+ struct rte_rcu_qsbr_dq_parameters params;
+ struct rte_rcu_qsbr_dq *dq;
+
+ printf("\nTest rte_rcu_qsbr_dq_create()\n");
+
+ /* Pass invalid parameters */
+ dq = rte_rcu_qsbr_dq_create(NULL);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+ params.name = rcu_dq_name;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ params.f = rte_rcu_qsbr_test_free_resource;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+ params.v = t[0];
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ params.size = 1;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ params.esize = 3;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
+
+ /* Pass all valid parameters */
+ params.esize = 16;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+ rte_rcu_qsbr_dq_delete(dq);
+
+ return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
+ * to be freed later after atleast one grace period is over.
+ */
+static int
+test_rcu_qsbr_dq_enqueue(void)
+{
+ int ret;
+ uint64_t r;
+ char rcu_dq_name[RTE_RING_NAMESIZE];
+ struct rte_rcu_qsbr_dq_parameters params;
+ struct rte_rcu_qsbr_dq *dq;
+
+ printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
+
+ /* Create a queue with simple parameters */
+ memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+ params.name = rcu_dq_name;
+ params.f = rte_rcu_qsbr_test_free_resource;
+ rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+ params.v = t[0];
+ params.size = 1;
+ params.esize = 16;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+ /* Pass invalid parameters */
+ ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+ ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+ ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
+
+ ret = rte_rcu_qsbr_dq_delete(dq);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
+
+ return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
+ */
+static int
+test_rcu_qsbr_dq_reclaim(void)
+{
+ int ret;
+
+ printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
+
+ /* Pass invalid parameters */
+ ret = rte_rcu_qsbr_dq_reclaim(NULL);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
+
+ return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_delete: Delete a defer queue.
+ */
+static int
+test_rcu_qsbr_dq_delete(void)
+{
+ int ret;
+ char rcu_dq_name[RTE_RING_NAMESIZE];
+ struct rte_rcu_qsbr_dq_parameters params;
+ struct rte_rcu_qsbr_dq *dq;
+
+ printf("\nTest rte_rcu_qsbr_dq_delete()\n");
+
+ /* Pass invalid parameters */
+ ret = rte_rcu_qsbr_dq_delete(NULL);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq delete invalid params");
+
+ memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+ params.name = rcu_dq_name;
+ params.f = rte_rcu_qsbr_test_free_resource;
+ rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+ params.v = t[0];
+ params.size = 1;
+ params.esize = 16;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+ ret = rte_rcu_qsbr_dq_delete(dq);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+ return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
+ * to be freed later after atleast one grace period is over.
+ */
+static int
+test_rcu_qsbr_dq_functional(int32_t size, int32_t esize)
+{
+ int i, j, ret;
+ char rcu_dq_name[RTE_RING_NAMESIZE];
+ struct rte_rcu_qsbr_dq_parameters params;
+ struct rte_rcu_qsbr_dq *dq;
+ uint64_t *e;
+ uint64_t sc = 200;
+ int max_entries;
+
+ printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
+ printf("Size = %d, esize = %d\n", size, esize);
+
+ e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
+ if (e == NULL)
+ return 0;
+ cb_failed = 0;
+
+ /* Initialize the RCU variable. No threads are registered */
+ rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
+
+ /* Create a queue with simple parameters */
+ memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
+ params.name = rcu_dq_name;
+ params.f = rte_rcu_qsbr_test_free_resource;
+ params.v = t[0];
+ params.size = size;
+ params.esize = esize;
+ dq = rte_rcu_qsbr_dq_create(&params);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
+
+ /* Given the size and esize, calculate the maximum number of entries
+ * that can be stored on the defer queue (look at the logic used
+ * in capacity calculation of rte_ring).
+ */
+ max_entries = rte_align32pow2(((esize/8 + 1) * size) + 1);
+ max_entries = (max_entries - 1)/(esize/8 + 1);
+
+ /* Enqueue few counters starting with the value 'sc' */
+ /* The queue size will be rounded up to 2. The enqueue API also
+ * reclaims if the queue size is above certain limit. Since, there
+ * are no threads registered, reclamation succedes. Hence, it should
+ * be possible to enqueue more than the provided queue size.
+ */
+ for (i = 0; i < 10; i++) {
+ ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+ "dq enqueue functional");
+ for (j = 0; j < esize/8; j++)
+ e[j] = sc++;
+ }
+
+ /* Register a thread on the RCU QSBR variable. Reclamation will not
+ * succeed. It should not be possible to enqueue more than the size
+ * number of resources.
+ */
+ rte_rcu_qsbr_thread_register(t[0], 1);
+ rte_rcu_qsbr_thread_online(t[0], 1);
+
+ for (i = 0; i < max_entries; i++) {
+ ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+ "dq enqueue functional");
+ for (j = 0; j < esize/8; j++)
+ e[j] = sc++;
+ }
+
+ /* Enqueue fails as queue is full */
+ ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+ /* Delete should fail as there are elements in defer queue which
+ * cannot be reclaimed.
+ */
+ ret = rte_rcu_qsbr_dq_delete(dq);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
+
+ /* Report quiescent state, enqueue should succeed */
+ rte_rcu_qsbr_quiescent(t[0], 1);
+ for (i = 0; i < max_entries; i++) {
+ ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0),
+ "dq enqueue functional");
+ for (j = 0; j < esize/8; j++)
+ e[j] = sc++;
+ }
+
+ /* Queue is full */
+ ret = rte_rcu_qsbr_dq_enqueue(dq, e);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue functional");
+
+ /* Report quiescent state, delete should succeed */
+ rte_rcu_qsbr_quiescent(t[0], 1);
+ ret = rte_rcu_qsbr_dq_delete(dq);
+ TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
+
+ /* Validate that call back function did not return any error */
+ TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
+
+ rte_free(e);
+ return 0;
+}
+
/*
* rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
*/
@@ -1025,6 +1290,18 @@ test_rcu_qsbr_main(void)
if (test_rcu_qsbr_thread_offline() < 0)
goto test_fail;
+ if (test_rcu_qsbr_dq_create() < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_reclaim() < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_delete() < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_enqueue() < 0)
+ goto test_fail;
+
printf("\nFunctional tests\n");
if (test_rcu_qsbr_sw_sv_3qs() < 0)
@@ -1033,6 +1310,18 @@ test_rcu_qsbr_main(void)
if (test_rcu_qsbr_mw_mv_mqs() < 0)
goto test_fail;
+ if (test_rcu_qsbr_dq_functional(1, 8) < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_functional(2, 8) < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_functional(303, 16) < 0)
+ goto test_fail;
+
+ if (test_rcu_qsbr_dq_functional(7, 128) < 0)
+ goto test_fail;
+
free_rcu();
printf("\n");
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')
if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
ext_deps += cc.find_library('atomic')
endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index ce7f93dd3..76814f50b 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,6 +21,7 @@
#include <rte_errno.h>
#include "rte_rcu_qsbr.h"
+#include "rte_rcu_qsbr_pvt.h"
/* Get the memory size of QSBR variable */
size_t
@@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
return 0;
}
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+ struct rte_rcu_qsbr_dq *dq;
+ uint32_t qs_fifo_size;
+
+ if (params == NULL || params->f == NULL ||
+ params->v == NULL || params->name == NULL ||
+ params->size == 0 || params->esize == 0 ||
+ (params->esize % 8 != 0)) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return NULL;
+ }
+
+ dq = rte_zmalloc(NULL,
+ (sizeof(struct rte_rcu_qsbr_dq) + params->esize),
+ RTE_CACHE_LINE_SIZE);
+ if (dq == NULL) {
+ rte_errno = ENOMEM;
+
+ return NULL;
+ }
+
+ /* round up qs_fifo_size to next power of two that is not less than
+ * max_size.
+ */
+ qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
+ * params->size) + 1);
+ dq->r = rte_ring_create(params->name, qs_fifo_size,
+ SOCKET_ID_ANY, 0);
+ if (dq->r == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): defer queue create failed\n", __func__);
+ rte_free(dq);
+ return NULL;
+ }
+
+ dq->v = params->v;
+ dq->size = params->size;
+ dq->esize = params->esize;
+ dq->f = params->f;
+ dq->p = params->p;
+
+ return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+ uint64_t token;
+ uint64_t *tmp;
+ uint32_t i;
+ uint32_t cur_size, free_size;
+
+ if (dq == NULL || e == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ /* Start the grace period */
+ token = rte_rcu_qsbr_start(dq->v);
+
+ /* Reclaim resources if the queue is 1/8th full. This helps
+ * the queue from growing too large and allows time for reader
+ * threads to report their quiescent state.
+ */
+ cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);
+ if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Triggering reclamation\n", __func__);
+ rte_rcu_qsbr_dq_reclaim(dq);
+ }
+
+ /* Check if there is space for atleast for 1 resource */
+ free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
+ if (!free_size) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Defer queue is full\n", __func__);
+ rte_errno = ENOSPC;
+ return 1;
+ }
+
+ /* Enqueue the resource */
+ rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
+
+ /* The resource to enqueue needs to be a multiple of 64b
+ * due to the limitation of the rte_ring implementation.
+ */
+ for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
+ rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);
+
+ return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
+{
+ uint32_t max_cnt;
+ uint32_t cnt;
+ void *token;
+ uint64_t *tmp;
+ uint32_t i;
+
+ if (dq == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ /* Anything to reclaim? */
+ if (rte_ring_count(dq->r) == 0)
+ return 0;
+
+ /* Reclaim at the max 1/16th the total number of entries. */
+ max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
+ max_cnt = (max_cnt == 0) ? dq->size : max_cnt;
+ cnt = 0;
+
+ /* Check reader threads quiescent state and reclaim resources */
+ while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
+ (rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
+ == 1)) {
+ (void)rte_ring_sc_dequeue(dq->r, &token);
+ /* The resource to dequeue needs to be a multiple of 64b
+ * due to the limitation of the rte_ring implementation.
+ */
+ for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
+ i++, tmp++)
+ (void)rte_ring_sc_dequeue(dq->r,
+ (void *)(uintptr_t)tmp);
+ dq->f(dq->p, dq->e);
+
+ cnt++;
+ }
+
+ rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+ "%s(): Reclaimed %u resources\n", __func__, cnt);
+
+ if (cnt == 0) {
+ /* No resources were reclaimed */
+ rte_errno = EAGAIN;
+ return 1;
+ }
+
+ return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+ if (dq == NULL) {
+ rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+ "%s(): Invalid input parameter\n", __func__);
+ rte_errno = EINVAL;
+
+ return 1;
+ }
+
+ /* Reclaim all the resources */
+ if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
+ /* Error number is already set by the reclaim API */
+ return 1;
There could be a potential problem here. rte_rcu_qsbr_dq_reclaim() reclaims
at most max_cnt entries per call, which is 1/16 of the queue size, so the
rest won't be reclaimed before the ring and the defer queue are freed.
+
+ rte_ring_free(dq->r);
+ rte_free(dq);
+
+ return 0;
+}
+
int rte_rcu_log_type;
RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index c80f15c00..185d4b50a 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@ extern "C" {
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
+#include <rte_ring.h>
I think it's better to move this include into rte_rcu_qsbr.c.
extern int rte_rcu_log_type;
@@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
*/
} __rte_cache_aligned;
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ * Pointer provided while creating the defer queue
+ * @param e
+ * Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ * None
+ */
+typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
I don't see this macro used anywhere in the rcu library (I see
you are using it in LPM). The tests use
char rcu_dq_name[RTE_RING_NAMESIZE];
instead.
+ See my comments for [PATCH v3 1/3] lib/lpm: integrate RCU QSBR
+
+/**
+ * Trigger automatic reclamation after 1/8th the defer queue is full.
+ */
+#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
+
+/**
+ * Reclaim at the max 1/16th the total number of resources.
+ */
+#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4
These two defines could be moved into the .c file.
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+ const char *name;
+ /**< Name of the queue. */
+ uint32_t size;
+ /**< Number of entries in queue. Typically, this will be
+ * the same as the maximum number of entries supported in the
+ * lock free data structure.
+ * Data structures with unbounded number of entries is not
+ * supported currently.
+ */
+ uint32_t esize;
+ /**< Size (in bytes) of each element in the defer queue.
+ * This has to be multiple of 8B as the rte_ring APIs
+ * support 8B element sizes only.
+ */
+ rte_rcu_qsbr_free_resource f;
+ /**< Function to call to free the resource. */
+ void *p;
+ /**< Pointer passed to the free function. Typically, this is the
+ * pointer to the data structure to which the resource to free
+ * belongs. This can be NULL.
+ */
+ struct rte_rcu_qsbr *v;
+ /**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
@@ -648,6 +710,113 @@ __rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ * Parameters to create a defer queue.
+ * @return
+ * On success - Valid pointer to defer queue
+ * On error - NULL
+ * Possible rte_errno codes are:
+ * - EINVAL - NULL parameters are passed
+ * - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to avoid
+ * the defer queue from growing too big.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ * Defer queue to allocate an entry from.
+ * @param e
+ * Pointer to resource data to copy to the defer queue. The size of
+ * the data to copy is equal to the element size provided when the
+ * defer queue was created.
+ * @return
+ * On success - 0
+ * On error - 1 with rte_errno set to
+ * - EINVAL - NULL parameters are passed
+ * - ENOSPC - Defer queue is full. This condition can not happen
+ * if the defer queue size is equal (or larger) than the
+ * number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Reclaim resources from the defer queue.
+ *
+ * This API is not multi-thread safe. It is expected that the caller
+ * provides multi-thread safety by locking a mutex or some other means.
+ *
+ * A lock free multi-thread writer algorithm could achieve multi-thread
+ * safety by creating and using one defer queue per thread.
+ *
+ * @param dq
+ * Defer queue to reclaim an entry from.
+ * @return
+ * On successful reclamation of at least 1 resource - 0
+ * On error - 1 with rte_errno set to
+ * - EINVAL - NULL parameters are passed
+ * - EAGAIN - None of the resources have completed at least 1 grace period,
+ * try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ * Defer queue to delete.
+ * @return
+ * On success - 0
+ * On error - 1
+ * Possible rte_errno codes are:
+ * - EINVAL - NULL parameters are passed
+ * - EAGAIN - Some of the resources have not completed at least 1 grace
+ * period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h
b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
new file mode 100644
index 000000000..2122bc36a
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
@@ -0,0 +1,46 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
Why is this struct definition separated into a private .h file? Maybe just
define it in rte_rcu_qsbr.c instead?
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+ struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+ struct rte_ring *r; /**< RCU QSBR defer queue. */
+ uint32_t size;
+ /**< Number of elements in the defer queue */
+ uint32_t esize;
+ /**< Size (in bytes) of data stored on the defer queue */
+ rte_rcu_qsbr_free_resource f;
+ /**< Function to call to free the resource. */
+ void *p;
+ /**< Pointer passed to the free function. Typically, this is the
+ * pointer to the data structure to which the resource to free
+ * belongs.
+ */
+ char e[0];
+ /**< Temporary storage to copy the defer queue element. */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map
b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@ EXPERIMENTAL {
rte_rcu_qsbr_synchronize;
rte_rcu_qsbr_thread_register;
rte_rcu_qsbr_thread_unregister;
+ rte_rcu_qsbr_dq_create;
+ rte_rcu_qsbr_dq_enqueue;
+ rte_rcu_qsbr_dq_reclaim;
+ rte_rcu_qsbr_dq_delete;
local: *;
};
diff --git a/lib/meson.build b/lib/meson.build
index e5ff83893..0e1be8407 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@
libraries = [
'kvargs', # eal depends on kvargs
'eal', # everything depends on eal
- 'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+ 'ring',
+ 'rcu', # rcu depends on ring
+ 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
'cmdline',
'metrics', # bitrate/latency stats depends on this
'hash', # efd depends on this
@@ -22,7 +24,7 @@ libraries = [
'gro', 'gso', 'ip_frag', 'jobstats',
'kni', 'latencystats', 'lpm', 'member',
'power', 'pdump', 'rawdev',
- 'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
+ 'reorder', 'sched', 'security', 'stack', 'vhost',
# ipsec lib depends on net, crypto and security
'ipsec',
# add pkt framework libs which use other libs from above
--
Regards,
Vladimir