Hi Honnappa,

 
> Add resource reclamation APIs to make it simple for applications
> and libraries to integrate rte_rcu library.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Reviewed-by: Ola Liljedhal <ola.liljed...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  app/test/test_rcu_qsbr.c           | 291 ++++++++++++++++++++++++++++-
>  lib/librte_rcu/meson.build         |   2 +
>  lib/librte_rcu/rte_rcu_qsbr.c      | 185 ++++++++++++++++++
>  lib/librte_rcu/rte_rcu_qsbr.h      | 169 +++++++++++++++++
>  lib/librte_rcu/rte_rcu_qsbr_pvt.h  |  46 +++++
>  lib/librte_rcu/rte_rcu_version.map |   4 +
>  lib/meson.build                    |   6 +-
>  7 files changed, 700 insertions(+), 3 deletions(-)
>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr_pvt.h
> 
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> index ce7f93dd3..76814f50b 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.c
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -21,6 +21,7 @@
>  #include <rte_errno.h>
> 
>  #include "rte_rcu_qsbr.h"
> +#include "rte_rcu_qsbr_pvt.h"
> 
>  /* Get the memory size of QSBR variable */
>  size_t
> @@ -267,6 +268,190 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
>       return 0;
>  }
> 
> +/* Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + */
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
> +{
> +     struct rte_rcu_qsbr_dq *dq;
> +     uint32_t qs_fifo_size;
> +
> +     if (params == NULL || params->f == NULL ||
> +             params->v == NULL || params->name == NULL ||
> +             params->size == 0 || params->esize == 0 ||
> +             (params->esize % 8 != 0)) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): Invalid input parameter\n", __func__);
> +             rte_errno = EINVAL;
> +
> +             return NULL;
> +     }
> +
> +     dq = rte_zmalloc(NULL,
> +             (sizeof(struct rte_rcu_qsbr_dq) + params->esize),
> +             RTE_CACHE_LINE_SIZE);
> +     if (dq == NULL) {
> +             rte_errno = ENOMEM;
> +
> +             return NULL;
> +     }
> +
> +     /* round up qs_fifo_size to next power of two that is not less than
> +      * max_size.
> +      */
> +     qs_fifo_size = rte_align32pow2((((params->esize/8) + 1)
> +                                     * params->size) + 1);
> +     dq->r = rte_ring_create(params->name, qs_fifo_size,
> +                                     SOCKET_ID_ANY, 0);

If it is not going to be MT safe, then why not create the ring with
the (RING_F_SP_ENQ | RING_F_SC_DEQ) flags set?
Though I think it could be changed to allow MT-safe multiple
enqueue/single dequeue, see below.

> +     if (dq->r == NULL) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): defer queue create failed\n", __func__);
> +             rte_free(dq);
> +             return NULL;
> +     }
> +
> +     dq->v = params->v;
> +     dq->size = params->size;
> +     dq->esize = params->esize;
> +     dq->f = params->f;
> +     dq->p = params->p;
> +
> +     return dq;
> +}
> +
> +/* Enqueue one resource to the defer queue to free after the grace
> + * period is over.
> + */
> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
> +{
> +     uint64_t token;
> +     uint64_t *tmp;
> +     uint32_t i;
> +     uint32_t cur_size, free_size;
> +
> +     if (dq == NULL || e == NULL) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): Invalid input parameter\n", __func__);
> +             rte_errno = EINVAL;
> +
> +             return 1;

Why not just return -EINVAL straight away?
I think there is not much point in setting rte_errno in that function at all;
just the return value should do.

> +     }
> +
> +     /* Start the grace period */
> +     token = rte_rcu_qsbr_start(dq->v);
> +
> +     /* Reclaim resources if the queue is 1/8th full. This helps
> +      * the queue from growing too large and allows time for reader
> +      * threads to report their quiescent state.
> +      */
> +     cur_size = rte_ring_count(dq->r) / (dq->esize/8 + 1);

It would probably be a bit easier if you just stored
(elt size + token size) / 8 in dq->esize.

