On Mon, Oct 16, 2017 at 12:19:39PM +0200, Jesper Dangaard Brouer wrote:
> @@ -191,15 +280,45 @@ static int cpu_map_kthread_run(void *data)
>  	 * kthread_stop signal until queue is empty.
>  	 */
>  	while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) {
> +		unsigned int processed = 0, drops = 0;
>  		struct xdp_pkt *xdp_pkt;
> 
> -		schedule();
> -		/* Do work */
> -		while ((xdp_pkt = ptr_ring_consume(rcpu->queue))) {
> -			/* For now just "refcnt-free" */
> -			page_frag_free(xdp_pkt);
> +		/* Release CPU reschedule checks */
> +		if (__ptr_ring_empty(rcpu->queue)) {
I suspect this is racy: if the ring becomes non-empty at this point and the
producer wakes the task, the next line will put it back to sleep anyway. I
think you want to reverse the order: __set_current_state(TASK_INTERRUPTIBLE)
first, then check __ptr_ring_empty() -- see the sketch at the end of this
mail. I also note that using the __ versions means you cannot resize the
ring; I hope you do not need to.

> +			__set_current_state(TASK_INTERRUPTIBLE);
> +			schedule();
> +		} else {
> +			cond_resched();
> +		}
> +		__set_current_state(TASK_RUNNING);
> +
> +		/* Process packets in rcpu->queue */
> +		local_bh_disable();
> +		/*
> +		 * The bpf_cpu_map_entry is single consumer, with this
> +		 * kthread CPU pinned. Lockless access to ptr_ring
> +		 * consume side valid as no-resize allowed of queue.
> +		 */
> +		while ((xdp_pkt = __ptr_ring_consume(rcpu->queue))) {
> +			struct sk_buff *skb;
> +			int ret;
> +
> +			skb = cpu_map_build_skb(rcpu, xdp_pkt);
> +			if (!skb) {
> +				page_frag_free(xdp_pkt);
> +				continue;
> +			}
> +
> +			/* Inject into network stack */
> +			ret = netif_receive_skb_core(skb);
> +			if (ret == NET_RX_DROP)
> +				drops++;
> +
> +			/* Limit BH-disable period */
> +			if (++processed == 8)
> +				break;
>  		}
> -		__set_current_state(TASK_INTERRUPTIBLE);
> +		local_bh_enable(); /* resched point, may call do_softirq() */
>  	}
>  	__set_current_state(TASK_RUNNING);
> 
> @@ -490,13 +609,6 @@ static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,
>  	return 0;
>  }
> 
> -/* Notice: Will change in later patch */
> -struct xdp_pkt {
> -	void *data;
> -	u16 len;
> -	u16 headroom;
> -};
> -
>  /* Runs under RCU-read-side, plus in softirq under NAPI protection.
>   * Thus, safe percpu variable access.
>   */
> @@ -524,17 +636,13 @@ int cpu_map_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_buff *xdp,
>  		    struct net_device *dev_rx)
>  {
>  	struct xdp_pkt *xdp_pkt;
> -	int headroom;
> 
> -	/* For now this is just used as a void pointer to data_hard_start.
> -	 * Followup patch will generalize this.
> -	 */
> -	xdp_pkt = xdp->data_hard_start;
> +	xdp_pkt = convert_to_xdp_pkt(xdp);
> +	if (!xdp_pkt)
> +		return -EOVERFLOW;
> 
> -	/* Fake writing into xdp_pkt->data to measure overhead */
> -	headroom = xdp->data - xdp->data_hard_start;
> -	if (headroom < sizeof(*xdp_pkt))
> -		xdp_pkt->data = xdp->data;
> +	/* Info needed when constructing SKB on remote CPU */
> +	xdp_pkt->dev_rx = dev_rx;
> 
>  	bq_enqueue(rcpu, xdp_pkt);
>  	return 0;
> diff --git a/net/core/dev.c b/net/core/dev.c
> index fcddccb6be41..36bb68f3a2c7 100644
> --- a/net/core/dev.c
> +++ b/net/core/dev.c
> @@ -4491,6 +4491,33 @@ static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
>  	return ret;
>  }
> 
> +/**
> + * netif_receive_skb_core - special purpose version of netif_receive_skb
> + * @skb: buffer to process
> + *
> + * More direct receive version of netif_receive_skb().  It should
> + * only be used by callers that have a need to skip RPS and Generic XDP.
> + * Caller must also take care of handling if (page_is_)pfmemalloc.
> + *
> + * This function may only be called from softirq context and interrupts
> + * should be enabled.
> + *
> + * Return values (usually ignored):
> + * NET_RX_SUCCESS: no congestion
> + * NET_RX_DROP: packet was dropped
> + */
> +int netif_receive_skb_core(struct sk_buff *skb)
> +{
> +	int ret;
> +
> +	rcu_read_lock();
> +	ret = __netif_receive_skb_core(skb, false);
> +	rcu_read_unlock();
> +
> +	return ret;
> +}
> +EXPORT_SYMBOL(netif_receive_skb_core);
> +
>  static int __netif_receive_skb(struct sk_buff *skb)
>  {
>  	int ret;
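Untested, but the ordering I have in mind looks roughly like the below,
reusing the names from your patch (and assuming the enqueue/flush side wakes
the kthread with wake_up_process(), which is not in the hunks quoted here):

		/* Release CPU reschedule checks */
		if (__ptr_ring_empty(rcpu->queue)) {
			/* Mark ourselves sleeping before re-checking, so a
			 * producer that enqueues and calls wake_up_process()
			 * after this point either makes the re-check see the
			 * new entry or flips us back to TASK_RUNNING, and the
			 * wakeup cannot be lost.  set_current_state() carries
			 * the barrier that orders the state write before the
			 * ring re-read.
			 */
			set_current_state(TASK_INTERRUPTIBLE);
			if (__ptr_ring_empty(rcpu->queue))
				schedule();
			else
				__set_current_state(TASK_RUNNING);
		} else {
			cond_resched();
		}

Keeping the outer emptiness check means the busy path does not pay for the
state write and barrier on every loop iteration.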