On Mon, Aug 30, 2021 at 6:41 PM Ganapati Kundapura
<ganapati.kundap...@intel.com> wrote:
>
> v2:
> * Fixed typo in commit message
> * changed subject line
>
> v1:
Changelog should be moved under "---". See below.

> RX adapter uses memmove() to move unprocessed events to the beginning
> of the packet enqueue buffer. The use of memmove() was found to
> consume a good amount of CPU cycles (about 20%).
>
> This patch removes the use of memmove() by implementing a circular
> buffer to avoid copying of data. With this change the RX adapter is
> able to fill a buffer of 16384 events.
>
> Acked-by: Jerin Jacob <jer...@marvell.com>

Fixed the changelog and changed the subject to
"eventdev: make Rx-adapter enqueue buffer as circular buffer"

Applied to dpdk-next-net-eventdev/for-main. Thanks

> Signed-off-by: Ganapati Kundapura <ganapati.kundap...@intel.com>
> ---

See above

>  lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++++++++-------
>  1 file changed, 68 insertions(+), 16 deletions(-)
>
> diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
> index 13dfb28..7c94c73 100644
> --- a/lib/eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/eventdev/rte_event_eth_rx_adapter.c
> @@ -25,7 +25,7 @@
>
>  #define BATCH_SIZE 32
>  #define BLOCK_CNT_THRESHOLD 10
> -#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
> +#define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
>  #define MAX_VECTOR_SIZE 1024
>  #define MIN_VECTOR_SIZE 4
>  #define MAX_VECTOR_NS 1E9
> @@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
>         uint16_t count;
>         /* Array of events in this buffer */
>         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
> +       /* Event enqueue happens from head */
> +       uint16_t head;
> +       /* New packets from rte_eth_rx_burst are enqueued from tail */
> +       uint16_t tail;
> +       /* Last element in the buffer before rollover */
> +       uint16_t last;
> +       uint16_t last_mask;
>  };
>
>  struct rte_event_eth_rx_adapter {
> @@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>         struct rte_eth_event_enqueue_buffer *buf =
>                 &rx_adapter->event_enqueue_buffer;
>         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
> +       uint16_t count = buf->last ? buf->last - buf->head : buf->count;
>
> -       if (!buf->count)
> +       if (!count)
>                 return 0;
>
>         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
>                                         rx_adapter->event_port_id,
> -                                       buf->events,
> -                                       buf->count);
> -       if (n != buf->count) {
> -               memmove(buf->events,
> -                       &buf->events[n],
> -                       (buf->count - n) * sizeof(struct rte_event));
> +                                       &buf->events[buf->head],
> +                                       count);
> +       if (n != count)
>                 stats->rx_enq_retry++;
> +
> +       buf->head += n;
> +
> +       if (buf->last && n == count) {
> +               uint16_t n1;
> +
> +               n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
> +                                       rx_adapter->event_port_id,
> +                                       &buf->events[0],
> +                                       buf->tail);
> +
> +               if (n1 != buf->tail)
> +                       stats->rx_enq_retry++;
> +
> +               buf->last = 0;
> +               buf->head = n1;
> +               buf->last_mask = 0;
> +               n += n1;
>         }
>
>         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
> @@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>                 &dev_info->rx_queue[rx_queue_id];
>         struct rte_eth_event_enqueue_buffer *buf =
>                 &rx_adapter->event_enqueue_buffer;
> -       struct rte_event *ev = &buf->events[buf->count];
> +       uint16_t new_tail = buf->tail;
>         uint64_t event = eth_rx_queue_info->event;
>         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
>         struct rte_mbuf *m = mbufs[0];
> @@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>         for (i = 0; i < num; i++) {
> +               struct rte_event *ev;
> +
>                 m = mbufs[i];
> +               ev = &buf->events[new_tail];
>
>                 rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
>                         : m->hash.rss;
> @@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>                 ev->flow_id = (rss & ~flow_id_mask) |
>                                 (ev->flow_id & flow_id_mask);
>                 ev->mbuf = m;
> -               ev++;
> +               new_tail++;
>         }
>         } else {
>                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> @@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>
>                 dropped = 0;
>                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> -                                       ETH_EVENT_BUFFER_SIZE, buf->count,
> -                                       &buf->events[buf->count], num,
> -                                       dev_info->cb_arg, &dropped);
> +                                       buf->last |
> +                                       (RTE_DIM(buf->events) & ~buf->last_mask),
> +                                       buf->count >= BATCH_SIZE ?
> +                                               buf->count - BATCH_SIZE : 0,
> +                                       &buf->events[buf->tail],
> +                                       num,
> +                                       dev_info->cb_arg,
> +                                       &dropped);
>                 if (unlikely(nb_cb > num))
>                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
>                                 nb_cb, num);
> @@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>         }
>
>         buf->count += num;
> +       buf->tail += num;
> +}
> +
> +static inline bool
> +rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
> +{
> +       uint32_t nb_req = buf->tail + BATCH_SIZE;
> +
> +       if (!buf->last) {
> +               if (nb_req <= RTE_DIM(buf->events))
> +                       return true;
> +
> +               if (buf->head >= BATCH_SIZE) {
> +                       buf->last_mask = ~0;
> +                       buf->last = buf->tail;
> +                       buf->tail = 0;
> +                       return true;
> +               }
> +       }
> +
> +       return nb_req <= buf->head;
>  }
>
>  /* Enqueue packets from <port, q> to event buffer */
> @@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
>         /* Don't do a batch dequeue from the rx queue if there isn't
>          * enough space in the enqueue buffer.
>          */
> -       while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
> +       while (rxa_pkt_buf_available(buf)) {
>                 if (buf->count >= BATCH_SIZE)
>                         rxa_flush_event_buffer(rx_adapter);
>
> @@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
>         if (buf->count >= BATCH_SIZE)
>                 rxa_flush_event_buffer(rx_adapter);
>
> -       while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
> +       while (rxa_pkt_buf_available(buf)) {
>                 struct eth_device_info *dev_info;
>                 uint16_t port;
>                 uint16_t queue;
> @@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
>          */
>         if (buf->count >= BATCH_SIZE)
>                 rxa_flush_event_buffer(rx_adapter);
> -       if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
> +       if (!rxa_pkt_buf_available(buf)) {
>                 rx_adapter->wrr_pos = wrr_pos;
>                 return nb_rx;
>         }
> --
> 2.6.4
>
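For readers following the thread, the hunks above are easier to reason
about outside diff context, so below is a compilable, self-contained
sketch of the same head/tail/last scheme. To be clear, everything in it
is an illustrative stand-in rather than DPDK code: consume() models
rte_event_enqueue_new_burst() accepting fewer events than offered, and
the stats, last_mask and SW-callback details of the patch are omitted.

/*
 * Minimal model of the circular enqueue buffer. Once the buffer wraps,
 * valid data lives in two segments: [head, last) and [0, tail).
 * Build: cc -Wall -o ring ring.c
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define RING_SIZE 192   /* mirrors ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE) */
#define BATCH 32        /* mirrors BATCH_SIZE */

typedef struct {
        uint32_t events[RING_SIZE]; /* stand-in for struct rte_event */
        uint16_t count; /* total events buffered */
        uint16_t head;  /* next event to hand to the consumer */
        uint16_t tail;  /* next free slot for newly received events */
        uint16_t last;  /* end of valid data before rollover, 0 = no wrap */
} ring_t;

/* Stub consumer: accepts at most 'room' events, like a busy eventdev. */
static uint16_t consume(const uint32_t *ev, uint16_t n, uint16_t room)
{
        (void)ev;
        return n < room ? n : room;
}

/*
 * Role of rxa_pkt_buf_available(): is there room for one more full
 * batch? When the linear space at the tail runs out but at least one
 * batch has already drained from the front, record the rollover point
 * in 'last' and wrap the tail to slot 0.
 */
static bool ring_space_for_batch(ring_t *r)
{
        uint32_t nb_req = (uint32_t)r->tail + BATCH;

        if (!r->last) {
                if (nb_req <= RING_SIZE)
                        return true;

                if (r->head >= BATCH) {
                        r->last = r->tail;
                        r->tail = 0;
                        return true;
                }
        }

        /* Wrapped: the tail may only advance up to the head. */
        return nb_req <= r->head;
}

/*
 * Role of rxa_flush_event_buffer(): drain [head, last) first and, only
 * if that segment empties completely, continue with [0, tail). A
 * partial flush merely advances head; no memmove() is ever needed.
 */
static uint16_t ring_flush(ring_t *r, uint16_t room)
{
        uint16_t count = r->last ? r->last - r->head : r->tail - r->head;
        uint16_t n;

        if (!count)
                return 0;

        n = consume(&r->events[r->head], count, room);
        r->head += n;

        if (r->last && n == count) { /* front segment fully drained */
                uint16_t n1 = consume(&r->events[0], r->tail, room - n);

                r->last = 0; /* back to the non-wrapped state */
                r->head = n1;
                n += n1;
        }

        r->count -= n;
        return n;
}

int main(void)
{
        ring_t r = { 0 };

        /* Produce full batches against a consumer that drains only 24
         * events per flush; the buffer wraps several times before it
         * finally reports no space.
         */
        while (ring_space_for_batch(&r)) {
                memset(&r.events[r.tail], 0xab, BATCH * sizeof(r.events[0]));
                r.tail += BATCH;
                r.count += BATCH;
                printf("flushed %u, buffered %u\n",
                       (unsigned)ring_flush(&r, 24), (unsigned)r.count);
        }

        return 0;
}

The trade-off worth noting: after a wrap, a flush may need two enqueue
bursts instead of one, but the common case remains a single burst from
head, and the memmove() cost called out in the commit message (about
20% of cycles) disappears because a partial enqueue now only advances
the head index.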