> +     if (cur_size > (dq->size >> RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT)) {

Why make this threshold value hard-coded?
Why not either put it into the create parameters, or just return a special
return value to indicate that the threshold is reached?
Or even return the number of filled/free entries on success, so the caller
can decide whether to reclaim or not based on that information on his own?

> +             rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +                     "%s(): Triggering reclamation\n", __func__);
> +             rte_rcu_qsbr_dq_reclaim(dq);
> +     }
> +
> +     /* Check if there is space for atleast for 1 resource */
> +     free_size = rte_ring_free_count(dq->r) / (dq->esize/8 + 1);
> +     if (!free_size) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): Defer queue is full\n", __func__);
> +             rte_errno = ENOSPC;
> +             return 1;
> +     }
> +
> +     /* Enqueue the resource */
> +     rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)token);
> +
> +     /* The resource to enqueue needs to be a multiple of 64b
> +      * due to the limitation of the rte_ring implementation.
> +      */
> +     for (i = 0, tmp = (uint64_t *)e; i < dq->esize/8; i++, tmp++)
> +             rte_ring_sp_enqueue(dq->r, (void *)(uintptr_t)*tmp);


That whole construction above looks a bit clumsy and error-prone...
I suppose the following would do:

const uint32_t nb_elt =  dq->elt_size/8 + 1;
uint32_t free, n;
...
n = rte_ring_enqueue_bulk(dq->r, e, nb_elt, &free);
if (n == 0)
  return -ENOSPC;
return free;

That way I think you can have an MT-safe version of that function.

> +
> +     return 0;
> +}
> +
> +/* Reclaim resources from the defer queue. */
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq)
> +{
> +     uint32_t max_cnt;
> +     uint32_t cnt;
> +     void *token;
> +     uint64_t *tmp;
> +     uint32_t i;
> +
> +     if (dq == NULL) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): Invalid input parameter\n", __func__);
> +             rte_errno = EINVAL;
> +
> +             return 1;

Same story as above - I think setting rte_errno is excessive in this function.
Just the return value should be enough.


> +     }
> +
> +     /* Anything to reclaim? */
> +     if (rte_ring_count(dq->r) == 0)
> +             return 0;

Not sure you need that, see below.

> +
> +     /* Reclaim at the max 1/16th the total number of entries. */
> +     max_cnt = dq->size >> RTE_RCU_QSBR_MAX_RECLAIM_LIMIT;
> +     max_cnt = (max_cnt == 0) ? dq->size : max_cnt;

Again, why not make max_cnt configurable as a create() parameter?
Or even a parameter of that function?

> +     cnt = 0;
> +
> +     /* Check reader threads quiescent state and reclaim resources */
> +     while ((cnt < max_cnt) && (rte_ring_peek(dq->r, &token) == 0) &&
> +             (rte_rcu_qsbr_check(dq->v, (uint64_t)((uintptr_t)token), false)
> +                     == 1)) {


> +             (void)rte_ring_sc_dequeue(dq->r, &token);
> +             /* The resource to dequeue needs to be a multiple of 64b
> +              * due to the limitation of the rte_ring implementation.
> +              */
> +             for (i = 0, tmp = (uint64_t *)dq->e; i < dq->esize/8;
> +                     i++, tmp++)
> +                     (void)rte_ring_sc_dequeue(dq->r,
> +                                     (void *)(uintptr_t)tmp);

Again, no need for such a construct with multiple dequeue calls, I believe.
Just:

const uint32_t nb_elt =  dq->elt_size/8 + 1;
uint32_t n;
uintptr_t elt[nb_elt];
...
n = rte_ring_dequeue_bulk(dq->r, elt, nb_elt, NULL);
if (n != 0) {dq->f(dq->p, elt);}

That seems enough.
Again, in that case you can have enqueue/reclaim running in
different threads simultaneously, plus you don't need dq->e at all.

> +             dq->f(dq->p, dq->e);
> +
> +             cnt++;
> +     }
> +
> +     rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> +             "%s(): Reclaimed %u resources\n", __func__, cnt);
> +
> +     if (cnt == 0) {
> +             /* No resources were reclaimed */
> +             rte_errno = EAGAIN;
> +             return 1;
> +     }
> +
> +     return 0;

I'd suggest returning cnt on success.

> +}
> +
> +/* Delete a defer queue. */
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
> +{
> +     if (dq == NULL) {
> +             rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> +                     "%s(): Invalid input parameter\n", __func__);
> +             rte_errno = EINVAL;
> +
> +             return 1;
> +     }
> +
> +     /* Reclaim all the resources */
> +     if (rte_rcu_qsbr_dq_reclaim(dq) != 0)
> +             /* Error number is already set by the reclaim API */
> +             return 1;

How do you know that you have reclaimed everything?

