On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
> @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
>   */
>  struct pfifo_fast_priv {
>       struct skb_array q[PFIFO_FAST_BANDS];
> +
> +     /* protect against data race between enqueue/dequeue and
> +      * qdisc->empty setting
> +      */
> +     spinlock_t lock;
>  };
>  
>  static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
> @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>       unsigned int pkt_len = qdisc_pkt_len(skb);
>       int err;
>  
> -     err = skb_array_produce(q, skb);
> +     spin_lock(&priv->lock);
> +     err = __ptr_ring_produce(&q->ring, skb);
> +     WRITE_ONCE(qdisc->empty, false);
> +     spin_unlock(&priv->lock);
>  
>       if (unlikely(err)) {
>               if (qdisc_is_percpu_stats(qdisc))
> @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>       struct sk_buff *skb = NULL;
>       int band;
>  
> +     spin_lock(&priv->lock);
>       for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>               struct skb_array *q = band2list(priv, band);
>  
> @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>       } else {
>               WRITE_ONCE(qdisc->empty, true);
>       }
> +     spin_unlock(&priv->lock);
>  
>       return skb;
>  }

I thought pfifo_fast was supposed to be "lockless", and this change
re-introduces a lock between producer and consumer, no?

Reply via email to