Hi Jerin,

I'm assisting Harry on the sample app, and have just pushed up a V2 patch based on your feedback. I've addressed most of your suggestions; comments are inline below. There may still be a couple of outstanding questions that need further discussion.

Regards,
Dave

On 10/5/2017 3:12 PM, Jerin Jacob wrote:
> -----Original Message-----
>> Date: Fri, 21 Apr 2017 10:51:37 +0100
>> From: Harry van Haaren <harry.van.haa...@intel.com>
>> To: dev@dpdk.org
>> CC: jerin.ja...@caviumnetworks.com, Harry van Haaren
>> <harry.van.haa...@intel.com>, Gage Eads <gage.e...@intel.com>, Bruce
>>  Richardson <bruce.richard...@intel.com>
>> Subject: [PATCH 1/3] examples/eventdev_pipeline: added sample app
>> X-Mailer: git-send-email 2.7.4
>>
>> This commit adds a sample app for the eventdev library.
>> The app has been tested with DPDK 17.05-rc2, hence this
>> release (or later) is recommended.
>>
>> The sample app showcases a pipeline processing use-case,
>> with event scheduling and processing defined per stage.
>> The application receives traffic as normal, with each
>> packet traversing the pipeline. Once the packet has
>> been processed by each of the pipeline stages, it is
>> transmitted again.
>>
>> The app provides a framework to utilize cores for a single
>> role or multiple roles. Examples of roles are the RX core,
>> TX core, Scheduling core (in the case of the event/sw PMD),
>> and worker cores.
>>
>> Various flags are available to configure numbers of stages,
>> cycles of work at each stage, type of scheduling, number of
>> worker cores, queue depths etc. For a full explanation,
>> please refer to the documentation.
>>
>> Signed-off-by: Gage Eads <gage.e...@intel.com>
>> Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
>> Signed-off-by: Harry van Haaren <harry.van.haa...@intel.com>
>
> Thanks for the example application to share the SW view.
> I could make it run on HW after some tweaking(not optimized though)
>
> [...]
>> +#define MAX_NUM_STAGES 8
>> +#define BATCH_SIZE 16
>> +#define MAX_NUM_CORE 64
>
> How about RTE_MAX_LCORE?

Core usage in the sample app is held in a uint64_t, one bit per core. Extending this to arrays (to go past 64 cores) would be possible, but I feel the extra effort would not give much benefit. I've left it as is for the moment, unless you see a strong requirement to go beyond 64 cores?
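
To make the 64-core cap concrete, here is a minimal sketch of how a role's coremask maps onto per-core flags (apply_rx_coremask() is a hypothetical helper; the app does this inline during argument parsing):

#include <stdint.h>

#define MAX_NUM_CORE 64

static unsigned int rx_core[MAX_NUM_CORE];

static void
apply_rx_coremask(uint64_t rx_lcore_mask)
{
    unsigned int i;

    /* one bit per lcore in a uint64_t, hence the 64-core limit */
    for (i = 0; i < MAX_NUM_CORE; i++)
        rx_core[i] = !!(rx_lcore_mask & (1ULL << i));
}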

>
>> +
>> +static unsigned int active_cores;
>> +static unsigned int num_workers;
>> +static unsigned long num_packets = (1L << 25); /* do ~32M packets */
>> +static unsigned int num_fids = 512;
>> +static unsigned int num_priorities = 1;
>
> looks like its not used.

Yes, removed.

>
>> +static unsigned int num_stages = 1;
>> +static unsigned int worker_cq_depth = 16;
>> +static int queue_type = RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY;
>> +static int16_t next_qid[MAX_NUM_STAGES+1] = {-1};
>> +static int16_t qid[MAX_NUM_STAGES] = {-1};
>
> Moving all fastpath related variables under a structure with cache
> aligned will help.

I tried a few different combinations of this and saw no gains, and in some cases losses, so I will leave it as is for the moment, if that's OK.

>
>> +static int worker_cycles;
>> +static int enable_queue_priorities;
>> +
>> +struct prod_data {
>> +    uint8_t dev_id;
>> +    uint8_t port_id;
>> +    int32_t qid;
>> +    unsigned num_nic_ports;
>> +};

>
> cache aligned ?

Yes, I saw a gain of a percent or two with this and the following two data structures cache aligned.

>
>> +
>> +struct cons_data {
>> +    uint8_t dev_id;
>> +    uint8_t port_id;
>> +};
>> +
>
> cache aligned ?

Yes, see comment above

>
>> +static struct prod_data prod_data;
>> +static struct cons_data cons_data;
>> +
>> +struct worker_data {
>> +    uint8_t dev_id;
>> +    uint8_t port_id;
>> +};
>
> cache aligned ?

Yes, see comment above
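
For reference, a minimal sketch of the cache-aligned variants, using DPDK's __rte_cache_aligned attribute so the producer, consumer and worker state each sit on their own cache line (the exact V2 layout may differ slightly):

#include <rte_memory.h>

struct prod_data {
    uint8_t dev_id;
    uint8_t port_id;
    int32_t qid;
    unsigned int num_nic_ports;
} __rte_cache_aligned;

struct cons_data {
    uint8_t dev_id;
    uint8_t port_id;
} __rte_cache_aligned;

struct worker_data {
    uint8_t dev_id;
    uint8_t port_id;
} __rte_cache_aligned;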


>
>> +
>> +static unsigned *enqueue_cnt;
>> +static unsigned *dequeue_cnt;
>> +
>> +static volatile int done;
>> +static volatile int prod_stop;
>
> No one updating the prod_stop.

Old var, removed.

>
>> +static int quiet;
>> +static int dump_dev;
>> +static int dump_dev_signal;
>> +
>> +static uint32_t rx_lock;
>> +static uint32_t tx_lock;
>> +static uint32_t sched_lock;
>> +static bool rx_single;
>> +static bool tx_single;
>> +static bool sched_single;
>> +
>> +static unsigned rx_core[MAX_NUM_CORE];
>> +static unsigned tx_core[MAX_NUM_CORE];
>> +static unsigned sched_core[MAX_NUM_CORE];
>> +static unsigned worker_core[MAX_NUM_CORE];
>> +
>> +static bool
>> +core_in_use(unsigned lcore_id) {
>> +    return (rx_core[lcore_id] || sched_core[lcore_id] ||
>> +        tx_core[lcore_id] || worker_core[lcore_id]);
>> +}
>> +
>> +static struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
>> +
>> +static void
>> +rte_eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
>> +            void *userdata)
>
> IMO, It is better to not use rte_eth_* for application functions.

Sure, removed the 'rte_' part of the function name.

>
>> +{
>> +    int port_id = (uintptr_t) userdata;
>> +    unsigned _sent = 0;
>> +
>> +    do {
>> +        /* Note: hard-coded TX queue */
>> +        _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
>> +                      unsent - _sent);
>> +    } while (_sent != unsent);
>> +}
>> +
>> +static int
>> +consumer(void)
>> +{
>> +    const uint64_t freq_khz = rte_get_timer_hz() / 1000;
>> +    struct rte_event packets[BATCH_SIZE];
>> +
>> +    static uint64_t npackets;
>> +    static uint64_t received;
>> +    static uint64_t received_printed;
>> +    static uint64_t time_printed;
>> +    static uint64_t start_time;
>> +    unsigned i, j;
>> +    uint8_t dev_id = cons_data.dev_id;
>> +    uint8_t port_id = cons_data.port_id;
>> +
>> +    if (!npackets)
>> +        npackets = num_packets;
>> +
>> +    do {
>> +        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
>> +                packets, RTE_DIM(packets), 0);
>
>         const uint16_t n =

Sure.

>
>> +
>> +        if (n == 0) {
>> +            for (j = 0; j < rte_eth_dev_count(); j++)
>> +                rte_eth_tx_buffer_flush(j, 0, tx_buf[j]);
>> +            return 0;
>> +        }
>> +        if (start_time == 0)
>> +            time_printed = start_time = rte_get_timer_cycles();
>> +
>> +        received += n;
>> +        for (i = 0; i < n; i++) {
>> +            uint8_t outport = packets[i].mbuf->port;
>> +            rte_eth_tx_buffer(outport, 0, tx_buf[outport],
>> +                    packets[i].mbuf);
>> +        }
>> +
>> +        if (!quiet && received >= received_printed + (1<<22)) {
>> +            const uint64_t now = rte_get_timer_cycles();
>> +            const uint64_t delta_cycles = now - start_time;
>> +            const uint64_t elapsed_ms = delta_cycles / freq_khz;
>> +            const uint64_t interval_ms =
>> +                    (now - time_printed) / freq_khz;
>> +
>> +            uint64_t rx_noprint = received - received_printed;
>> +            printf("# consumer RX=%"PRIu64", time %"PRIu64
>> +                "ms, avg %.3f mpps [current %.3f mpps]\n",
>> +                    received, elapsed_ms,
>> +                    (received) / (elapsed_ms * 1000.0),
>> +                    rx_noprint / (interval_ms * 1000.0));
>> +            received_printed = received;
>> +            time_printed = now;
>> +        }
>> +
>> +        dequeue_cnt[0] += n;
>> +
>> +        if (num_packets > 0 && npackets > 0) {
>> +            npackets -= n;
>> +            if (npackets == 0 || npackets > num_packets)
>> +                done = 1;
>> +        }
>
> Looks like very complicated logic.I think we can simplify it.

I've simplified this.
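
Roughly, the simplified check in consumer() treats num_packets == 0 as "no limit" and stops once the quota is reached (a sketch, not necessarily the exact V2 code):

        if (num_packets > 0 && received >= num_packets)
            done = 1;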


>
>> +    } while (0);
>
> do while(0); really required here?

Removed.

>
>> +
>> +    return 0;
>> +}
>> +
>> +static int
>> +producer(void)
>> +{
>> +    static uint8_t eth_port;
>> +    struct rte_mbuf *mbufs[BATCH_SIZE];
>> +    struct rte_event ev[BATCH_SIZE];
>> +    uint32_t i, num_ports = prod_data.num_nic_ports;
>> +    int32_t qid = prod_data.qid;
>> +    uint8_t dev_id = prod_data.dev_id;
>> +    uint8_t port_id = prod_data.port_id;
>> +    uint32_t prio_idx = 0;
>> +
>> + const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
>> +    if (++eth_port == num_ports)
>> +        eth_port = 0;
>> +    if (nb_rx == 0) {
>> +        rte_pause();
>> +        return 0;
>> +    }
>> +
>> +    for (i = 0; i < nb_rx; i++) {
>> +        ev[i].flow_id = mbufs[i]->hash.rss;
>
> prefetching the buff[i+1] may help here?

I tried it; it didn't make much difference.
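
A prefetch variant would look something like this (rte_prefetch0 on the next mbuf's data while filling in the current event); it made no measurable difference here:

    for (i = 0; i < nb_rx; i++) {
        if (i + 1 < nb_rx)
            rte_prefetch0(rte_pktmbuf_mtod(mbufs[i + 1], void *));
        ev[i].flow_id = mbufs[i]->hash.rss;
        /* ... rest of the event setup unchanged ... */
    }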

>
>> +        ev[i].op = RTE_EVENT_OP_NEW;
>> +        ev[i].sched_type = queue_type;
>
> The value of RTE_EVENT_QUEUE_CFG_ORDERED_ONLY != RTE_SCHED_TYPE_ORDERED. So, we
> cannot assign .sched_type as queue_type.
>
> I think, one option could be to avoid translation in application is to
> - Remove RTE_EVENT_QUEUE_CFG_ALL_TYPES, RTE_EVENT_QUEUE_CFG_*_ONLY
> - Introduce a new RTE_EVENT_DEV_CAP_ to denote RTE_EVENT_QUEUE_CFG_ALL_TYPES cap
> ability
> - add sched_type in struct rte_event_queue_conf. If capability flag is
>   not set then implementation takes sched_type value for the queue.
>
> Any thoughts?


I'm not sure here. Would it be OK to leave this as is for the moment, and work on a patch for it in the future?

>
>
>> +        ev[i].queue_id = qid;
>> +        ev[i].event_type = RTE_EVENT_TYPE_CPU;
>
> IMO, RTE_EVENT_TYPE_ETHERNET is the better option here as it is
> producing the Ethernet packets/events.

Changed to RTE_EVENT_TYPE_ETHDEV.


>
>> +        ev[i].sub_event_type = 0;
>> +        ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
>> +        ev[i].mbuf = mbufs[i];
>> +        RTE_SET_USED(prio_idx);
>> +    }
>> +
>> + const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
>
> For producer pattern i.e a burst of RTE_EVENT_OP_NEW, OcteonTX can do burst
> operation unlike FORWARD case(which is one event at a time).Earlier, I
> thought I can abstract the producer pattern in PMD, but it looks like we
> are going with application driven producer model based on latest RFC.So I think,
> we can add one flag to rte_event_enqueue_burst to denote all the events
> are of type RTE_EVENT_OP_NEW as hint.SW driver can ignore this.
>
> I can send a patch for the same.
>
> Any thoughts?

I think this comment is closed now, as your patch has been upstreamed, as far as I know?
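
If I have the right change in mind, the hint ends up exposed as a dedicated new-event enqueue, so the producer side could switch to something like:

    /* all events in this burst are RTE_EVENT_OP_NEW, so the PMD can be
     * given the hint via the dedicated enqueue variant */
    const int nb_tx = rte_event_enqueue_new_burst(dev_id, port_id, ev, nb_rx);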

>
>
>> +    if (nb_tx != nb_rx) {
>> +        for (i = nb_tx; i < nb_rx; i++)
>> +            rte_pktmbuf_free(mbufs[i]);
>> +    }
>> +    enqueue_cnt[0] += nb_tx;
>> +
>> +    if (unlikely(prod_stop))
>
> I think, No one updating the prod_stop

Removed.

>
>> +        done = 1;
>> +
>> +    return 0;
>> +}
>> +
>> +static inline void
>> +schedule_devices(uint8_t dev_id, unsigned lcore_id)
>> +{
>> +    if (rx_core[lcore_id] && (rx_single ||
>> +        rte_atomic32_cmpset(&rx_lock, 0, 1))) {
>
> This pattern(rte_atomic32_cmpset) makes application can inject only
> "one core" worth of packets. Not enough for low-end cores. May be we need
> multiple producer options. I think, new RFC is addressing it.

OK, will leave this to the RFC.

>
>> +        producer();
>> +        rte_atomic32_clear((rte_atomic32_t *)&rx_lock);
>> +    }
>> +
>> +    if (sched_core[lcore_id] && (sched_single ||
>> +        rte_atomic32_cmpset(&sched_lock, 0, 1))) {
>> +        rte_event_schedule(dev_id);
>> +        if (dump_dev_signal) {
>> +            rte_event_dev_dump(0, stdout);
>> +            dump_dev_signal = 0;
>> +        }
>> +        rte_atomic32_clear((rte_atomic32_t *)&sched_lock);
>> +    }
>
> Lot of unwanted code if RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED set.
>
> I think, We can make common code with compile time aware and make
> runtime workers based on the flag..
> i.e
> rte_eal_remote_launch(worker_x, &worker_data[worker_idx], lcore_id);
> rte_eal_remote_launch(worker_y, &worker_data[worker_idx], lcore_id);
>
> May we can improve after initial version.

Yes, we can clean this up after the initial version.


>
>> +
>> +    if (tx_core[lcore_id] && (tx_single ||
>> +        rte_atomic32_cmpset(&tx_lock, 0, 1))) {
>> +        consumer();
>
> Should consumer() need to come in this pattern? I am thinking like
> if events is from last stage then call consumer() in worker()
>
> I think, above scheme works better when the _same_ worker code need to run the
> case where
> 1) ethdev HW is capable to enqueuing the packets to same txq from
>   multiple thread
> 2) ethdev is not capable to do so.
>
> So, The above cases can be addressed in configuration time where we link
> the queues to port
> case 1) Link all workers to last queue
> case 2) Link only worker to last queue
>
> and keeping the common worker code.
>
> HW implementation has functional and performance issue if "two" ports are
> assigned to one lcore for dequeue. The above scheme fixes that problem too.


Can we have a bit more discussion on this item? Is this needed for this sample app, or can we perhaps work on a patch for this later? Harry?


>
>> +        rte_atomic32_clear((rte_atomic32_t *)&tx_lock);
>> +    }
>> +}
>> +
>> +static int
>> +worker(void *arg)
>> +{
>> +    struct rte_event events[BATCH_SIZE];
>> +
>> +    struct worker_data *data = (struct worker_data *)arg;
>> +    uint8_t dev_id = data->dev_id;
>> +    uint8_t port_id = data->port_id;
>> +    size_t sent = 0, received = 0;
>> +    unsigned lcore_id = rte_lcore_id();
>> +
>> +    while (!done) {
>> +        uint16_t i;
>> +
>> +        schedule_devices(dev_id, lcore_id);
>> +
>> +        if (!worker_core[lcore_id]) {
>> +            rte_pause();
>> +            continue;
>> +        }
>> +
>> +        uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
>> +                events, RTE_DIM(events), 0);
>> +
>> +        if (nb_rx == 0) {
>> +            rte_pause();
>> +            continue;
>> +        }
>> +        received += nb_rx;
>> +
>> +        for (i = 0; i < nb_rx; i++) {
>> +            struct ether_hdr *eth;
>> +            struct ether_addr addr;
>> +            struct rte_mbuf *m = events[i].mbuf;
>> +
>> +            /* The first worker stage does classification */
>> +            if (events[i].queue_id == qid[0])
>> +                events[i].flow_id = m->hash.rss % num_fids;
>
> Not sure why we need do(shrinking the flows) this in worker() in queue based pipeline.
> If an PMD has any specific requirement on num_fids,I think, we
> can move this configuration stage or PMD can choose optimum fid internally to
> avoid modulus operation tax in fastpath in all PMD.
>
> Does struct rte_event_queue_conf.nb_atomic_flows help here?

In my tests the modulus makes very little difference to throughput, and I think it's useful to have a way of varying the number of flows to test different scenarios, even if it's not the most performant option.

>
>> +
>> +            events[i].queue_id = next_qid[events[i].queue_id];
>> +            events[i].op = RTE_EVENT_OP_FORWARD;
>
> missing events[i].sched_type.HW PMD does not work with this.
> I think, we can use similar scheme like next_qid for next_sched_type.

Done. Added events[i].sched_type = queue_type.

>
>> +
>> +            /* change mac addresses on packet (to use mbuf data) */
>> +            eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
>> +            ether_addr_copy(&eth->d_addr, &addr);
>> +            ether_addr_copy(&eth->s_addr, &eth->d_addr);
>> +            ether_addr_copy(&addr, &eth->s_addr);
>
> IMO, We can make packet processing code code as "static inline function" so
> different worker types can reuse.

Done. Moved this out to a work() function.

>
>> +
>> +            /* do a number of cycles of work per packet */
>> +            volatile uint64_t start_tsc = rte_rdtsc();
>> +            while (rte_rdtsc() < start_tsc + worker_cycles)
>> +                rte_pause();
>
> Ditto.

Done. Moved this out to the same work() function.
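
Roughly, the extracted helper looks like this (the exact signature in V2 may differ):

static inline void
work(struct rte_mbuf *m)
{
    struct ether_hdr *eth;
    struct ether_addr addr;

    /* change mac addresses on packet (to use mbuf data) */
    eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
    ether_addr_copy(&eth->d_addr, &addr);
    ether_addr_copy(&eth->s_addr, &eth->d_addr);
    ether_addr_copy(&addr, &eth->s_addr);

    /* do a number of cycles of work per packet */
    volatile uint64_t start_tsc = rte_rdtsc();
    while (rte_rdtsc() < start_tsc + worker_cycles)
        rte_pause();
}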

>
> I think, All worker specific variables like "worker_cycles" can moved into
> one structure and use.
>
>> +        }
>> +        uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
>> +                events, nb_rx);
>> +        while (nb_tx < nb_rx && !done)
>> +            nb_tx += rte_event_enqueue_burst(dev_id, port_id,
>> +                            events + nb_tx,
>> +                            nb_rx - nb_tx);
>> +        sent += nb_tx;
>> +    }
>> +
>> +    if (!quiet)
>> +        printf("  worker %u thread done. RX=%zu TX=%zu\n",
>> +                rte_lcore_id(), received, sent);
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Parse the coremask given as argument (hexadecimal string) and fill
>> + * the global configuration (core role and core count) with the parsed
>> + * value.
>> + */
>> +static int xdigit2val(unsigned char c)
>
> multiple instance of "xdigit2val" in DPDK repo. May be we can push this
> as common code.

Sure, that's something we can look at in a separate patch, now that it's being used more and more.

>
>> +{
>> +    int val;
>> +
>> +    if (isdigit(c))
>> +        val = c - '0';
>> +    else if (isupper(c))
>> +        val = c - 'A' + 10;
>> +    else
>> +        val = c - 'a' + 10;
>> +    return val;
>> +}
>> +
>> +
>> +static void
>> +usage(void)
>> +{
>> +    const char *usage_str =
>> +        "  Usage: eventdev_demo [options]\n"
>> +        "  Options:\n"
>> +        "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
>> +        "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 16)\n"
>
> I think, this parameter now, effects the application fast path code.I think,
> it should eventdev configuration para-mater.

See above comment on num_fids

>
>> +        "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
>> +        "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
>> +        "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
>> +        "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
>> +        "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
>> +        "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
>> +        "  -W  --work-cycles=N          Worker cycles (default 0)\n"
>> +        "  -P  --queue-priority         Enable scheduler queue prioritization\n"
>> +        "  -o, --ordered                Use ordered scheduling\n"
>> +        "  -p, --parallel               Use parallel scheduling\n"
>
> IMO, all stage being "parallel" or "ordered" or "atomic" is one mode of
> operation. It is valid have to any combination. We need to express that in
> command like
> example:
> 3 stage with
> O->A->P

How about we add an option that specifies the mode of operation for each stage in a string? Maybe a '-m' (modes) option, e.g. '-m appo' for 4 stages: atomic, parallel, parallel, ordered. Or maybe reuse your test-eventdev parameter style?
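
A hypothetical parser for such a '-m' option might look like the sketch below; stage_type[] would be a new per-stage array, and the names are only illustrative:

static int
parse_stage_modes(const char *arg)
{
    unsigned int i;
    const size_t len = strlen(arg);

    if (len == 0 || len > MAX_NUM_STAGES)
        return -1;
    num_stages = len;

    for (i = 0; i < num_stages; i++) {
        switch (arg[i]) {
        case 'a': stage_type[i] = RTE_SCHED_TYPE_ATOMIC;   break;
        case 'p': stage_type[i] = RTE_SCHED_TYPE_PARALLEL; break;
        case 'o': stage_type[i] = RTE_SCHED_TYPE_ORDERED;  break;
        default:  return -1;
        }
    }
    return 0;
}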

>
>> +        "  -q, --quiet                  Minimize printed output\n"
>> +        "  -D, --dump                   Print detailed statistics before exit"
>> +        "\n";
>> +    fprintf(stderr, "%s", usage_str);
>> +    exit(1);
>> +}
>> +
>
> [...]
>
>> +            rx_single = (popcnt == 1);
>> +            break;
>> +        case 't':
>> +            tx_lcore_mask = parse_coremask(optarg);
>> +            popcnt = __builtin_popcountll(tx_lcore_mask);
>> +            tx_single = (popcnt == 1);
>> +            break;
>> +        case 'e':
>> +            sched_lcore_mask = parse_coremask(optarg);
>> +            popcnt = __builtin_popcountll(sched_lcore_mask);
>> +            sched_single = (popcnt == 1);
>> +            break;
>> +        default:
>> +            usage();
>> +        }
>> +    }
>> +
>> +    if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
>> +        sched_lcore_mask == 0 || tx_lcore_mask == 0) {
>
> Need to honor RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED i.e sched_lcore_mask
> is zero can be valid case.

I'll separate this out into a check for RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED.
It needs to be done later, in setup_eventdev(), after the eventdev instance has been created.
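
Roughly the check I have in mind for setup_eventdev(), once rte_event_dev_info_get() has filled dev_info (a sketch):

    if (sched_lcore_mask == 0 &&
        !(dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
        rte_exit(-1, "No scheduler core given and PMD does not support "
                "distributed scheduling\n");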

>
>> +        printf("Core part of pipeline was not assigned any cores. "
>> +            "This will stall the pipeline, please check core masks "
>> +            "(use -h for details on setting core masks):\n"
>> +            "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
>> +            "\n\tworkers: %"PRIu64"\n",
>> +            rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
>> +            worker_lcore_mask);
>> +        rte_exit(-1, "Fix core masks\n");
>> +    }
>> +    if (num_stages == 0 || num_stages > MAX_NUM_STAGES)
>> +        usage();
>> +
>> +    for (i = 0; i < MAX_NUM_CORE; i++) {
>> +        rx_core[i] = !!(rx_lcore_mask & (1UL << i));
>> +        tx_core[i] = !!(tx_lcore_mask & (1UL << i));
>> +        sched_core[i] = !!(sched_lcore_mask & (1UL << i));
>> +        worker_core[i] = !!(worker_lcore_mask & (1UL << i));
>> +
>> +        if (worker_core[i])
>> +            num_workers++;
>> +        if (core_in_use(i))
>> +            active_cores++;
>> +    }
>> +}
>> +
>> +
>> +struct port_link {
>> +    uint8_t queue_id;
>> +    uint8_t priority;
>> +};
>> +
>> +static int
>> +setup_eventdev(struct prod_data *prod_data,
>> +        struct cons_data *cons_data,
>> +        struct worker_data *worker_data)
>> +{
>> +    const uint8_t dev_id = 0;
>> +    /* +1 stages is for a SINGLE_LINK TX stage */
>> +    const uint8_t nb_queues = num_stages + 1;
>> +    /* + 2 is one port for producer and one for consumer */
>> +    const uint8_t nb_ports = num_workers + 2;
>
> selection of number of ports is a function of rte_event_has_producer().
> I think, it will be addressed with RFC.
>
>> +    const struct rte_event_dev_config config = {
>> +            .nb_event_queues = nb_queues,
>> +            .nb_event_ports = nb_ports,
>> +            .nb_events_limit  = 4096,
>> +            .nb_event_queue_flows = 1024,
>> +            .nb_event_port_dequeue_depth = 128,
>> +            .nb_event_port_enqueue_depth = 128,
>
> OCTEONTX PMD driver has .nb_event_port_dequeue_depth = 1 and
> .nb_event_port_enqueue_depth = 1 and struct rte_event_dev_info.min_dequeue_timeout_ns
> = 853 value.
> I think, we need to check the rte_event_dev_info_get() first to get the sane
> values and take RTE_MIN or RTE_MAX based on the use case.
>
> or
>
> I can ignore this value in OCTEONTX PMD. But I am not sure NXP case,
> Any thoughts from NXP folks.
>
>

Added code to do the relevant checks. It now caps each port's enqueue/dequeue depth at the configured device depth where that is smaller.
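
Roughly what was added (a sketch, assuming the config struct is made non-const so it can be adjusted after rte_event_dev_info_get()):

    struct rte_event_dev_info dev_info;
    rte_event_dev_info_get(dev_id, &dev_info);

    if (config.nb_event_port_dequeue_depth >
            dev_info.max_event_port_dequeue_depth)
        config.nb_event_port_dequeue_depth =
                dev_info.max_event_port_dequeue_depth;
    if (config.nb_event_port_enqueue_depth >
            dev_info.max_event_port_enqueue_depth)
        config.nb_event_port_enqueue_depth =
                dev_info.max_event_port_enqueue_depth;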



>> +    };
>> +    const struct rte_event_port_conf wkr_p_conf = {
>> +            .dequeue_depth = worker_cq_depth,
>
> Same as above

See previous comment.

>
>> +            .enqueue_depth = 64,
>
> Same as above

See previous comment.

>
>> +            .new_event_threshold = 4096,
>> +    };
>> +    struct rte_event_queue_conf wkr_q_conf = {
>> +            .event_queue_cfg = queue_type,
>> +            .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
>> +            .nb_atomic_flows = 1024,
>> +            .nb_atomic_order_sequences = 1024,
>> +    };
>> +    const struct rte_event_port_conf tx_p_conf = {
>> +            .dequeue_depth = 128,
>
> Same as above

See previous comment.

>> +            .enqueue_depth = 128,
>
> Same as above

See previous comment.

>> +            .new_event_threshold = 4096,
>> +    };
>> +    const struct rte_event_queue_conf tx_q_conf = {
>> +            .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
>> +            .event_queue_cfg =
>> +                    RTE_EVENT_QUEUE_CFG_ATOMIC_ONLY |
>> +                    RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
>> +            .nb_atomic_flows = 1024,
>> +            .nb_atomic_order_sequences = 1024,
>> +    };
>> +
>> +    struct port_link worker_queues[MAX_NUM_STAGES];
>> +    struct port_link tx_queue;
>> +    unsigned i;
>> +
>> +    int ret, ndev = rte_event_dev_count();
>> +    if (ndev < 1) {
>> +        printf("%d: No Eventdev Devices Found\n", __LINE__);
>> +        return -1;
>> +    }
>> +
>> +    struct rte_event_dev_info dev_info;
>> +    ret = rte_event_dev_info_get(dev_id, &dev_info);
>> +    printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
>> +
>> +    ret = rte_event_dev_configure(dev_id, &config);
>> +    if (ret < 0)
>> +        printf("%d: Error configuring device\n", __LINE__);
>
> Don't process further with failed configure.
>

Done.
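
i.e. the configure call now bails out on error:

    ret = rte_event_dev_configure(dev_id, &config);
    if (ret < 0) {
        printf("%d: Error configuring device\n", __LINE__);
        return -1;
    }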

>> +
>> +    /* Q creation - one load balanced per pipeline stage*/
>> +
>> +    /* set up one port per worker, linking to all stage queues */
>> +    for (i = 0; i < num_workers; i++) {
>> +        struct worker_data *w = &worker_data[i];
>> +        w->dev_id = dev_id;
>> +        if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
>> +            printf("Error setting up port %d\n", i);
>> +            return -1;
>> +        }
>> +
>> +        uint32_t s;
>> +        for (s = 0; s < num_stages; s++) {
>> +            if (rte_event_port_link(dev_id, i,
>> +                        &worker_queues[s].queue_id,
>> +                        &worker_queues[s].priority,
>> +                        1) != 1) {
>> +                printf("%d: error creating link for port %d\n",
>> +                        __LINE__, i);
>> +                return -1;
>> +            }
>> +        }
>> +        w->port_id = i;
>> +    }
>> +    /* port for consumer, linked to TX queue */
>> +    if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
>
> If ethdev supports MT txq queue support then this port can be linked to
> worker too. something to consider for future.
>

Sure. No change for now.

>> +        printf("Error setting up port %d\n", i);
>> +        return -1;
>> +    }
>> +    if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
>> +                &tx_queue.priority, 1) != 1) {
>> +        printf("%d: error creating link for port %d\n",
>> +                __LINE__, i);
>> +        return -1;
>> +    }
>> +    /* port for producer, no links */
>> +    const struct rte_event_port_conf rx_p_conf = {
>> +            .dequeue_depth = 8,
>> +            .enqueue_depth = 8,
>
> same as above issue.You could get default config first and configure.
>

Done. It now checks config.nb_event_port_dequeue_depth and reduces the port's depth if the configured value is smaller.

>> +            .new_event_threshold = 1200,
>> +    };
>> +    if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
>> +        printf("Error setting up port %d\n", i);
>> +        return -1;
>> +    }
>> +
>> +    *prod_data = (struct prod_data){.dev_id = dev_id,
>> +                    .port_id = i + 1,
>> +                    .qid = qid[0] };
>> +    *cons_data = (struct cons_data){.dev_id = dev_id,
>> +                    .port_id = i };
>> +
>> +    enqueue_cnt = rte_calloc(0,
>> +            RTE_CACHE_LINE_SIZE/(sizeof(enqueue_cnt[0])),
>> +            sizeof(enqueue_cnt[0]), 0);
>> +    dequeue_cnt = rte_calloc(0,
>> +            RTE_CACHE_LINE_SIZE/(sizeof(dequeue_cnt[0])),
>> +            sizeof(dequeue_cnt[0]), 0);
>
> Why array? looks like enqueue_cnt[1] and dequeue_cnt[1] not used anywhere.
>

Looks like there was an intention to extend this further, but as it stands the app only increments the counters without ever reading them. I've removed both variables to clean this up.

>> +
>> +    if (rte_event_dev_start(dev_id) < 0) {
>> +        printf("Error starting eventdev\n");
>> +        return -1;
>> +    }
>> +
>> +    return dev_id;
>> +}
>> +
>> +static void
>> +signal_handler(int signum)
>> +{
>> +    if (done || prod_stop)
>
> I think, No one updating the prod_stop
>

Removed.


>> +        rte_exit(1, "Exiting on signal %d\n", signum);
>> +    if (signum == SIGINT || signum == SIGTERM) {
>> +        printf("\n\nSignal %d received, preparing to exit...\n",
>> +                signum);
>> +        done = 1;
>> +    }
>> +    if (signum == SIGTSTP)
>> +        rte_event_dev_dump(0, stdout);
>> +}
>> +
>> +int
>> +main(int argc, char **argv)
>
> [...]
>
>> +       RTE_LCORE_FOREACH_SLAVE(lcore_id) {
>> +               if (lcore_id >= MAX_NUM_CORE)
>> +                       break;
>> +
>> +               if (!rx_core[lcore_id] && !worker_core[lcore_id] &&
>> +                   !tx_core[lcore_id] && !sched_core[lcore_id])
>> +                       continue;
>> +
>> +               if (rx_core[lcore_id])
>> +                       printf(
>> + "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
>> +                                __func__, lcore_id, prod_data.port_id);
>
> These prints wont show if rx,tx, scheduler running on master core(as we
> are browsing through RTE_LCORE_FOREACH_SLAVE)

OK, changed to RTE_LCORE_FOREACH, which also includes the master core.

>
>> +
>> +    if (!quiet) {
>> +        printf("\nPort Workload distribution:\n");
>> +        uint32_t i;
>> +        uint64_t tot_pkts = 0;
>> +        uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
>> +        for (i = 0; i < num_workers; i++) {
>> +            char statname[64];
>> +            snprintf(statname, sizeof(statname), "port_%u_rx",
>> +                    worker_data[i].port_id);
>
> Please check "port_%u_rx" xstat availability with PMD first.

Added a check on the return value of rte_event_dev_xstats_by_name_get() to make sure it isn't -ENOTSUP.
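
Sketch of the added check (exact handling may differ in the patch):

    uint64_t val = rte_event_dev_xstats_by_name_get(dev_id,
            statname, NULL);
    if (val != (uint64_t)-ENOTSUP) {
        pkts_per_wkr[i] = val;
        tot_pkts += pkts_per_wkr[i];
    }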

>> +            pkts_per_wkr[i] = rte_event_dev_xstats_by_name_get(
>> +                    dev_id, statname, NULL);
>> +            tot_pkts += pkts_per_wkr[i];
>> +        }
>> +        for (i = 0; i < num_workers; i++) {
>> +            float pc = pkts_per_wkr[i]  * 100 /
>> +                ((float)tot_pkts);
>> +            printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
>> +                    i, pc, pkts_per_wkr[i]);
>> +        }
>> +
>> +    }
>> +
>> +    return 0;
>> +}
>
> As final note, considering the different options in fastpath, I was
> thinking like introducing app/test-eventdev like app/testpmd and have
> set of function pointers# for different modes like "macswap", "txonly"
> in testpmd to exercise different options and framework for adding new use
> cases.I will work on that to check the feasibility.
>
> ##
> struct fwd_engine {
>         const char       *fwd_mode_name; /**< Forwarding mode name. */
>         port_fwd_begin_t port_fwd_begin; /**< NULL if nothing special to do. */
>         port_fwd_end_t   port_fwd_end;   /**< NULL if nothing special to do. */
>         packet_fwd_t     packet_fwd;     /**< Mandatory. */
> };
>
>> --
>> 2.7.4
>>
