On 2018-11-28 17:55, Mattias Rönnblom wrote:
Attached is a small DSW throughput test program that I thought might help you to find the issue.

Looks like DPDK's mailman didn't like my attachment.

--

/*
 * dswtp - A simple DSW eventdev scheduler throughput demo program.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright(c) 2018 Ericsson AB
 * Mattias Rönnblom <mattias.ronnb...@ericsson.com>
 */

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
#include <rte_debug.h>
#include <rte_eal.h>
#include <rte_eventdev.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_pause.h>
#include <rte_random.h>

#define EVENT_DEV_ID (0)
#define NUM_IN_FLIGHT_EVENTS (4096)
#define EVENTDEV_MAX_EVENTS (NUM_IN_FLIGHT_EVENTS * 2)
#define EVENTDEV_PORT_NEW_THRESHOLD (NUM_IN_FLIGHT_EVENTS)
#define NUM_FLOWS (1024)

#define ITER_PER_SYNC (32)

#define DEQUEUE_BURST_SIZE (32)
#define ENQUEUE_BURST_SIZE (32)

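/*
 * Per-worker state, cache-line aligned to reduce false sharing between
 * workers.
 */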
struct worker_ctx
{
        uint8_t event_dev_id;
        uint8_t event_port_id;

        uint32_t events_to_produce;

        uint16_t num_stages;
        uint32_t stage_work;
        int64_t num_events;

        rte_atomic64_t *events_finished;
} __rte_cache_aligned;

static void
usage(const char *name)
{
        printf("%s <num-stages> <stage-proc-cycles> <num-events[M]>\n", name);
}
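
/*
 * Example invocation (the binary name "dswtp" and the DSW eventdev vdev
 * name "event_dsw0" below are assumptions; adjust to match your build
 * and EAL configuration):
 *
 *   ./dswtp -c 0xf --vdev=event_dsw0 -- 3 500 100
 *
 * i.e. a three-stage pipeline, 500 TSC cycles of per-stage processing
 * and 100 million events, run on four lcores.
 */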

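/*
 * Fold the locally accumulated count of finished events into the shared
 * atomic counter and return the global total.
 */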
static int64_t
sync_event_count(rte_atomic64_t *total_events_finished,
                 uint32_t *finished_since_sync)
{
        if (*finished_since_sync > 0) {
                int64_t total;

                total = rte_atomic64_add_return(total_events_finished,
                                                *finished_since_sync);

                *finished_since_sync = 0;

                return total;
        } else
                return rte_atomic64_read(total_events_finished);
}

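/*
 * Busy-wait for roughly 'work' TSC cycles, emulating per-stage
 * application processing.
 */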
static void
cycle_consume(uint64_t work)
{
        uint64_t deadline;

        if (likely(work == 0))
                return;

        deadline = rte_get_timer_cycles() + work;
        while (rte_get_timer_cycles() < deadline)
                rte_pause();
}

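/*
 * Worker main loop: inject new events (up to this worker's share of the
 * in-flight budget), dequeue a burst, emulate per-stage processing and
 * forward each event to the next queue. Events leaving the last stage
 * are counted as finished and recycled back to queue 0. The shared
 * finished-event counter is only updated every ITER_PER_SYNC iterations
 * to keep atomic operations off the fast path; the loop exits once the
 * global total reaches the requested number of events.
 */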
static int
worker_start(void *arg)
{
        struct worker_ctx *ctx = arg;
        uint8_t dev_id = ctx->event_dev_id;
        uint8_t port_id = ctx->event_port_id;
        uint32_t num_produced = 0;
        uint32_t finished_since_sync = 0;
        uint16_t iter_since_sync = 0;

        for (;;) {
                uint16_t dequeued;
                uint16_t i;
                uint16_t enqueued = 0;

                if (unlikely(num_produced < ctx->events_to_produce)) {
                        struct rte_event ev = {
                                .op = RTE_EVENT_OP_NEW,
                                .queue_id = 0,
                                .sched_type = RTE_SCHED_TYPE_ATOMIC,
                                .flow_id = rte_rand() % NUM_FLOWS
                        };
                        if (rte_event_enqueue_new_burst(dev_id, port_id,
                                                        &ev, 1) == 1)
                                num_produced++;
                }

                struct rte_event evs[DEQUEUE_BURST_SIZE];
                dequeued = rte_event_dequeue_burst(dev_id, port_id, evs,
                                                     DEQUEUE_BURST_SIZE, 0);

                for (i = 0; i < dequeued; i++) {
                        struct rte_event *ev = &evs[i];
                        uint16_t this_stage = ev->queue_id;
                        uint16_t next_stage_num = this_stage + 1;

                        cycle_consume(ctx->stage_work);

                        ev->op = RTE_EVENT_OP_FORWARD;

                        if (next_stage_num == ctx->num_stages) {
                                finished_since_sync++;
                                ev->queue_id = 0;
                        } else
                                ev->queue_id = next_stage_num;
                }

                do {
                        uint16_t left = dequeued - enqueued;
                        uint16_t burst_size =
                                RTE_MIN(left, ENQUEUE_BURST_SIZE);
                        enqueued +=
                                rte_event_enqueue_burst(dev_id, port_id,
                                                        evs+enqueued,
                                                        burst_size);
                } while (unlikely(enqueued != dequeued));

                iter_since_sync++;
                if (unlikely(iter_since_sync == ITER_PER_SYNC)) {
                        int64_t total =
                                sync_event_count(ctx->events_finished,
                                                 &finished_since_sync);
                        if (total >= ctx->num_events)
                                break;
                        iter_since_sync = 0;
                }
        }

        return 0;
}

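/*
 * Configure the event device with one atomic queue per pipeline stage
 * and one port per worker, link every port to all queues, and start
 * the device.
 */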
static void
setup_event_dev(uint16_t num_stages, struct worker_ctx *worker_ctxs,
                unsigned num_workers)
{
        unsigned i;
        struct rte_event_dev_info dev_info;

        for (i=0; i < num_workers; i++)
                worker_ctxs[i].event_dev_id = EVENT_DEV_ID;

        rte_event_dev_info_get(EVENT_DEV_ID, &dev_info);

        struct rte_event_dev_config config = {
                .nb_event_queues = num_stages,
                .nb_event_ports = num_workers,
                .nb_events_limit = EVENTDEV_MAX_EVENTS,
                .nb_event_queue_flows = dev_info.max_event_queue_flows,
                .nb_event_port_dequeue_depth = DEQUEUE_BURST_SIZE,
                .nb_event_port_enqueue_depth = ENQUEUE_BURST_SIZE
        };

        int rc = rte_event_dev_configure(EVENT_DEV_ID, &config);
        if (rc)
                rte_panic("Failed to configure the event dev\n");

        struct rte_event_queue_conf queue_config = {
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
        };

        for (i=0; i<num_stages; i++) {
                uint8_t queue_id = i;
                queue_config.schedule_type = RTE_SCHED_TYPE_ATOMIC;
                queue_config.nb_atomic_flows = NUM_FLOWS;
                queue_config.nb_atomic_order_sequences = NUM_FLOWS;

                if (rte_event_queue_setup(EVENT_DEV_ID, queue_id,
                                          &queue_config))
                        rte_panic("Unable to setup queue %d\n", queue_id);
        }

        struct rte_event_port_conf port_config = {
                .new_event_threshold = EVENTDEV_PORT_NEW_THRESHOLD,
                .dequeue_depth = DEQUEUE_BURST_SIZE,
                .enqueue_depth = ENQUEUE_BURST_SIZE
        };

        for (i=0; i<num_workers; i++) {
                uint8_t event_port_id = i;
                worker_ctxs[i].event_port_id = event_port_id;
                if (rte_event_port_setup(EVENT_DEV_ID, event_port_id,
                                         &port_config) < 0)
                        rte_panic("Failed to create worker port #%d\n",
                                  event_port_id);
        }

        for (i=0; i<num_workers; i++) {
                uint8_t event_port_id = i;
                if (rte_event_port_link(EVENT_DEV_ID, event_port_id,
                                        NULL, NULL, 0)
                    != (int)num_stages)
                        rte_panic("Failed to map worker ports\n");
        }

        if (rte_event_dev_start(EVENT_DEV_ID))
                rte_panic("Unable to start eventdev\n");
}

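/* Convert a TSC cycle count to seconds. */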
static double
tsc_to_s(uint64_t tsc)
{
        return (double)tsc/(double)rte_get_timer_hz();
}

int
main(int argc, char *argv[])
{
        int rc;
        unsigned i;
        unsigned num_workers;
        uint16_t num_stages;
        uint32_t stage_work;
        int64_t num_events;
        struct worker_ctx *worker_ctxs;
        rte_atomic64_t *events_finished;
        unsigned lcore_id;
        uint64_t start;
        uint64_t latency;
        uint64_t ideal_latency;

        rc = rte_eal_init(argc, argv);
        if (rc < 0)
                rte_panic("Invalid EAL arguments\n");

        argc -= rc;
        argv += rc;

        if (argc != 4) {
                usage(argv[0]);
                exit(EXIT_FAILURE);
        }

        num_stages = atoi(argv[1]);
        stage_work = atoi(argv[2]);
        num_events = atof(argv[3]) * 1e6;

        num_workers = rte_lcore_count();

        worker_ctxs = rte_malloc("worker-ctx",
                                 sizeof(struct worker_ctx) * num_workers,
                                 RTE_CACHE_LINE_SIZE);
        events_finished = rte_malloc("finished-events", sizeof(rte_atomic64_t),
                                   RTE_CACHE_LINE_SIZE);

        if (worker_ctxs == NULL || events_finished == NULL)
                rte_panic("Unable to allocate memory\n");

        rte_atomic64_init(events_finished);

        for (i=0; i<num_workers; i++) {
                struct worker_ctx *w = &worker_ctxs[i];
                *w = (struct worker_ctx) {
                        .event_dev_id = EVENT_DEV_ID,
                        .event_port_id = i,
                        .events_to_produce = NUM_IN_FLIGHT_EVENTS/num_workers,
                        .num_stages = num_stages,
                        .stage_work = stage_work,
                        .num_events = num_events,
                        .events_finished = events_finished
                };
        }

        setup_event_dev(num_stages, worker_ctxs, num_workers);

        start = rte_get_timer_cycles();
        rte_compiler_barrier();

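        /*
         * Launch one worker per slave lcore; the master lcore runs the
         * last worker context itself.
         */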
        i = 0;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (rte_eal_remote_launch(worker_start, &(worker_ctxs[i]),
                                          lcore_id))
                        rte_panic("Failed to launch worker");
                i++;
        }

        worker_start(&worker_ctxs[num_workers-1]);

        rte_eal_mp_wait_lcore();

        rte_compiler_barrier();
        latency = rte_get_timer_cycles() - start;
        ideal_latency = (stage_work * num_stages * num_events) / num_workers;

        printf("Workers: %d\n", num_workers);
        printf("Stages: %d\n", num_stages);
        printf("Per-stage application processing: %d TSC cycles\n",
               stage_work);
        printf("Events: %"PRId64" M\n", num_events/1000000);
        if (stage_work > 0)
                printf("Ideal latency: %.2f s\n", tsc_to_s(ideal_latency));
        printf("Actual latency: %.2f s\n", tsc_to_s(latency));

        if (stage_work > 0)
                printf("Ideal scheduling rate: %.2f M events/s\n",
                       (num_events*num_stages)/tsc_to_s(ideal_latency)/1e6);
        printf("Actual scheduling rate: %.2f M events/s\n",
               (num_events*num_stages)/tsc_to_s(latency)/1e6);

        if (stage_work > 0) {
                uint64_t per_stage_oh =
                        (latency - ideal_latency) / (num_events * num_stages);
                printf("Scheduling overhead: %"PRId64" TSC cycles/stage\n",
                       per_stage_oh);
        }

        rte_event_dev_stop(EVENT_DEV_ID);

        return 0;
}
