Please ignore this patch; it has been corrupted with some unrelated work-in-progress (WIP) content,
which will be upstreamed in a future patch set. The quoted diff below should not be applied.

/Bruce

> -----Original Message-----
> From: dev [mailto:dev-bounces at dpdk.org] On Behalf Of Bruce Richardson
> Sent: Monday, May 12, 2014 10:23 AM
> To: dev at dpdk.org
> Subject: [dpdk-dev] [PATCH 2/2] 10G PMD: enable vector PMD compile for 64b
> linux
> 
>  ifeq ($(CONFIG_RTE_APP_TEST),y)
>  SRCS-$(CONFIG_RTE_LIBRTE_ACL) += test_acl.c diff --git
> a/app/test/commands.c b/app/test/commands.c index a153026..5978cec
> 100644
> --- a/app/test/commands.c
> +++ b/app/test/commands.c
> @@ -178,6 +178,8 @@ static void cmd_autotest_parsed(void *parsed_result,
>               ret = test_common();
>       if (!strcmp(res->autotest, "ivshmem_autotest"))
>               ret = test_ivshmem();
> +     if (!strcmp(res->autotest, "distributor_autotest"))
> +             ret = test_distributor();
>  #ifdef RTE_LIBRTE_PMD_RING
>       if (!strcmp(res->autotest, "ring_pmd_autotest"))
>               ret = test_pmd_ring();
> @@ -234,7 +236,7 @@ cmdline_parse_token_string_t cmd_autotest_autotest
> =  #ifdef RTE_LIBRTE_KVARGS
>                       "kvargs_autotest#"
>  #endif
> -                     "common_autotest");
> +                     "common_autotest#distributor_autotest");
> 
>  cmdline_parse_inst_t cmd_autotest = {
>       .f = cmd_autotest_parsed,  /* function to call */ diff --git
> a/app/test/test.h b/app/test/test.h index bc001b9..4621fc9 100644
> --- a/app/test/test.h
> +++ b/app/test/test.h
> @@ -92,6 +92,7 @@ int test_power(void);
>  int test_common(void);
>  int test_pmd_ring(void);
>  int test_ivshmem(void);
> +int test_distributor(void);
>  int test_kvargs(void);
> 
>  int test_pci_run;
> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c new 
> file
> mode 100644 index 0000000..0456478
> --- /dev/null
> +++ b/app/test/test_distributor.c
> @@ -0,0 +1,261 @@
> +/*
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include "test.h"
> +
> +#ifdef RTE_LIBRTE_DISTRIBUTOR
> +#include <unistd.h>
> +#include <string.h>
> +#include <rte_cycles.h>
> +#include <rte_distributor.h>
> +#include "wmmintrin.h"
> +
> +#define ITER_POWER 20 /* log 2 of how many iterations we do when
> +timing. */
> +
> +static volatile int quit = 0;
> +static volatile unsigned worker_idx;
> +
> +struct worker_stats{
> +     volatile unsigned handled_packets;
> +} __rte_cache_aligned;
> +struct worker_stats worker_stats[RTE_MAX_LCORE];
> +
> +#define do_big_pause() do { \
> +             __m128i a = {0}; \
> +             volatile __m128i res; \
> +             res = _mm_clmulepi64_si128(a, res, 0); \
> +             res = _mm_clmulepi64_si128(a, res, 0); \
> +             res = _mm_clmulepi64_si128(a, res, 0); \
> +             res = _mm_clmulepi64_si128(a, res, 0); \
> +             RTE_SET_USED(res); \
> +             /*rte_pause();*/ \
> +} while(0)
> +
> +#define do_pause() do { \
> +     rte_pause(); \
> +} while(0)
> +
> +static void
> +flip_bit(volatile uint64_t *arg)
> +{
> +     uint64_t old_val = 0;
> +     while (old_val != 2) {
> +             while(!*arg)
> +                     do_big_pause();
> +             old_val = *arg;
> +             *arg = 0;
> +     }
> +}
> +
> +static void
> +time_cache_line_switch(void)
> +{
> +     /* allocate a full cache line for data, we use only first byte of it */
> +     uint64_t data[CACHE_LINE_SIZE / sizeof(uint64_t)];
> +
> +     unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
> +     volatile uint64_t *pdata = &data[0];
> +     *pdata = 1;
> +     rte_eal_remote_launch((lcore_function_t *)flip_bit, &data[0], slaveid);
> +     while (*pdata)
> +             rte_pause();
> +
> +     const uint64_t start_time = rte_rdtsc();
> +     for (i = 0; i < (1<< ITER_POWER); i++) {
> +             while (*pdata)
> +                     do_big_pause();
> +             *pdata = 1;
> +     }
> +     const uint64_t end_time = rte_rdtsc();
> +
> +     while (*pdata)
> +             do_pause();
> +     *pdata = 2;
> +     rte_eal_wait_lcore(slaveid);
> +     printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
> +                     end_time-start_time);
> +     printf("Ticks per iteration = %"PRIu64"\n",
> +                     (end_time-start_time) >> ITER_POWER); }
> +
> +static inline unsigned
> +total_packet_count(void)
> +{
> +     unsigned i, count = 0;
> +     for (i = 0; i < worker_idx; i++)
> +             count += worker_stats[i].handled_packets;
> +     return count;
> +}
> +
> +static inline void
> +clear_packet_count(void)
> +{
> +     memset(&worker_stats, 0, sizeof(worker_stats)); }
> +
> +static int
> +handle_work(void *arg)
> +{
> +     struct rte_mbuf *pkt = NULL;
> +     struct rte_distributor *d = arg;
> +     unsigned count = 0;
> +     unsigned id = __sync_fetch_and_add(&worker_idx, 1);
> +
> +     printf("Worker %u starting\n", worker_idx);
> +     pkt = rte_distributor_get_pkt(d, id);
> +     while (!quit) {
> +             worker_stats[id].handled_packets++;
> +             pkt = rte_distributor_get_next_pkt(d, id, pkt);
> +     }
> +     rte_distributor_return_pkt(d, id, pkt);
> +     printf("Worker %u handled %u pkts\n", id, count);
> +     return 0;
> +}
> +
> +static inline int
> +perf_test(struct rte_distributor *d, struct rte_mempool *p) { #define
> +BURST 32
> +     unsigned i;
> +     uint64_t start, end;
> +     struct rte_mbuf *bufs[BURST];
> +
> +     clear_packet_count();
> +     if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
> +             printf("Error getting mbufs from pool\n");
> +             return -1;
> +     }
> +     /* ensure we have different hash value for each pkt */
> +     for (i = 0; i < 32; i++)
> +             bufs[i]->pkt.hash.rss = i;
> +
> +     start = rte_rdtsc();
> +     for (i = 0; i < (1<<ITER_POWER); i++)
> +             rte_distributor_process(d, bufs, BURST);
> +     end = rte_rdtsc();
> +
> +     do {
> +             usleep(100);
> +             rte_distributor_process(d, NULL, 0);
> +     } while (total_packet_count() < (BURST << ITER_POWER));
> +
> +     printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
> +     printf("Time per packet: %"PRIu64"\n", ((end - start) >>
> ITER_POWER)/BURST);
> +     rte_mempool_put_bulk(p, (void *)bufs, BURST);
> +
> +     for (i = 0; i < rte_lcore_count() - 1; i++)
> +             printf("Worker %u handled %u packets\n", i,
> +                             worker_stats[i].handled_packets);
> +     printf("Total packets: %u (%x)\n", total_packet_count(),
> +                     total_packet_count());
> +     printf("Perf test done\n");
> +
> +     return 0;
> +}
> +
> +static int
> +sanity_test(struct rte_distributor *d, struct rte_mempool *p) {
> +     struct rte_mbuf *bufs[32];
> +     unsigned i;
> +
> +     clear_packet_count();
> +     if (rte_mempool_get_bulk(p, (void *)bufs, 32) != 0) {
> +             printf("Error getting mbufs from pool\n");
> +             return -1;
> +     }
> +
> +     /* now set all hash values in all buffers to zero, so all pkts go to the
> +      * one worker thread */
> +     for (i = 0; i < 32; i++)
> +             bufs[i]->pkt.hash.rss = 0;
> +
> +     rte_distributor_process(d, bufs, 32);
> +     do {
> +             usleep(100);
> +             rte_distributor_process(d, NULL, 0);
> +     } while(total_packet_count() < 32);
> +
> +     for (i = 0; i < rte_lcore_count() - 1; i++)
> +             printf("Worker %u handled %u packets\n", i,
> +                             worker_stats[i].handled_packets);
> +     printf("Sanity test with all zero hashes done.\n");
> +     if (worker_stats[0].handled_packets != 32)
> +             return -1;
> +
> +     /* give a different hash value to each packet, so load gets distributed 
> */
> +     clear_packet_count();
> +     for (i = 0; i < 32; i++)
> +             bufs[i]->pkt.hash.rss = i;
> +
> +     rte_distributor_process(d, bufs, 32);
> +     do {
> +             usleep(100);
> +             rte_distributor_process(d, NULL, 0);
> +     } while(total_packet_count() < 32);
> +
> +     for (i = 0; i < rte_lcore_count() - 1; i++)
> +             printf("Worker %u handled %u packets\n", i,
> +                             worker_stats[i].handled_packets);
> +     printf("Sanity test with non-zero hashes done\n");
> +
> +     rte_mempool_put_bulk(p, (void *)bufs, 32);
> +     return 0;
> +}
> +
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> +RTE_PKTMBUF_HEADROOM)
> +
> +int
> +test_distributor(void)
> +{
> +     static struct rte_distributor *d = NULL;
> +     static struct rte_mempool *p = NULL;
> +
> +     if (rte_lcore_count() < 2) {
> +             printf("ERROR: not enough cores to test distributor\n");
> +             return -1;
> +     }
> +
> +     /* first time how long it takes to round-trip a cache line */
> +     time_cache_line_switch();
> +
> +     if (d == NULL) {
> +             d = rte_distributor_create("Test_distributor", rte_socket_id(),
> +                             rte_lcore_count() - 1,
> RTE_DISTRIBUTOR_NOFLAGS);
> +             if (d == NULL) {
> +                     printf("Error creating distributor\n");
> +                     return -1;
> +             }
> +             rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
> +     }
> +
> +     if (p == NULL) {
> +             p = rte_mempool_create("MBUF_POOL", 511, MBUF_SIZE, 64,
> +                             sizeof(struct rte_pktmbuf_pool_private),
> +                             rte_pktmbuf_pool_init, NULL,
> +                             rte_pktmbuf_init, NULL,
> +                             rte_socket_id(), 0);
> +             if (p == NULL) {
> +                     printf("Error creating mempool\n");
> +                     return -1;
> +             }
> +     }
> +
> +     sanity_test(d, p);
> +     perf_test(d,p);
> +     return 0;
> +}
> +
> +#else
> +
> +#include <stdio.h>
> +
> +int
> +test_distributor(void)
> +{
> +     printf("Distributor is not enabled in configuration\n");
> +     return 0;
> +}
> +
> +#endif
> diff --git a/config/defconfig_i686-default-linuxapp-gcc 
> b/config/defconfig_i686-
> default-linuxapp-gcc
> index 14bd3d1..5b4261e 100644
> --- a/config/defconfig_i686-default-linuxapp-gcc
> +++ b/config/defconfig_i686-default-linuxapp-gcc
> @@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_i686-default-linuxapp-icc 
> b/config/defconfig_i686-
> default-linuxapp-icc
> index ec3386e..d1d4aeb 100644
> --- a/config/defconfig_i686-default-linuxapp-icc
> +++ b/config/defconfig_i686-default-linuxapp-icc
> @@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-bsdapp-gcc
> b/config/defconfig_x86_64-default-bsdapp-gcc
> index d960e1d..329920e 100644
> --- a/config/defconfig_x86_64-default-bsdapp-gcc
> +++ b/config/defconfig_x86_64-default-bsdapp-gcc
> @@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y  CONFIG_RTE_TEST_PMD=y
> CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
>  CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
> +
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-linuxapp-gcc
> b/config/defconfig_x86_64-default-linuxapp-gcc
> index f11ffbf..772a6b3 100644
> --- a/config/defconfig_x86_64-default-linuxapp-gcc
> +++ b/config/defconfig_x86_64-default-linuxapp-gcc
> @@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-default-linuxapp-icc
> b/config/defconfig_x86_64-default-linuxapp-icc
> index 4eaca4c..04affc8 100644
> --- a/config/defconfig_x86_64-default-linuxapp-icc
> +++ b/config/defconfig_x86_64-default-linuxapp-icc
> @@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
>  #
>  CONFIG_RTE_NIC_BYPASS=n
> 
> +#
> +# Compile the distributor library
> +#
> +CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
> +
> diff --git a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> index 2f55a69..0002dca 100644
> --- a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> +++ b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
> @@ -46,4 +46,4 @@ CONFIG_RTE_LIBRTE_IVSHMEM_MAX_ENTRIES=128
>  CONFIG_RTE_LIBRTE_IVSHMEM_MAX_METADATA_FILES=32
> 
>  # Set EAL to single file segments
> -CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
> \ No newline at end of file
> +CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
> diff --git a/examples/distributor_app/Makefile
> b/examples/distributor_app/Makefile
> new file mode 100644
> index 0000000..9a1ebd3
> --- /dev/null
> +++ b/examples/distributor_app/Makefile
> @@ -0,0 +1,28 @@
> +# <COPYRIGHT_TAG>
> +
> +ifeq ($(RTE_SDK),)
> +$(error "Please define RTE_SDK environment variable") endif
> +
> +# Default target, can be overriden by command line or environment
> +RTE_TARGET ?= x86_64-default-linuxapp-gcc
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# binary name
> +APP = basicfwd
> +
> +# all source are stored in SRCS-y
> +SRCS-y := basicfwd.c
> +
> +CFLAGS += $(WERROR_FLAGS)
> +
> +# workaround for a gcc bug with noreturn attribute #
> +http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
> +ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
> +CFLAGS_main.o += -Wno-return-type
> +endif
> +
> +EXTRA_CFLAGS += -O3 -g -Wfatal-errors
> +
> +include $(RTE_SDK)/mk/rte.extapp.mk
> diff --git a/examples/distributor_app/basicfwd.c
> b/examples/distributor_app/basicfwd.c
> new file mode 100644
> index 0000000..7662d91
> --- /dev/null
> +++ b/examples/distributor_app/basicfwd.c
> @@ -0,0 +1,248 @@
> +/*-
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include <stdint.h>
> +#include <inttypes.h>
> +#include <unistd.h>
> +
> +#include <rte_eal.h>
> +#include <rte_ethdev.h>
> +#include <rte_cycles.h>
> +#include <rte_distributor.h>
> +#include "basicfwd.h"
> +
> +#define RX_RING_SIZE 128
> +#define RX_FREE_THRESH 32
> +#define RX_PTHRESH 8
> +#define RX_HTHRESH 8
> +#define RX_WTHRESH 0
> +
> +#define TX_RING_SIZE 512
> +#define TX_FREE_THRESH 32
> +#define TX_PTHRESH 32
> +#define TX_HTHRESH 0
> +#define TX_WTHRESH 0
> +#define TX_RSBIT_THRESH 32
> +#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS |
> ETH_TXQ_FLAGS_NOVLANOFFL |\
> +     ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
> +     ETH_TXQ_FLAGS_NOXSUMTCP)
> +
> +#define NUM_MBUFS 8191
> +#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) +
> +RTE_PKTMBUF_HEADROOM) #define MBUF_CACHE_SIZE 250 #define
> BURST_SIZE 32
> +
> +static struct rte_eth_conf port_conf_default = {
> +     .rxmode = {
> +             .mq_mode = ETH_MQ_RX_RSS,
> +             .max_rx_pkt_len = ETHER_MAX_LEN,
> +             .split_hdr_size = 0,
> +             .header_split   = 0, /**< Header Split disabled */
> +             .hw_ip_checksum = 0, /**< IP checksum offload enabled */
> +             .hw_vlan_filter = 0, /**< VLAN filtering disabled */
> +             .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
> +             .hw_strip_crc   = 0, /**< CRC stripped by hardware */
> +     },
> +     .txmode = {
> +             .mq_mode = ETH_MQ_TX_NONE,
> +     },
> +     .lpbk_mode = 0,
> +     .rx_adv_conf = {
> +                     .rss_conf = {
> +                             .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
> ETH_RSS_IPV4_TCP |
> +                                     ETH_RSS_IPV4_UDP |
> ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
> +                     }
> +     },
> +};
> +
> +static const struct rte_eth_rxconf rx_conf_default = {
> +     .rx_thresh = {
> +             .pthresh = RX_PTHRESH,
> +             .hthresh = RX_HTHRESH,
> +             .wthresh = RX_WTHRESH,
> +     },
> +     .rx_free_thresh = RX_FREE_THRESH,
> +     .rx_drop_en = 0,
> +};
> +
> +static struct rte_eth_txconf tx_conf_default = {
> +     .tx_thresh = {
> +             .pthresh = TX_PTHRESH,
> +             .hthresh = TX_HTHRESH,
> +             .wthresh = TX_WTHRESH,
> +     },
> +     .tx_free_thresh = TX_FREE_THRESH,
> +     .tx_rs_thresh = TX_RSBIT_THRESH,
> +     .txq_flags = TX_Q_FLAGS
> +
> +};
> +
> +#define NOFLAGS 0
> +
> +
> +/*
> + * Initialises a given port using global settings and with the rx
> +buffers
> + * coming from the mbuf_pool passed as parameter  */ static inline int
> +port_init(uint8_t port, struct rte_mempool *mbuf_pool) {
> +     struct rte_eth_conf port_conf = port_conf_default;
> +     const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
> +     int retval;
> +     uint16_t q;
> +
> +     if (port >= rte_eth_dev_count())
> +             return -1;
> +
> +     retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
> +     if (retval != 0)
> +             return retval;
> +
> +     for (q = 0; q < rxRings; q ++) {
> +             retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
> +                                             rte_eth_dev_socket_id(port),
> &rx_conf_default,
> +                                             mbuf_pool);
> +             if (retval < 0)
> +                     return retval;
> +     }
> +
> +     for (q = 0; q < txRings; q ++) {
> +             retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
> +                                             rte_eth_dev_socket_id(port),
> &tx_conf_default);
> +             if (retval < 0)
> +                     return retval;
> +     }
> +
> +     retval  = rte_eth_dev_start(port);
> +     if (retval < 0)
> +             return retval;
> +
> +     struct rte_eth_link link;
> +     rte_eth_link_get_nowait(port, &link);
> +     if (!link.link_status) {
> +             sleep(1);
> +             rte_eth_link_get_nowait(port, &link);
> +     }
> +
> +     if (!link.link_status) {
> +             printf("Link down on port %"PRIu8"\n", port);
> +             return 0;
> +     }
> +
> +     struct ether_addr addr;
> +     rte_eth_macaddr_get(port, &addr);
> +     printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
> +                     " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
> +                     (unsigned)port,
> +                     addr.addr_bytes[0], addr.addr_bytes[1],
> addr.addr_bytes[2],
> +                     addr.addr_bytes[3], addr.addr_bytes[4],
> addr.addr_bytes[5]);
> +
> +     rte_eth_promiscuous_enable(port);
> +
> +     return 0;
> +}
> +
> +/*
> + * Main thread that does the work, reading from INPUT_PORT
> + * and writing to OUTPUT_PORT
> + */
> +static  __attribute__((noreturn)) void
> +lcore_main(struct rte_distributor *d)
> +{
> +     const uint8_t nb_ports = rte_eth_dev_count();
> +     uint8_t port;
> +     for (port = 0; port < nb_ports; port++)
> +             if (rte_eth_dev_socket_id(port) > 0 &&
> +                             rte_eth_dev_socket_id(port) !=
> (int)rte_socket_id())
> +                     printf("WARNING, port %u is on remote NUMA node to
> polling thread."
> +                                     "\n\tPerformance will not be
> optimal.\n", port);
> +
> +     printf("\nCore %u distributing packets. [Ctrl+C to quit]\n",
> rte_lcore_id());
> +     for(;;) {
> +             for (port = 0; port < nb_ports; port++) {
> +                     struct rte_mbuf *bufs[BURST_SIZE];
> +                     const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
> BURST_SIZE);
> +                     if (unlikely(nb_rx == 0))
> +                             continue;
> +
> +                     rte_distributor_process(d, bufs, nb_rx);
> +             }
> +     }
> +}
> +
> +static __attribute__((noreturn)) void
> +tx_main(struct rte_distributor *d)
> +{
> +     static volatile unsigned num_workers = 0;
> +     struct rte_mbuf *to_send[BURST_SIZE];
> +     unsigned nb_to_send = 0;
> +     unsigned port_id = 0;
> +     unsigned id = __sync_fetch_and_add(&num_workers, 1);
> +
> +     struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id);
> +     for (;;) {
> +             to_send[nb_to_send++] = pkt;
> +             if (nb_to_send == BURST_SIZE) {
> +                     rte_eth_tx_burst(port_id++, id, to_send, BURST_SIZE);
> +                     nb_to_send = 0;
> +                     if (port_id == rte_eth_dev_count())
> +                             port_id = 0;
> +             }
> +             pkt = rte_distributor_get_next_pkt(d, id, pkt);
> +     }
> +}
> +
> +/* Main function, does initialisation and calls the per-lcore functions
> +*/ int MAIN(int argc, char *argv[]) {
> +     struct rte_mempool *mbuf_pool;
> +     struct rte_distributor *d;
> +     unsigned nb_ports;
> +     uint8_t portid;
> +
> +     /* init EAL */
> +     int ret = rte_eal_init(argc, argv);
> +     if (ret < 0)
> +             rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
> +     argc -= ret;
> +     argv += ret;
> +
> +     if (rte_ixgbe_pmd_init() != 0 ||
> +                     rte_eal_pci_probe() != 0)
> +             rte_exit(EXIT_FAILURE, "Error with NIC driver 
> initialization\n");
> +
> +     nb_ports = rte_eth_dev_count();
> +     if (nb_ports < 2 || (nb_ports & 1))
> +             rte_exit(EXIT_FAILURE, "Error: number of ports must be
> even\n");
> +
> +     mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS *
> nb_ports,
> +                                    MBUF_SIZE, MBUF_CACHE_SIZE,
> +                                    sizeof(struct rte_pktmbuf_pool_private),
> +                                    rte_pktmbuf_pool_init, NULL,
> +                                    rte_pktmbuf_init, NULL,
> +                                    rte_socket_id(), 0);
> +     if (mbuf_pool == NULL)
> +             rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
> +
> +     /* initialize all ports */
> +     for (portid = 0; portid < nb_ports; portid++)
> +             if (port_init(portid, mbuf_pool) != 0)
> +                     rte_exit(EXIT_FAILURE, "Cannot initialize port
> %"PRIu8"\n", portid);
> +
> +     if (rte_lcore_count() == 1)
> +             rte_exit(EXIT_FAILURE, "Error, need worker cores\n");
> +
> +     d = rte_distributor_create("PKT_DIST", rte_socket_id(),
> +                     rte_lcore_count() - 1, NOFLAGS);
> +     if (d == NULL)
> +             rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
> +
> +     rte_eal_mp_remote_launch((lcore_function_t *)tx_main, d,
> SKIP_MASTER);
> +     /* call lcore_main on master core only */
> +     lcore_main(d);
> +     return 0;
> +}
> +
> diff --git a/examples/distributor_app/basicfwd.h
> b/examples/distributor_app/basicfwd.h
> new file mode 100644
> index 0000000..4ff20bc
> --- /dev/null
> +++ b/examples/distributor_app/basicfwd.h
> @@ -0,0 +1,17 @@
> +/*-
> + * <COPYRIGHT_TAG>
> + */
> +
> +#ifndef _MAIN_H_
> +#define _MAIN_H_
> +
> +
> +#ifdef RTE_EXEC_ENV_BAREMETAL
> +#define MAIN _main
> +#else
> +#define MAIN main
> +#endif
> +
> +int MAIN( int argc, char *argv[] );
> +
> +#endif /* ifndef _MAIN_H_ */
> diff --git a/lib/Makefile b/lib/Makefile index b92b392..5a0b10f 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
>  DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
>  DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
>  DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
> +DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor
> 
>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
>  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git
> a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile new file 
> mode
> 100644 index 0000000..794dd46
> --- /dev/null
> +++ b/lib/librte_distributor/Makefile
> @@ -0,0 +1,21 @@
> +# <COPYRIGHT_TAG>
> +
> +include $(RTE_SDK)/mk/rte.vars.mk
> +
> +# library name
> +LIB = librte_distributor.a
> +
> +CFLAGS += -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
> +
> +# all source are stored in SRCS-y
> +SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
> +
> +# install this header file
> +SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
> +
> +# this lib needs eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
> +DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
> +
> +include $(RTE_SDK)/mk/rte.lib.mk
> diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> new file mode 100644
> index 0000000..f7aea9a
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -0,0 +1,201 @@
> +/*
> + * <COPYRIGHT_TAG>
> + */
> +
> +#include <stdio.h>
> +#include <sys/queue.h>
> +#include <x86intrin.h>
> +#include <rte_mbuf.h>
> +#include <rte_memzone.h>
> +#include <rte_errno.h>
> +#include <rte_string_fns.h>
> +#include <rte_tailq.h>
> +#include <rte_eal_memconfig.h>
> +#include "rte_distributor.h"
> +
> +#define NO_FLAGS 0
> +#define DISTRIBUTOR_PREFIX "DT_"
> +
> +#define RTE_DISTRIBUTOR_FLAG_BITS 4
> +#define RTE_DISTRIBUTOR_NO_BUF 0
> +/* we will use the top four bits of pointer for flags */ #define
> +RTE_DISTRIBUTOR_GET_BUF (1) #define RTE_DISTRIBUTOR_RETURN_BUF (2)
> +
> +#define RTE_DISTRIBUTOR_BACKLOG_SIZE 8
> +#define RTE_DISTRIBUTOR_BACKLOG_MASK 7
> +
> +#define RTE_DISTRIBUTOR_FLAGS_MASK (0x0F)
> +
> +union rte_distributor_buffer {
> +     volatile int64_t bufptr64;
> +     char pad[CACHE_LINE_SIZE*3];
> +} __rte_cache_aligned;
> +
> +struct rte_distributor_backlog {
> +     int64_t pkts[RTE_DISTRIBUTOR_BACKLOG_SIZE];
> +     unsigned start;
> +     unsigned count;
> +};
> +
> +struct rte_distributor {
> +     TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
> +
> +     char name[RTE_DISTRIBUTOR_NAMESIZE];   /**< Name of the ring. */
> +     unsigned flags;                       /**< Flags supplied at creation. 
> */
> +     unsigned num_workers;                 /**< Number of workers polling */
> +
> +     uint32_t in_flight_ids[RTE_MAX_LCORE];
> +     struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
> +
> +     union rte_distributor_buffer buffers[RTE_MAX_LCORE]; };
> +
> +TAILQ_HEAD(rte_distributor_list, rte_distributor);
> +
> +static int
> +add_to_backlog(struct rte_distributor_backlog *bl, int64_t item) {
> +     if (bl->count == RTE_DISTRIBUTOR_BACKLOG_SIZE)
> +             return -1;
> +
> +     bl->pkts[(bl->start + bl->count++) &
> (RTE_DISTRIBUTOR_BACKLOG_MASK)] = item;
> +     return 0;
> +}
> +
> +static int64_t
> +backlog_pop(struct rte_distributor_backlog *bl) {
> +     bl->count--;
> +     return bl->pkts[bl->start++ & RTE_DISTRIBUTOR_BACKLOG_MASK]; }
> +
> +int
> +rte_distributor_process(struct rte_distributor *d,
> +             struct rte_mbuf **mbufs, const unsigned num_mbufs) {
> +     unsigned next_idx = 0;
> +     unsigned worker_offset = 0;
> +
> +     while (next_idx < num_mbufs) {
> +             int64_t data = d->buffers[worker_offset].bufptr64;
> +             if (data & RTE_DISTRIBUTOR_GET_BUF) {
> +
> +                     if (d->backlog[worker_offset].count)
> +                             d->buffers[worker_offset].bufptr64 =
> +                                             backlog_pop(&d-
> >backlog[worker_offset]);
> +
> +                     else {
> +                             struct rte_mbuf *next_mb =
> mbufs[next_idx++];
> +                             int64_t next_value = ((uintptr_t)next_mb <<
> +RTE_DISTRIBUTOR_FLAG_BITS);
> +
> +                             /* note signed variable - arithmetic shift */
> +                             int64_t oldbuf = data >>
> RTE_DISTRIBUTOR_FLAG_BITS;
> +                             if (oldbuf)
> +                                     d->in_flight_ids[worker_offset] = 0;
> +
> +                             uint32_t match = 0, newid = (next_mb-
> >pkt.hash.rss | 1);
> +                             unsigned i;
> +                             for (i = 0; i < d->num_workers; i++)
> +                                     match |= (!(d->in_flight_ids[i] ^ newid)
> << i);
> +                             if (!match) {
> +                                     d->buffers[worker_offset].bufptr64 =
> next_value;
> +                                     d->in_flight_ids[worker_offset] =
> newid;
> +                             } else {
> +                                     unsigned worker =
> __builtin_ctz(match);
> +                                     if (add_to_backlog(&d-
> >backlog[worker], next_value) < 0)
> +                                             next_idx--;
> +                             }
> +                     }
> +             }
> +             if (++worker_offset == d->num_workers)
> +                     worker_offset = 0;
> +     }
> +     /* to finish, check all workers for backlog and schedule work for them
> +      * if they are ready */
> +     for (worker_offset = 0; worker_offset < d->num_workers;
> worker_offset++)
> +             if (d->backlog[worker_offset].count &&
> +                             (d->buffers[worker_offset].bufptr64 &
> RTE_DISTRIBUTOR_GET_BUF))
> +                     d->buffers[worker_offset].bufptr64 =
> +
>       backlog_pop(&d->backlog[worker_offset]);
> +
> +     return num_mbufs;
> +}
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d,
> +             unsigned worker_id)
> +{
> +     union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +     buf->bufptr64 |= RTE_DISTRIBUTOR_GET_BUF;
> +     while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
> +             rte_pause();
> +     /* since bufptr64 is a signed value, this should be an arithmetic shift 
> */
> +     int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
> +     return (struct rte_mbuf *)((uintptr_t)ret); }
> +
> +struct rte_mbuf *
> +rte_distributor_get_next_pkt(struct rte_distributor *d,
> +             unsigned worker_id, struct rte_mbuf *oldpkt) {
> +     union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +     int64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
> +                     RTE_DISTRIBUTOR_GET_BUF;
> +     buf->bufptr64 = req;
> +     while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
> +             rte_pause();
> +     /* since bufptr64 is a signed value, this should be an arithmetic shift 
> */
> +     int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
> +     return (struct rte_mbuf *)((uintptr_t)ret); }
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d,
> +             unsigned worker_id, struct rte_mbuf *oldpkt) {
> +     union rte_distributor_buffer *buf = &d->buffers[worker_id];
> +     uint64_t req = ((uintptr_t)oldpkt << RTE_DISTRIBUTOR_FLAG_BITS) | \
> +                     RTE_DISTRIBUTOR_RETURN_BUF;
> +     buf->bufptr64 = req;
> +     while (buf->bufptr64 & RTE_DISTRIBUTOR_RETURN_BUF)
> +             rte_pause();
> +     return 0;
> +}
> +
> +struct rte_distributor *
> +rte_distributor_create(const char *name, unsigned socket_id,
> +             unsigned num_workers, unsigned flags) {
> +     struct rte_distributor *d;
> +     struct rte_distributor_list *distributor_list;
> +     char mz_name[RTE_MEMZONE_NAMESIZE];
> +     const struct rte_memzone *mz;
> +
> +     /* compilation-time checks */
> +     RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
> +     RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
> +
> +     rte_snprintf(mz_name, sizeof(mz_name), DISTRIBUTOR_PREFIX"%s",
> name);
> +     mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id,
> NO_FLAGS);
> +     if (mz == NULL) {
> +             rte_errno = ENOMEM;
> +             return NULL;
> +     }
> +
> +     /* check that we have an initialised tail queue */
> +     if ((distributor_list =
> RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
> +                     rte_distributor_list)) == NULL) {
> +             rte_errno = E_RTE_NO_TAILQ;
> +             return NULL;
> +     }
> +
> +     d = mz->addr;
> +     rte_snprintf(d->name, sizeof(d->name), "%s", name);
> +     d->flags = flags;
> +     d->num_workers = num_workers;
> +     TAILQ_INSERT_TAIL(distributor_list, d, next);
> +
> +     return d;
> +}
> +
> diff --git a/lib/librte_distributor/rte_distributor.h
> b/lib/librte_distributor/rte_distributor.h
> new file mode 100644
> index 0000000..b8021a0
> --- /dev/null
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -0,0 +1,45 @@
> +/*
> + * <COPYIRHGT_TAG>
> + */
> +
> +#ifndef _RTE_DISTRIBUTE_H_
> +#define _RTE_DISTRIBUTE_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <rte_mbuf.h>
> +
> +#define RTE_DISTRIBUTOR_NAMESIZE 32
> +
> +#define RTE_DISTRIBUTOR_NOFLAGS 0
> +
> +struct rte_distributor;
> +
> +struct rte_distributor *
> +rte_distributor_create(const char *name, unsigned socket_id,
> +             unsigned num_workers, unsigned flags);
> +
> +int
> +rte_distributor_process(struct rte_distributor *d,
> +             struct rte_mbuf **mbufs, const unsigned num_mbufs);
> +
> +struct rte_mbuf *
> +rte_distributor_get_pkt(struct rte_distributor *d, unsigned worker_id);
> +
> +struct rte_mbuf *
> +rte_distributor_get_next_pkt(struct rte_distributor *d,
> +             unsigned worker_id, struct rte_mbuf *oldpkt);
> +
> +int
> +rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
> +             struct rte_mbuf *mbuf);
> +
> +/******************************************/
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h
> b/lib/librte_eal/common/include/rte_tailq_elem.h
> index 2de4010..fdd2faf 100644
> --- a/lib/librte_eal/common/include/rte_tailq_elem.h
> +++ b/lib/librte_eal/common/include/rte_tailq_elem.h
> @@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")
> 
>  rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")
> 
> +rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
> +
>  rte_tailq_end(RTE_TAILQ_NUM)
> 
>  #undef rte_tailq_elem
> diff --git a/mk/rte.app.mk b/mk/rte.app.mk index 9c70ce0..64c0f6e 100644
> --- a/mk/rte.app.mk
> +++ b/mk/rte.app.mk
> @@ -70,6 +70,10 @@ LDLIBS += -lrte_ivshmem  endif  endif
> 
> +ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
> +LDLIBS += -lrte_distributor
> +endif
> +
>  ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
>  LDLIBS += -lrte_pmd_e1000
>  endif

Reply via email to