On 2018-11-28 17:55, Mattias Rönnblom wrote:
> Attached is a small DSW throughput test program, which I thought might help you find the issue.
Looks like DPDK's mailman didn't like my attachment.

--

/*
 * dswtp - A simple DSW eventdev scheduler throughput demo program.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright(c) 2018 Ericsson AB
 * Mattias Rönnblom <mattias.ronnb...@ericsson.com>
 */

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h> /* atoi(), atof(), exit() */

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_eventdev.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_pause.h>
#include <rte_random.h>

#define EVENT_DEV_ID (0)
#define NUM_IN_FLIGHT_EVENTS (4096)
#define EVENTDEV_MAX_EVENTS (NUM_IN_FLIGHT_EVENTS * 2)
#define EVENTDEV_PORT_NEW_THRESHOLD (NUM_IN_FLIGHT_EVENTS)
#define NUM_FLOWS (1024)
#define ITER_PER_SYNC (32)

#define DEQUEUE_BURST_SIZE (32)
#define ENQUEUE_BURST_SIZE (32)

struct worker_ctx {
        uint8_t event_dev_id;
        uint8_t event_port_id;

        uint32_t events_to_produce;
        uint16_t num_stages;
        uint32_t stage_work;
        int64_t num_events;

        rte_atomic64_t *events_finished;
} __rte_cache_aligned;

static void
usage(const char *name)
{
        printf("%s <num-stages> <stage-proc-cycles> <num-events[M]>\n", name);
}

static int64_t
sync_event_count(rte_atomic64_t *total_events_finished,
                 uint32_t *finished_since_sync)
{
        if (*finished_since_sync > 0) {
                int64_t total;
                total = rte_atomic64_add_return(total_events_finished,
                                                *finished_since_sync);
                *finished_since_sync = 0;
                return total;
        } else
                return rte_atomic64_read(total_events_finished);
}

static void
cycle_consume(uint64_t work)
{
        uint64_t deadline;

        if (likely(work == 0))
                return;

        deadline = rte_get_timer_cycles() + work;
        while (rte_get_timer_cycles() < deadline)
                rte_pause();
}

static int
worker_start(void *arg)
{
        struct worker_ctx *ctx = arg;
        uint8_t dev_id = ctx->event_dev_id;
        uint8_t port_id = ctx->event_port_id;
        uint32_t num_produced = 0;
        uint32_t finished_since_sync = 0;
        uint16_t iter_since_sync = 0;

        for (;;) {
                uint16_t dequeued;
                uint16_t i;
                uint16_t enqueued = 0;

                /* Inject this worker's share of the in-flight events. */
                if (unlikely(num_produced < ctx->events_to_produce)) {
                        struct rte_event ev = {
                                .op = RTE_EVENT_OP_NEW,
                                .queue_id = 0,
                                .sched_type = RTE_SCHED_TYPE_ATOMIC,
                                .flow_id = rte_rand() % NUM_FLOWS
                        };
                        if (rte_event_enqueue_new_burst(dev_id, port_id,
                                                        &ev, 1) == 1)
                                num_produced++;
                }

                struct rte_event evs[DEQUEUE_BURST_SIZE];
                dequeued = rte_event_dequeue_burst(dev_id, port_id, evs,
                                                   DEQUEUE_BURST_SIZE, 0);

                /*
                 * Do the per-stage dummy work and forward each event to
                 * the next stage. Events leaving the last stage are
                 * counted and recycled back to stage 0.
                 */
                for (i = 0; i < dequeued; i++) {
                        struct rte_event *ev = &evs[i];
                        uint16_t this_stage = ev->queue_id;
                        uint16_t next_stage_num = this_stage + 1;

                        cycle_consume(ctx->stage_work);

                        ev->op = RTE_EVENT_OP_FORWARD;

                        if (next_stage_num == ctx->num_stages) {
                                finished_since_sync++;
                                ev->queue_id = 0;
                        } else
                                ev->queue_id = next_stage_num;
                }

                /* Retry until the whole dequeued burst has been enqueued. */
                do {
                        uint16_t left = dequeued - enqueued;
                        uint16_t burst_size =
                                RTE_MIN(left, ENQUEUE_BURST_SIZE);

                        enqueued +=
                                rte_event_enqueue_burst(dev_id, port_id,
                                                        evs+enqueued,
                                                        burst_size);
                } while (unlikely(enqueued != dequeued));

                iter_since_sync++;
                if (unlikely(iter_since_sync == ITER_PER_SYNC)) {
                        int64_t total =
                                sync_event_count(ctx->events_finished,
                                                 &finished_since_sync);
                        if (total >= ctx->num_events)
                                break;
                        iter_since_sync = 0;
                }
        }

        return 0;
}

static void
setup_event_dev(uint16_t num_stages, struct worker_ctx *worker_ctxs,
                unsigned num_workers)
{
        unsigned i;
        struct rte_event_dev_info dev_info;

        for (i = 0; i < num_workers; i++)
                worker_ctxs[i].event_dev_id = EVENT_DEV_ID;

        rte_event_dev_info_get(EVENT_DEV_ID, &dev_info);

        struct rte_event_dev_config config = {
                .nb_event_queues = num_stages,
                .nb_event_ports = num_workers,
                .nb_events_limit = EVENTDEV_MAX_EVENTS,
                .nb_event_queue_flows = dev_info.max_event_queue_flows,
                .nb_event_port_dequeue_depth = DEQUEUE_BURST_SIZE,
                .nb_event_port_enqueue_depth = ENQUEUE_BURST_SIZE
        };

        int rc = rte_event_dev_configure(EVENT_DEV_ID, &config);
        if (rc)
                rte_panic("Failed to configure the event dev\n");

        struct rte_event_queue_conf queue_config = {
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
        };

        /* One atomic queue per pipeline stage. */
        for (i = 0; i < num_stages; i++) {
                uint8_t queue_id = i;

                queue_config.schedule_type = RTE_SCHED_TYPE_ATOMIC;
                queue_config.nb_atomic_flows = NUM_FLOWS;
                queue_config.nb_atomic_order_sequences = NUM_FLOWS;

                if (rte_event_queue_setup(EVENT_DEV_ID, queue_id,
                                          &queue_config))
                        rte_panic("Unable to setup queue %d\n", queue_id);
        }

        struct rte_event_port_conf port_config = {
                .new_event_threshold = EVENTDEV_PORT_NEW_THRESHOLD,
                .dequeue_depth = DEQUEUE_BURST_SIZE,
                .enqueue_depth = ENQUEUE_BURST_SIZE
        };

        for (i = 0; i < num_workers; i++) {
                uint8_t event_port_id = i;

                worker_ctxs[i].event_port_id = event_port_id;

                if (rte_event_port_setup(EVENT_DEV_ID, event_port_id,
                                         &port_config) < 0)
                        rte_panic("Failed to create worker port #%d\n",
                                  event_port_id);
        }

        /* Link every worker port to all stage queues. */
        for (i = 0; i < num_workers; i++) {
                uint8_t event_port_id = i;

                if (rte_event_port_link(EVENT_DEV_ID, event_port_id,
                                        NULL, NULL, 0) != (int)num_stages)
                        rte_panic("Failed to map worker ports\n");
        }

        if (rte_event_dev_start(EVENT_DEV_ID))
                rte_panic("Unable to start eventdev\n");
}

static double
tsc_to_s(uint64_t tsc)
{
        return (double)tsc/(double)rte_get_timer_hz();
}

int main(int argc, char *argv[])
{
        int rc;
        unsigned i;
        unsigned num_workers;
        uint16_t num_stages;
        uint32_t stage_work;
        int64_t num_events;
        struct worker_ctx *worker_ctxs;
        rte_atomic64_t *events_finished;
        unsigned lcore_id;
        uint64_t start;
        uint64_t latency;
        uint64_t ideal_latency;

        rc = rte_eal_init(argc, argv);
        if (rc < 0)
                rte_panic("Invalid EAL arguments\n");
        argc -= rc;
        argv += rc;

        if (argc != 4) {
                usage(argv[0]);
                exit(EXIT_FAILURE);
        }

        num_stages = atoi(argv[1]);
        stage_work = atoi(argv[2]);
        num_events = atof(argv[3]) * 1e6;

        num_workers = rte_lcore_count();

        worker_ctxs = rte_malloc("worker-ctx",
                                 sizeof(struct worker_ctx) * num_workers,
                                 RTE_CACHE_LINE_SIZE);
        events_finished = rte_malloc("finished-events",
                                     sizeof(rte_atomic64_t),
                                     RTE_CACHE_LINE_SIZE);

        if (worker_ctxs == NULL || events_finished == NULL)
                rte_panic("Unable to allocate memory\n");

        rte_atomic64_init(events_finished);

        for (i = 0; i < num_workers; i++) {
                struct worker_ctx *w = &worker_ctxs[i];

                *w = (struct worker_ctx) {
                        .event_dev_id = EVENT_DEV_ID,
                        .event_port_id = i,
                        .events_to_produce = NUM_IN_FLIGHT_EVENTS/num_workers,
                        .num_stages = num_stages,
                        .stage_work = stage_work,
                        .num_events = num_events,
                        .events_finished = events_finished
                };
        }

        setup_event_dev(num_stages, worker_ctxs, num_workers);

        start = rte_get_timer_cycles();
        rte_compiler_barrier();

        i = 0;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (rte_eal_remote_launch(worker_start, &(worker_ctxs[i]),
                                          lcore_id))
                        rte_panic("Failed to launch worker");
                i++;
        }

        /* The main lcore doubles as the last worker. */
        worker_start(&worker_ctxs[num_workers-1]);

        rte_eal_mp_wait_lcore();

        rte_compiler_barrier();
        latency = rte_get_timer_cycles() - start;
        ideal_latency = (stage_work * num_stages * num_events) / num_workers;

        printf("Workers: %d\n", num_workers);
        printf("Stages: %d\n", num_stages);
        printf("Per-stage application processing: %d TSC cycles\n",
               stage_work);
        printf("Events: %"PRId64" M\n", num_events/1000000);
        if (stage_work > 0)
                printf("Ideal latency: %.2f s\n", tsc_to_s(ideal_latency));
        printf("Actual latency: %.2f s\n", tsc_to_s(latency));
        if (stage_work > 0)
                printf("Ideal scheduling rate: %.2f M events/s\n",
                       (num_events*num_stages)/tsc_to_s(ideal_latency)/1e6);
        printf("Actual scheduling rate: %.2f M events/s\n",
               (num_events*num_stages)/tsc_to_s(latency)/1e6);
        if (stage_work > 0) {
                uint64_t per_stage_oh =
                        (latency - ideal_latency) / (num_events * num_stages);
                printf("Scheduling overhead: %"PRId64" TSC cycles/stage\n",
                       per_stage_oh);
        }

        rte_event_dev_stop(EVENT_DEV_ID);

        rte_exit(0, NULL);
}
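For reference, the program takes <num-stages> <stage-proc-cycles> <num-events[M]> after the EAL arguments, so an invocation could look something like the below (the lcore list and the vdev name are examples only, assuming a DSW instance created as event_dsw0):

./dswtp -l 0-3 --vdev=event_dsw0 -- 3 500 100

That is, a three-stage atomic pipeline, 500 TSC cycles of dummy work per event and stage, and 100 million events in total, shared among four worker lcores.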