Hi Honnappa, > Add resource reclamation APIs to make it simple for applications > and libraries to integrate rte_rcu library. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > Reviewed-by: Ola Liljedhal <ola.liljed...@arm.com> > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> > --- > app/test/test_rcu_qsbr.c | 291 ++++++++++++++++++++++++++++- > lib/librte_rcu/meson.build | 2 + > lib/librte_rcu/rte_rcu_qsbr.c | 185 ++++++++++++++++++ > lib/librte_rcu/rte_rcu_qsbr.h | 169 +++++++++++++++++ > lib/librte_rcu/rte_rcu_qsbr_pvt.h | 46 +++++ > lib/librte_rcu/rte_rcu_version.map | 4 + > lib/meson.build | 6 +- > 7 files changed, 700 insertions(+), 3 deletions(-) > create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c > index ce7f93dd3..76814f50b 100644 > --- a/lib/librte_rcu/rte_rcu_qsbr.c > +++ b/lib/librte_rcu/rte_rcu_qsbr.c > @@ -21,6 +21,7 @@ > #include <rte_errno.h> > > #include "rte_rcu_qsbr.h" > +#include "rte_rcu_qsbr_pvt.h" > > /* Get the memory size of QSBR variable */ > size_t > @@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) > return 0; > } > > +/* Create a queue used to store the data structure elements that can > + * be freed later. This queue is referred to as 'defer queue'. > + */ > +struct rte_rcu_qsbr_dq * > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params) > +{ > + struct rte_rcu_qsbr_dq *dq; > + uint32_t qs_fifo_size; > + > + if (params == NULL || params->f == NULL || > + params->v == NULL || params->name == NULL || > + params->size == 0 || params->esize == 0 || > + (params->esize % 8 != 0)) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return NULL; > + } > + > + dq = rte_zmalloc(NULL, > + (sizeof(struct rte_rcu_qsbr_dq) + params->esize), > + RTE_CACHE_LINE_SIZE); > + if (dq == NULL) { > + rte_errno = ENOMEM; > + > + return NULL; > + } > + > + /* round up qs_fifo_size to next power of two that is not less than > + * max_size. > + */ > + qs_fifo_size = rte_align32pow2((((params->esize/8) + 1) > + * params->size) + 1); > + dq->r = rte_ring_create(params->name, qs_fifo_size, > + SOCKET_ID_ANY, 0);
If it is going to be not MT safe, then why not to create the ring with (RING_F_SP_ENQ | RING_F_SC_DEQ) flags set? Though I think it could be changed to allow MT safe multiple enqeue/single dequeue, see below. > + if (dq->r == NULL) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): defer queue create failed\n", __func__); > + rte_free(dq); > + return NULL; > + } > + > + dq->v = params->v; > + dq->size = params->size; > + dq->esize = params->esize; > + dq->f = params->f; > + dq->p = params->p; > + > + return dq; > +} > + > +/* Enqueue one resource to the defer queue to free after the grace > + * period is over. > + */ > +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e) > +{ > + uint64_t token; > + uint64_t *tmp; > + uint32_t i; > + uint32_t cur_size, free_size; > + > + if (dq == NULL || e == NULL) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; Why just not to return -EINVAL straightway? I think there is no much point to set rte_errno in that function at all, just return value should do. > + } > + > + /* Start the grace period */ > + token = rte_rcu_qsbr_start(dq->v); > + > + /* Reclaim resources if the queue is 1/8th full. This helps > + * the queue from growing too large and allows time for reader > + * threads to report their quiescent state. > + */ > + cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1); Probably would be a bit easier if you just store in dq->esize (elt size + token size) / 8. > + if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) { Why to make this threshold value hard-coded? Why either not to put it into create parameter, or just return a special return value, to indicate that threshold is reached? Or even return number of filled/free entroes on success, so caller can decide to reclaim or not based on that information on his own? > + rte_log(RTE_LOG_INFO, rte_rcu_log_type, > + "%s(): Triggering reclamation\n", __func__); > + rte_rcu_qsbr_dq_reclaim(dq); > + } > + > + /* Check if there is space for atleast for 1 resource */ > + free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1); > + if (!free_size) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): Defer queue is full\n", __func__); > + rte_errno = ENOSPC; > + return 1; > + } > + > + /* Enqueue the resource */ > + rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token); > + > + /* The resource to enqueue needs to be a multiple of 64b > + * due to the limitation of the rte_ring implementation. > + */ > + for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++) > + rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp); That whole construction above looks a bit clumsy and error prone... I suppose just: const uint32_t nb_elt = dq->elt_size/8 + 1; uint32_t free, n; ... n = rte_ring_enqueue_bulk(dq->r, e, nb_elt, &free); if (n == 0) return -ENOSPC; return free; That way I think you can have MT-safe version of that function. > + > + return 0; > +} > + > +/* Reclaim resources from the defer queue. */ > +int > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq) > +{ > + uint32_t max_cnt; > + uint32_t cnt; > + void *token; > + uint64_t *tmp; > + uint32_t i; > + > + if (dq == NULL) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; Same story as above - I think rte_errno is excessive in this function. Just return value should be enough. > + } > + > + /* Anything to reclaim? */ > + if (rte_ring_count(dq->r) == 0) > + return 0; Not sure you need that, see below. > + > + /* Reclaim at the max 1/16th the total number of entries. */ > + max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT; > + max_cnt = (max_cnt == 0) ? dq->size : max_cnt; Again why not to make max_cnt a configurable at create() parameter? Or even a parameter for that function? > + cnt = 0; > + > + /* Check reader threads quiescent state and reclaim resources */ > + while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) && > + (rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false) > + == 1)) { > + (void)rte_ring_sc_dequeue(dq->r, &token); > + /* The resource to dequeue needs to be a multiple of 64b > + * due to the limitation of the rte_ring implementation. > + */ > + for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8; > + i++, tmp++) > + (void)rte_ring_sc_dequeue(dq->r, > + (void *)(uintptr_t)tmp); Again, no need for such constructs with multiple dequeuer I believe. Just: const uint32_t nb_elt = dq->elt_size/8 + 1; uint32_t n; uintptr_t elt[nb_elt]; ... n = rte_ring_dequeue_bulk(dq->r, elt, nb_elt, NULL); if (n != 0) {dq->f(dq->p, elt);} Seems enough. Again in that case you can have enqueue/reclaim running in different threads simultaneously, plus you don't need dq->e at all. > + dq->f(dq->p, dq->e); > + > + cnt++; > + } > + > + rte_log(RTE_LOG_INFO, rte_rcu_log_type, > + "%s(): Reclaimed %u resources\n", __func__, cnt); > + > + if (cnt == 0) { > + /* No resources were reclaimed */ > + rte_errno = EAGAIN; > + return 1; > + } > + > + return 0; I'd suggest to return cnt on success. > +} > + > +/* Delete a defer queue. */ > +int > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq) > +{ > + if (dq == NULL) { > + rte_log(RTE_LOG_ERR, rte_rcu_log_type, > + "%s(): Invalid input parameter\n", __func__); > + rte_errno = EINVAL; > + > + return 1; > + } > + > + /* Reclaim all the resources */ > + if (rte_rcu_qsbr_dq_reclaim(dq) != 0) > + /* Error number is already set by the reclaim API */ > + return 1; How do you know that you have reclaimed everything? > + > + rte_ring_free(dq->r); > + rte_free(dq); > + > + return 0; > +} > + > int rte_rcu_log_type; > > RTE_INIT(rte_rcu_register) > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h > index c80f15c00..185d4b50a 100644 > --- a/lib/librte_rcu/rte_rcu_qsbr.h > +++ b/lib/librte_rcu/rte_rcu_qsbr.h > @@ -34,6 +34,7 @@ extern "C" { > #include <rte_lcore.h> > #include <rte_debug.h> > #include <rte_atomic.h> > +#include <rte_ring.h> > > extern int rte_rcu_log_type; > > @@ -109,6 +110,67 @@ struct rte_rcu_qsbr { > */ > } __rte_cache_aligned; > > +/** > + * Call back function called to free the resources. > + * > + * @param p > + * Pointer provided while creating the defer queue > + * @param e > + * Pointer to the resource data stored on the defer queue > + * > + * @return > + * None > + */ > +typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e); Stylish thing - usually in DPDK we have typedf newtype_t ... Though I am not sure you need a new typedef at all - just a function pointer inside the struct seems enough. > + > +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE > + > +/** > + * Trigger automatic reclamation after 1/8th the defer queue is full. > + */ > +#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3 > + > +/** > + * Reclaim at the max 1/16th the total number of resources. > + */ > +#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4 As I said above, I don't think these thresholds need to be hardcoded. In any case, there seems not much point to put them in the public header file. > + > +/** > + * Parameters used when creating the defer queue. > + */ > +struct rte_rcu_qsbr_dq_parameters { > + const char *name; > + /**< Name of the queue. */ > + uint32_t size; > + /**< Number of entries in queue. Typically, this will be > + * the same as the maximum number of entries supported in the > + * lock free data structure. > + * Data structures with unbounded number of entries is not > + * supported currently. > + */ > + uint32_t esize; > + /**< Size (in bytes) of each element in the defer queue. > + * This has to be multiple of 8B as the rte_ring APIs > + * support 8B element sizes only. > + */ > + rte_rcu_qsbr_free_resource f; > + /**< Function to call to free the resource. */ > + void *p; Style nit again - I like short names myself, but that seems a bit extreme... :) Might be at least: void (*reclaim)(void *, void *); void * reclaim_data; ? > + /**< Pointer passed to the free function. Typically, this is the > + * pointer to the data structure to which the resource to free > + * belongs. This can be NULL. > + */ > + struct rte_rcu_qsbr *v; Does it need to be inside that struct? Might be better: rte_rcu_qsbr_dq_create(struct rte_rcu_qsbr *v, const struct rte_rcu_qsbr_dq_parameters *params); Another alternative: make both reclaim() and enqueue() to take v as a parameter. > + /**< RCU QSBR variable to use for this defer queue */ > +}; > + > +/* RTE defer queue structure. > + * This structure holds the defer queue. The defer queue is used to > + * hold the deleted entries from the data structure that are not > + * yet freed. > + */ > +struct rte_rcu_qsbr_dq; > + > /** > * @warning > * @b EXPERIMENTAL: this API may change without prior notice > @@ -648,6 +710,113 @@ __rte_experimental > int > rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v); > > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Create a queue used to store the data structure elements that can > + * be freed later. This queue is referred to as 'defer queue'. > + * > + * @param params > + * Parameters to create a defer queue. > + * @return > + * On success - Valid pointer to defer queue > + * On error - NULL > + * Possible rte_errno codes are: > + * - EINVAL - NULL parameters are passed > + * - ENOMEM - Not enough memory > + */ > +__rte_experimental > +struct rte_rcu_qsbr_dq * > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Enqueue one resource to the defer queue and start the grace period. > + * The resource will be freed later after at least one grace period > + * is over. > + * > + * If the defer queue is full, it will attempt to reclaim resources. > + * It will also reclaim resources at regular intervals to avoid > + * the defer queue from growing too big. > + * > + * This API is not multi-thread safe. It is expected that the caller > + * provides multi-thread safety by locking a mutex or some other means. > + * > + * A lock free multi-thread writer algorithm could achieve multi-thread > + * safety by creating and using one defer queue per thread. > + * > + * @param dq > + * Defer queue to allocate an entry from. > + * @param e > + * Pointer to resource data to copy to the defer queue. The size of > + * the data to copy is equal to the element size provided when the > + * defer queue was created. > + * @return > + * On success - 0 > + * On error - 1 with rte_errno set to > + * - EINVAL - NULL parameters are passed > + * - ENOSPC - Defer queue is full. This condition can not happen > + * if the defer queue size is equal (or larger) than the > + * number of elements in the data structure. > + */ > +__rte_experimental > +int > +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Reclaim resources from the defer queue. > + * > + * This API is not multi-thread safe. It is expected that the caller > + * provides multi-thread safety by locking a mutex or some other means. > + * > + * A lock free multi-thread writer algorithm could achieve multi-thread > + * safety by creating and using one defer queue per thread. > + * > + * @param dq > + * Defer queue to reclaim an entry from. > + * @return > + * On successful reclamation of at least 1 resource - 0 > + * On error - 1 with rte_errno set to > + * - EINVAL - NULL parameters are passed > + * - EAGAIN - None of the resources have completed at least 1 grace period, > + * try again. > + */ > +__rte_experimental > +int > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Delete a defer queue. > + * > + * It tries to reclaim all the resources on the defer queue. > + * If any of the resources have not completed the grace period > + * the reclamation stops and returns immediately. The rest of > + * the resources are not reclaimed and the defer queue is not > + * freed. > + * > + * @param dq > + * Defer queue to delete. > + * @return > + * On success - 0 > + * On error - 1 > + * Possible rte_errno codes are: > + * - EINVAL - NULL parameters are passed > + * - EAGAIN - Some of the resources have not completed at least 1 grace > + * period, try again. > + */ > +__rte_experimental > +int > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq); > + > #ifdef __cplusplus > } > #endif > diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h > b/lib/librte_rcu/rte_rcu_qsbr_pvt.h > new file mode 100644 > index 000000000..2122bc36a > --- /dev/null > +++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h Again style suggestion: as it is not public header - don't use rte_ prefix for naming. >From my perspective - easier to relalize for reader what is public header, >what is not. > @@ -0,0 +1,46 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * Copyright (c) 2019 Arm Limited > + */ > + > +#ifndef _RTE_RCU_QSBR_PVT_H_ > +#define _RTE_RCU_QSBR_PVT_H_ > + > +/** > + * This file is private to the RCU library. It should not be included > + * by the user of this library. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include "rte_rcu_qsbr.h" > + > +/* RTE defer queue structure. > + * This structure holds the defer queue. The defer queue is used to > + * hold the deleted entries from the data structure that are not > + * yet freed. > + */ > +struct rte_rcu_qsbr_dq { > + struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/ > + struct rte_ring *r; /**< RCU QSBR defer queue. */ > + uint32_t size; > + /**< Number of elements in the defer queue */ > + uint32_t esize; > + /**< Size (in bytes) of data stored on the defer queue */ > + rte_rcu_qsbr_free_resource f; > + /**< Function to call to free the resource. */ > + void *p; > + /**< Pointer passed to the free function. Typically, this is the > + * pointer to the data structure to which the resource to free > + * belongs. > + */ > + char e[0]; > + /**< Temporary storage to copy the defer queue element. */ Do you really need 'e' at all? Can't it be just temporary stack variable? > +}; > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_RCU_QSBR_PVT_H_ */ > diff --git a/lib/librte_rcu/rte_rcu_version.map > b/lib/librte_rcu/rte_rcu_version.map > index f8b9ef2ab..dfac88a37 100644 > --- a/lib/librte_rcu/rte_rcu_version.map > +++ b/lib/librte_rcu/rte_rcu_version.map > @@ -8,6 +8,10 @@ EXPERIMENTAL { > rte_rcu_qsbr_synchronize; > rte_rcu_qsbr_thread_register; > rte_rcu_qsbr_thread_unregister; > + rte_rcu_qsbr_dq_create; > + rte_rcu_qsbr_dq_enqueue; > + rte_rcu_qsbr_dq_reclaim; > + rte_rcu_qsbr_dq_delete; > > local: *; > }; > diff --git a/lib/meson.build b/lib/meson.build > index e5ff83893..0e1be8407 100644 > --- a/lib/meson.build > +++ b/lib/meson.build > @@ -11,7 +11,9 @@ > libraries = [ > 'kvargs', # eal depends on kvargs > 'eal', # everything depends on eal > - 'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core > + 'ring', > + 'rcu', # rcu depends on ring > + 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core > 'cmdline', > 'metrics', # bitrate/latency stats depends on this > 'hash', # efd depends on this > @@ -22,7 +24,7 @@ libraries = [ > 'gro', 'gso', 'ip_frag', 'jobstats', > 'kni', 'latencystats', 'lpm', 'member', > 'power', 'pdump', 'rawdev', > - 'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost', > + 'reorder', 'sched', 'security', 'stack', 'vhost', > # ipsec lib depends on net, crypto and security > 'ipsec', > # add pkt framework libs which use other libs from above > -- > 2.17.1