>-----Original Message-----
>From: Jayatheerthan, Jay <jay.jayatheert...@intel.com>
>Sent: Thursday, March 25, 2021 4:07 PM
>To: Pavan Nikhilesh Bhagavatula <pbhagavat...@marvell.com>; Jerin
>Jacob Kollanukkaran <jer...@marvell.com>; Carrillo, Erik G
><erik.g.carri...@intel.com>; Gujjar, Abhinandan S
><abhinandan.guj...@intel.com>; McDaniel, Timothy
><timothy.mcdan...@intel.com>; hemant.agra...@nxp.com; Van
>Haaren, Harry <harry.van.haa...@intel.com>; mattias.ronnblom
><mattias.ronnb...@ericsson.com>; Ma, Liang J
><liang.j...@intel.com>
>Cc: dev@dpdk.org
>Subject: [EXT] RE: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter
>event vector support
>
>> -----Original Message-----
>> From: pbhagavat...@marvell.com <pbhagavat...@marvell.com>
>> Sent: Wednesday, March 24, 2021 10:35 AM
>> To: jer...@marvell.com; Jayatheerthan, Jay
><jay.jayatheert...@intel.com>; Carrillo, Erik G
><erik.g.carri...@intel.com>; Gujjar,
>> Abhinandan S <abhinandan.guj...@intel.com>; McDaniel, Timothy
><timothy.mcdan...@intel.com>; hemant.agra...@nxp.com; Van
>> Haaren, Harry <harry.van.haa...@intel.com>; mattias.ronnblom
><mattias.ronnb...@ericsson.com>; Ma, Liang J
>> <liang.j...@intel.com>
>> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavat...@marvell.com>
>> Subject: [dpdk-dev] [PATCH v5 4/8] eventdev: add Rx adapter event
>vector support
>>
>> From: Pavan Nikhilesh <pbhagavat...@marvell.com>
>>
>> Add event vector support for the event eth Rx adapter. The
>> implementation creates vector flows based on the port and queue
>> identifiers of the received mbufs.
>>
>> Signed-off-by: Pavan Nikhilesh <pbhagavat...@marvell.com>
>> ---
>> lib/librte_eventdev/eventdev_pmd.h | 7 +-
>> .../rte_event_eth_rx_adapter.c | 257 ++++++++++++++++--
>> lib/librte_eventdev/rte_eventdev.c | 6 +-
>> 3 files changed, 250 insertions(+), 20 deletions(-)
>>
>> diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
>> index 9297f1433..0f724ac85 100644
>> --- a/lib/librte_eventdev/eventdev_pmd.h
>> +++ b/lib/librte_eventdev/eventdev_pmd.h
>> @@ -69,9 +69,10 @@ extern "C" {
>> } \
>> } while (0)
>>
>> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> - ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
>> - (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
>> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
>> + ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
>> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) | \
>> + (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
>>
>> #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>> RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
>> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> index ac8ba5bf0..c71990078 100644
>> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
>> @@ -26,6 +26,10 @@
>> #define BATCH_SIZE 32
>> #define BLOCK_CNT_THRESHOLD 10
>> #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
>> +#define MAX_VECTOR_SIZE 1024
>> +#define MIN_VECTOR_SIZE 4
>> +#define MAX_VECTOR_NS 1E9
>> +#define MIN_VECTOR_NS 1E5
>>
>> #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
>> #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
>> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>> uint16_t eth_rx_qid;
>> };
>>
>> +struct eth_rx_vector_data {
>> + TAILQ_ENTRY(eth_rx_vector_data) next;
>> + uint16_t port;
>> + uint16_t queue;
>> + uint16_t max_vector_count;
>> + uint64_t event;
>> + uint64_t ts;
>> + uint64_t vector_timeout_ticks;
>> + struct rte_mempool *vector_pool;
>> + struct rte_event_vector *vector_ev;
>> +} __rte_cache_aligned;
>> +
>> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
>> +
>> /* Instance per adapter */
>> struct rte_eth_event_enqueue_buffer {
>> /* Count of events in this buffer */
>> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>> uint32_t wrr_pos;
>> /* Event burst buffer */
>> struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
>> + /* Vector enable flag */
>> + uint8_t ena_vector;
>> + /* Timestamp of previous vector expiry list traversal */
>> + uint64_t prev_expiry_ts;
>> + /* Minimum ticks to wait before traversing expiry list */
>> + uint64_t vector_tmo_ticks;
>> + /* vector list */
>> + struct eth_rx_vector_data_list vector_list;
>> /* Per adapter stats */
>> struct rte_event_eth_rx_adapter_stats stats;
>> /* Block count, counts up to BLOCK_CNT_THRESHOLD */
>> @@ -198,9 +224,11 @@ struct eth_device_info {
>> struct eth_rx_queue_info {
>> int queue_enabled; /* True if added */
>> int intr_enabled;
>> + uint8_t ena_vector;
>> uint16_t wt; /* Polling weight */
>> uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
>> uint64_t event;
>> + struct eth_rx_vector_data vector_data;
>> };
>>
>> static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
>> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>> &rx_adapter->event_enqueue_buffer;
>> struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
>>
>> + if (!buf->count)
>> + return 0;
>> +
>> uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
>> rx_adapter->event_port_id,
>> buf->events,
>> @@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>> return n;
>> }
>>
>> +static inline uint16_t
>> +rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
>> + struct eth_rx_queue_info *queue_info,
>> + struct rte_eth_event_enqueue_buffer *buf,
>> + struct rte_mbuf **mbufs, uint16_t num)
>> +{
>> + struct rte_event *ev = &buf->events[buf->count];
>> + struct eth_rx_vector_data *vec;
>> + uint16_t filled, space, sz;
>> +
>> + filled = 0;
>> + vec = &queue_info->vector_data;
>> + while (num) {
>> + if (vec->vector_ev == NULL) {
>> + if (rte_mempool_get(vec->vector_pool,
>> + (void **)&vec->vector_ev) < 0) {
>> + rte_pktmbuf_free_bulk(mbufs, num);
>> + return 0;
>> + }
>> + vec->vector_ev->nb_elem = 0;
>> + vec->vector_ev->port = vec->port;
>> + vec->vector_ev->queue = vec->queue;
>> + vec->vector_ev->attr_valid = true;
>> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
>> + } else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>
>Is there a case where nb_elem > max_vector_count as we accumulate
>sz to it ?
I don't think so, that would overflow the vector event.
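A debug-only check could make that invariant explicit if it helps review (sketch only, not part of the patch):

/* nb_elem only grows by sz, and sz is clamped to the space left in the
 * vector, so it can never exceed max_vector_count.
 */
RTE_ASSERT(vec->vector_ev->nb_elem <= vec->max_vector_count);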
>
>> + /* Event ready. */
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + ev++;
>> + filled++;
>> + vec->vector_ev = NULL;
>> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> + if (rte_mempool_get(vec->vector_pool,
>> + (void **)&vec->vector_ev) < 0) {
>> + rte_pktmbuf_free_bulk(mbufs, num);
>> + return 0;
>> + }
>> + vec->vector_ev->nb_elem = 0;
>> + vec->vector_ev->port = vec->port;
>> + vec->vector_ev->queue = vec->queue;
>> + vec->vector_ev->attr_valid = true;
>> + TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
>> + }
>> +
>> + space = vec->max_vector_count - vec->vector_ev->nb_elem;
>> + sz = num > space ? space : num;
>> + memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
>> + sizeof(void *) * sz);
>> + vec->vector_ev->nb_elem += sz;
>> + num -= sz;
>> + mbufs += sz;
>> + vec->ts = rte_rdtsc();
>> + }
>> +
>> + if (vec->vector_ev->nb_elem == vec->max_vector_count) {
>
>Same here.
>
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + ev++;
>> + filled++;
>> + vec->vector_ev = NULL;
>> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> + }
>> +
>> + return filled;
>> +}
>
>I am seeing more than one repeated code chunk in this function.
>Perhaps you can give it a try to avoid the repetition. We can drop it
>if it affects performance.
I will try to move them to inline functions and test.
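Roughly what I have in mind (untested sketch; the helper names are placeholders):

/* Allocate and initialise a fresh vector event for this queue; returns
 * -ENOMEM when the vector mempool is exhausted.
 */
static __rte_always_inline int
rxa_vec_ev_init(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_vector_data *vec)
{
	if (rte_mempool_get(vec->vector_pool, (void **)&vec->vector_ev) < 0)
		return -ENOMEM;
	vec->vector_ev->nb_elem = 0;
	vec->vector_ev->port = vec->port;
	vec->vector_ev->queue = vec->queue;
	vec->vector_ev->attr_valid = true;
	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
	return 0;
}

/* Emit a completed vector into the next event slot and detach it from
 * the expiry list; the caller advances ev and the filled count.
 */
static __rte_always_inline void
rxa_vec_ev_emit(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_vector_data *vec, struct rte_event *ev)
{
	ev->event = vec->event;
	ev->vec = vec->vector_ev;
	vec->vector_ev = NULL;
	TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
}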
>
>> +
>> static inline void
>> rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>> uint16_t eth_dev_id,
>> @@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>> rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
>> do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
>
>The RSS-related code is executed for the vector case as well. Can this be
>moved inside the ena_vector if condition?
RSS is used to generate the event flow id; in the vector case the flow id
will be a combination of the port and queue id.
The idea is that flows having the same RSS LSB will end up in the same
queue.
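i.e. in v5 the vector flow id is derived in rxa_set_vector_data() as:

	/* From the patch: when the application did not provide a flow id,
	 * fold the queue and port ids into the 20-bit flow id field of the
	 * queue's event word.
	 */
	flow_id = queue_info->event & 0xFFFFF;
	flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;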
>
>>
>> - for (i = 0; i < num; i++) {
>> - m = mbufs[i];
>> -
>> - rss = do_rss ?
>> - rxa_do_softrss(m, rx_adapter->rss_key_be) :
>> - m->hash.rss;
>> - ev->event = event;
>> - ev->flow_id = (rss & ~flow_id_mask) |
>> - (ev->flow_id & flow_id_mask);
>> - ev->mbuf = m;
>> - ev++;
>> + if (!eth_rx_queue_info->ena_vector) {
>> + for (i = 0; i < num; i++) {
>> + m = mbufs[i];
>> +
>> + rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
>> + : m->hash.rss;
>> + ev->event = event;
>> + ev->flow_id = (rss & ~flow_id_mask) |
>> + (ev->flow_id & flow_id_mask);
>> + ev->mbuf = m;
>> + ev++;
>> + }
>> + } else {
>> + num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
>> + buf, mbufs, num);
>> }
>>
>> - if (dev_info->cb_fn) {
>> + if (num && dev_info->cb_fn) {
>>
>> dropped = 0;
>> nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
>> - ETH_EVENT_BUFFER_SIZE, buf->count, ev,
>> - num, dev_info->cb_arg, &dropped);
>> + ETH_EVENT_BUFFER_SIZE, buf->count,
>> + &buf->events[buf->count], num,
>> + dev_info->cb_arg, &dropped);
>
>Before this patch, we pass ev, which is &buf->events[buf->count] + num,
>as the fifth param when calling cb_fn. Now, we are passing
>&buf->events[buf->count] for the non-vector case. Do you see this as an issue?
>
The callback function takes in the array of newly formed events, i.e. we need
to pass the start of the array and the count.
The previous code had a bug where it passed the end of the event list.
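To spell it out with the variables from this function (illustration only):

	/* buf->count events are already queued, so the events formed in this
	 * call start at &buf->events[buf->count] and there are 'num' of them.
	 * The old code passed 'ev', which at this point is
	 * &buf->events[buf->count] + num, i.e. one past the last new event.
	 */
	nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id, ETH_EVENT_BUFFER_SIZE,
				buf->count, &buf->events[buf->count], num,
				dev_info->cb_arg, &dropped);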
>Also, for the vector case would it make sense to pass
>&buf->events[buf->count] + num ?
>
>> if (unlikely(nb_cb > num))
>> RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d)
>events",
>> nb_cb, num);
>> @@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
>> return nb_rx;
>> }
>>
>> +static void
>> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
>> +{
>> + struct rte_event_eth_rx_adapter *rx_adapter = arg;
>> + struct rte_eth_event_enqueue_buffer *buf =
>> + &rx_adapter->event_enqueue_buffer;
>> + struct rte_event *ev;
>> +
>> + if (buf->count)
>> + rxa_flush_event_buffer(rx_adapter);
>> +
>> + if (vec->vector_ev->nb_elem == 0)
>> + return;
>> + ev = &buf->events[buf->count];
>> +
>> + /* Event ready. */
>> + ev->event = vec->event;
>> + ev->vec = vec->vector_ev;
>> + buf->count++;
>> +
>> + vec->vector_ev = NULL;
>> + vec->ts = 0;
>> +}
>> +
>> static int
>> rxa_service_func(void *args)
>> {
>> @@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
>> return 0;
>> }
>>
>> + if (rx_adapter->ena_vector) {
>> + if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
>> + rx_adapter->vector_tmo_ticks) {
>> + struct eth_rx_vector_data *vec;
>> +
>> + TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
>> + uint64_t elapsed_time = rte_rdtsc() - vec->ts;
>> +
>> + if (elapsed_time >= vec->vector_timeout_ticks) {
>> + rxa_vector_expire(vec, rx_adapter);
>> + TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
>> + }
>> + }
>> + rx_adapter->prev_expiry_ts = rte_rdtsc();
>> + }
>> + }
>> +
>> stats = &rx_adapter->stats;
>> stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>> stats->rx_packets += rxa_poll(rx_adapter);
>> @@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>> }
>> }
>>
>> +static void
>> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
>> + uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
>> + uint16_t port_id)
>> +{
>> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
>> + struct eth_rx_vector_data *vector_data;
>> + uint32_t flow_id;
>> +
>> + vector_data = &queue_info->vector_data;
>> + vector_data->max_vector_count = vector_count;
>> + vector_data->port = port_id;
>> + vector_data->queue = qid;
>> + vector_data->vector_pool = mp;
>> + vector_data->vector_timeout_ticks =
>> + NSEC2TICK(vector_ns, rte_get_timer_hz());
>> + vector_data->ts = 0;
>> + flow_id = queue_info->event & 0xFFFFF;
>> + flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
>
>Maybe I am missing something here. Looking at the code, it looks like
>qid and port_id may overlap. E.g., if qid = 0x10 and port_id = 0x11,
>flow_id would end up being 0x11. Is this the expectation? Also, it may
>be useful to document the flow_id format.
The flow_id is 20 bits; I guess we could use a 12-bit queue_id and an 8-bit
port_id as the flow.
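For example, something like this (sketch of the proposed packing, not what v5 does):

	/* Proposed packing sketch: 12-bit queue id in the upper bits and
	 * 8-bit port id in the lower bits of the 20-bit flow id, so the two
	 * fields no longer overlap.
	 */
	flow_id = ((qid & 0xFFF) << 8) | (port_id & 0xFF);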
>Comparing this format with the existing RSS hash based method, are we
>saying that all mbufs received in an Rx burst are part of the same flow
>when vectorization is used?
Yes, the hard way to do this would be to use a hash table and treat each
mbuf as having a unique flow.
>
>> + vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
>> +}
>> +
>> static void
>> rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>> struct eth_device_info *dev_info,
>> @@ -1741,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>> }
>> }
>>
>> +static void
>> +rxa_sw_event_vector_configure(
>> + struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
>> + int rx_queue_id,
>> + const struct rte_event_eth_rx_adapter_event_vector_config *config)
>> +{
>> + struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
>> + struct eth_rx_queue_info *queue_info;
>> + struct rte_event *qi_ev;
>> +
>> + if (rx_queue_id == -1) {
>> + uint16_t nb_rx_queues;
>> + uint16_t i;
>> +
>> + nb_rx_queues = dev_info->dev->data->nb_rx_queues;
>> + for (i = 0; i < nb_rx_queues; i++)
>> + rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
>> + config);
>> + return;
>> + }
>> +
>> + queue_info = &dev_info->rx_queue[rx_queue_id];
>> + qi_ev = (struct rte_event *)&queue_info->event;
>> + queue_info->ena_vector = 1;
>> + qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
>> + rxa_set_vector_data(queue_info, config->vector_sz,
>> + config->vector_timeout_ns, config->vector_mp,
>> + rx_queue_id, dev_info->dev->data->port_id);
>> + rx_adapter->ena_vector = 1;
>> + rx_adapter->vector_tmo_ticks =
>> + rx_adapter->vector_tmo_ticks ?
>> + RTE_MIN(config->vector_timeout_ns << 1,
>> + rx_adapter->vector_tmo_ticks) :
>> + config->vector_timeout_ns << 1;
>> + rx_adapter->prev_expiry_ts = 0;
>> + TAILQ_INIT(&rx_adapter->vector_list);
>> +}
>> +
>> static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>> uint16_t eth_dev_id,
>> int rx_queue_id,
>> @@ -2081,6 +2285,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> return -EINVAL;
>> }
>>
>> + if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
>> + (queue_conf->rx_queue_flags &
>> + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
>> + RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
>> + " eth port: %" PRIu16 " adapter id: %" PRIu8,
>> + eth_dev_id, id);
>> + return -EINVAL;
>> + }
>> +
>> if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>> (rx_queue_id != -1)) {
>> RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
>> @@ -2143,6 +2356,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>> return 0;
>> }
>>
>> +static int
>> +rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
>> +{
>> + limits->max_sz = MAX_VECTOR_SIZE;
>> + limits->min_sz = MIN_VECTOR_SIZE;
>> + limits->max_timeout_ns = MAX_VECTOR_NS;
>> + limits->min_timeout_ns = MIN_VECTOR_NS;
>> +
>> + return 0;
>> +}
>> +
>> int
>> rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
>> int32_t rx_queue_id)
>> @@ -2333,7 +2557,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
>> ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
>> dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
>> } else {
>> - ret = -ENOTSUP;
>> + rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
>> + rx_queue_id, config);
>> }
>>
>> return ret;
>> @@ -2371,7 +2596,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
>> ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
>> dev, &rte_eth_devices[eth_port_id], limits);
>> } else {
>> - ret = -ENOTSUP;
>> + ret = rxa_sw_vector_limits(limits);
>> }
>>
>> return ret;
>> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
>> index f95edc075..254a31b1f 100644
>> --- a/lib/librte_eventdev/rte_eventdev.c
>> +++ b/lib/librte_eventdev/rte_eventdev.c
>> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
>>
>> if (caps == NULL)
>> return -EINVAL;
>> - *caps = 0;
>> +
>> + if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
>> + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
>> + else
>> + *caps = 0;
>
>Any reason why we had to set a default caps value? I am thinking that if a
>SW event device is used, it would set it anyway.
>
There are multiple SW event devices which don't implement the caps_get
function; this change solves that.
>>
>> return dev->dev_ops->eth_rx_adapter_caps_get ?
>> (*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
>> --
>> 2.17.1