> +
> +     rte_ring_free(dq->r);
> +     rte_free(dq);
> +
> +     return 0;
> +}
> +
>  int rte_rcu_log_type;
> 
>  RTE_INIT(rte_rcu_register)
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> index c80f15c00..185d4b50a 100644
> --- a/lib/librte_rcu/rte_rcu_qsbr.h
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -34,6 +34,7 @@ extern "C" {
>  #include <rte_lcore.h>
>  #include <rte_debug.h>
>  #include <rte_atomic.h>
> +#include <rte_ring.h>
> 
>  extern int rte_rcu_log_type;
> 
> @@ -109,6 +110,67 @@ struct rte_rcu_qsbr {
>        */
>  } __rte_cache_aligned;
> 
> +/**
> + * Call back function called to free the resources.
> + *
> + * @param p
> + *   Pointer provided while creating the defer queue
> + * @param e
> + *   Pointer to the resource data stored on the defer queue
> + *
> + * @return
> + *   None
> + */
> +typedef void (*rte_rcu_qsbr_free_resource)(void *p, void *e);

Style thing - usually in DPDK we have typedef newtype_t ...
Though I am not sure you need a new typedef at all - just
a function pointer inside the struct seems enough.

> +
> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
> +
> +/**
> + *  Trigger automatic reclamation after 1/8th the defer queue is full.
> + */
> +#define RTE_RCU_QSBR_AUTO_RECLAIM_LIMIT 3
> +
> +/**
> + *  Reclaim at the max 1/16th the total number of resources.
> + */
> +#define RTE_RCU_QSBR_MAX_RECLAIM_LIMIT 4


As I said above, I don't think these thresholds need to be hardcoded.
In any case, there does not seem to be much point in putting them in the
public header file.

> +
> +/**
> + * Parameters used when creating the defer queue.
> + */
> +struct rte_rcu_qsbr_dq_parameters {
> +     const char *name;
> +     /**< Name of the queue. */
> +     uint32_t size;
> +     /**< Number of entries in queue. Typically, this will be
> +      *   the same as the maximum number of entries supported in the
> +      *   lock free data structure.
> +      *   Data structures with unbounded number of entries is not
> +      *   supported currently.
> +      */
> +     uint32_t esize;
> +     /**< Size (in bytes) of each element in the defer queue.
> +      *   This has to be multiple of 8B as the rte_ring APIs
> +      *   support 8B element sizes only.
> +      */
> +     rte_rcu_qsbr_free_resource f;
> +     /**< Function to call to free the resource. */
> +     void *p;

Style nit again - I like short names myself, but that seems a bit extreme... :)
Might be at least:
void (*reclaim)(void *, void *);
void * reclaim_data;
?

> +     /**< Pointer passed to the free function. Typically, this is the
> +      *   pointer to the data structure to which the resource to free
> +      *   belongs. This can be NULL.
> +      */
> +     struct rte_rcu_qsbr *v;

Does it need to be inside that struct?
Might be better:
rte_rcu_qsbr_dq_create(struct rte_rcu_qsbr *v,
        const struct rte_rcu_qsbr_dq_parameters *params);

Another alternative: make both reclaim() and enqueue() take v as a parameter.

> +     /**< RCU QSBR variable to use for this defer queue */
> +};
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq;
> +
>  /**
>   * @warning
>   * @b EXPERIMENTAL: this API may change without prior notice
> @@ -648,6 +710,113 @@ __rte_experimental
>  int
>  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
> 
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a queue used to store the data structure elements that can
> + * be freed later. This queue is referred to as 'defer queue'.
> + *
> + * @param params
> + *   Parameters to create a defer queue.
> + * @return
> + *   On success - Valid pointer to defer queue
> + *   On error - NULL
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOMEM - Not enough memory
> + */
> +__rte_experimental
> +struct rte_rcu_qsbr_dq *
> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Enqueue one resource to the defer queue and start the grace period.
> + * The resource will be freed later after at least one grace period
> + * is over.
> + *
> + * If the defer queue is full, it will attempt to reclaim resources.
> + * It will also reclaim resources at regular intervals to avoid
> + * the defer queue from growing too big.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to allocate an entry from.
> + * @param e
> + *   Pointer to resource data to copy to the defer queue. The size of
> + *   the data to copy is equal to the element size provided when the
> + *   defer queue was created.
> + * @return
> + *   On success - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - ENOSPC - Defer queue is full. This condition can not happen
> + *           if the defer queue size is equal (or larger) than the
> + *           number of elements in the data structure.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Reclaim resources from the defer queue.
> + *
> + * This API is not multi-thread safe. It is expected that the caller
> + * provides multi-thread safety by locking a mutex or some other means.
> + *
> + * A lock free multi-thread writer algorithm could achieve multi-thread
> + * safety by creating and using one defer queue per thread.
> + *
> + * @param dq
> + *   Defer queue to reclaim an entry from.
> + * @return
> + *   On successful reclamation of at least 1 resource - 0
> + *   On error - 1 with rte_errno set to
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - None of the resources have completed at least 1 grace period,
> + *           try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Delete a defer queue.
> + *
> + * It tries to reclaim all the resources on the defer queue.
> + * If any of the resources have not completed the grace period
> + * the reclamation stops and returns immediately. The rest of
> + * the resources are not reclaimed and the defer queue is not
> + * freed.
> + *
> + * @param dq
> + *   Defer queue to delete.
> + * @return
> + *   On success - 0
> + *   On error - 1
> + *   Possible rte_errno codes are:
> + *   - EINVAL - NULL parameters are passed
> + *   - EAGAIN - Some of the resources have not completed at least 1 grace
> + *           period, try again.
> + */
> +__rte_experimental
> +int
> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/lib/librte_rcu/rte_rcu_qsbr_pvt.h 
> b/lib/librte_rcu/rte_rcu_qsbr_pvt.h
> new file mode 100644
> index 000000000..2122bc36a
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr_pvt.h

