> > > +
> > > + /* Enqueue the token and resource. Generating the token
> > > +  * and enqueuing (token + resource) on the queue is not an
> > > +  * atomic operation. This might result in tokens enqueued
> > > +  * out of order on the queue. So, some tokens might wait
> > > +  * longer than necessary before their resources are reclaimed.
> > > +  */
> > > + char data[dq->esize];
> > > + memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> > > + memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
> > > +         dq->esize - __RTE_QSBR_TOKEN_SIZE);
> > > + /* Check the return status, as the enqueue might fail since
> > > +  * other threads might have used up the freed space.
> > > +  * Enqueue uses the flags configured when the DQ was created.
> > > +  */
> > > + if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
> > > +         rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > +                 "%s(): Enqueue failed\n", __func__);
> > > +         /* Note that the token generated above is not used.
> > > +          * Other than wasting tokens, it should not cause any
> > > +          * other issues.
> > > +          */
> > > +         rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > +                 "%s(): Skipped enqueuing token = %"PRIu64"\n",
> > > +                 __func__, token);
> > > +
> > > +         rte_errno = ENOSPC;
> > > +         return 1;
> > > + }
> >
> >
> > Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
> > wasting RCU tokens:
> >
> > if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
> >     token = rte_rcu_qsbr_start(dq->v);
> >     memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
> >     rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1);
> > }
> >
> > Though it might slowdown things if we'll have a lot of parallel dq_enqueue.
> > So not sure is it worth it or not.
> Adding peek APIs for RTS would be better. That should take care of the
> parallel dq_enqueue. Not sure if I gave you the comment. My ring
> patch supported these APIs.
AFAIK, peek API is not possible for RTS mode.
Probably you are talking about the Scatter-Gather API introduced in your RFC
(_reserve_; update ring entries manually; _commit_)?
Anyway, if there is not much value in my idea above, feel free to drop it.

> 
> >
> > > +
> > > + rte_log(RTE_LOG_INFO, rte_rcu_log_type,
> > > +         "%s(): Enqueued token = %"PRIu64"\n", __func__, token);
> > > +
> > > + return 0;
> > > +}
> > > +
> > > +/* Reclaim resources from the defer queue. */
> > > +int
> > > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
> > > +                         unsigned int *freed, unsigned int *pending)
> > > +{
> > > + uint32_t cnt;
> > > + uint64_t token;
> > > +
> > > + if (dq == NULL || n == 0) {
> > > +         rte_log(RTE_LOG_ERR, rte_rcu_log_type,
> > > +                 "%s(): Invalid input parameter\n", __func__);
> > > +         rte_errno = EINVAL;
> > > +
> > > +         return 1;
> > > + }
> > > +
> > > + cnt = 0;
> > > +
> > > + char e[dq->esize];
> > > + /* Check reader threads quiescent state and reclaim resources */
> > > + while ((cnt < n) &&
> > > +         (rte_ring_dequeue_bulk_elem_start(dq->r, e,
> > > +                                 dq->esize, 1, NULL) != 0)) {
> >
> > Another thought - any point to use burst_elem_start() here to retrieve more
> > then 1 elem in one go? Something like:
> I think it makes sense.
> 
> > char e[32][dq->esize];
> > while (cnt < n) {
> >     k = RTE_MIN(32, n - cnt);
> >     k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
> >     if (k == 0)
> >             break;
> >     for (i = 0; i != k; i++) {
> >             memcpy(&token, e[i], sizeof(uint64_t));
> >             if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> >                     break;
> >     }
> >     k = i;
> >     rte_ring_dequeue_elem_finish(dq->r, k);
> >     for (i = 0; i != k; i++)
> >             dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
> I think it also makes sense to change the free_fn to take 'n' number of 
> tokens.
> 
> >     cnt += k;
> >     if (k == 0)
> >             break;
> >
> > ?
> > Also if at enqueue we guarantee strict order (via
> > enqueue_start/enqueue_finish), then here we probably can do _check_ from
> > the last retrieved token?
> > In theory that might help to minimize number of checks.
> > I.E. do:
> > for (i = k; i-- != 0; ) {
> >     memcpy(&token, e[i], sizeof(uint64_t));
> >     if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
> There is a higher chance that later tokens are not acked. This introduces
> more polling of the counters.
> rte_rcu_qsbr_check has an optimization: while checking the current token,
> it also caches the greatest token acked and uses the cached token for
> subsequent calls. I think this provides a better optimization.

Ok. 