Again, a style suggestion: as it is not a public header, don't use the rte_
prefix for naming.
From my perspective, it is easier for the reader to realize what is a public
header and what is not.

> @@ -0,0 +1,46 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2019 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_PVT_H_
> +#define _RTE_RCU_QSBR_PVT_H_
> +
> +/**
> + * This file is private to the RCU library. It should not be included
> + * by the user of this library.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* RTE defer queue structure.
> + * This structure holds the defer queue. The defer queue is used to
> + * hold the deleted entries from the data structure that are not
> + * yet freed.
> + */
> +struct rte_rcu_qsbr_dq {
> +     struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
> +     struct rte_ring *r;     /**< RCU QSBR defer queue. */
> +     uint32_t size;
> +     /**< Number of elements in the defer queue */
> +     uint32_t esize;
> +     /**< Size (in bytes) of data stored on the defer queue */
> +     rte_rcu_qsbr_free_resource f;
> +     /**< Function to call to free the resource. */
> +     void *p;
> +     /**< Pointer passed to the free function. Typically, this is the
> +      *   pointer to the data structure to which the resource to free
> +      *   belongs.
> +      */
> +     char e[0];
> +     /**< Temporary storage to copy the defer queue element. */

Do you really need 'e' at all?
Can't it be just a temporary stack variable?

> +};
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RCU_QSBR_PVT_H_ */
> diff --git a/lib/librte_rcu/rte_rcu_version.map 
> b/lib/librte_rcu/rte_rcu_version.map
> index f8b9ef2ab..dfac88a37 100644
> --- a/lib/librte_rcu/rte_rcu_version.map
> +++ b/lib/librte_rcu/rte_rcu_version.map
> @@ -8,6 +8,10 @@ EXPERIMENTAL {
>       rte_rcu_qsbr_synchronize;
>       rte_rcu_qsbr_thread_register;
>       rte_rcu_qsbr_thread_unregister;
> +     rte_rcu_qsbr_dq_create;
> +     rte_rcu_qsbr_dq_enqueue;
> +     rte_rcu_qsbr_dq_reclaim;
> +     rte_rcu_qsbr_dq_delete;
> 
>       local: *;
>  };
> diff --git a/lib/meson.build b/lib/meson.build
> index e5ff83893..0e1be8407 100644
> --- a/lib/meson.build
> +++ b/lib/meson.build
> @@ -11,7 +11,9 @@
>  libraries = [
>       'kvargs', # eal depends on kvargs
>       'eal', # everything depends on eal
> -     'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
> +     'ring',
> +     'rcu', # rcu depends on ring
> +     'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
>       'cmdline',
>       'metrics', # bitrate/latency stats depends on this
>       'hash',    # efd depends on this
> @@ -22,7 +24,7 @@ libraries = [
>       'gro', 'gso', 'ip_frag', 'jobstats',
>       'kni', 'latencystats', 'lpm', 'member',
>       'power', 'pdump', 'rawdev',
> -     'rcu', 'reorder', 'sched', 'security', 'stack', 'vhost',
> +     'reorder', 'sched', 'security', 'stack', 'vhost',
>       # ipsec lib depends on net, crypto and security
>       'ipsec',
>       # add pkt framework libs which use other libs from above
> --
> 2.17.1

Reply via email to