ifeq ($(CONFIG_RTE_APP_TEST),y)
 SRCS-$(CONFIG_RTE_LIBRTE_ACL) += test_acl.c
diff --git a/app/test/commands.c b/app/test/commands.c
index a153026..5978cec 100644
--- a/app/test/commands.c
+++ b/app/test/commands.c
@@ -178,6 +178,8 @@ static void cmd_autotest_parsed(void *parsed_result,
                ret = test_common();
        if (!strcmp(res->autotest, "ivshmem_autotest"))
                ret = test_ivshmem();
+       if (!strcmp(res->autotest, "distributor_autotest"))
+               ret = test_distributor();
 #ifdef RTE_LIBRTE_PMD_RING
        if (!strcmp(res->autotest, "ring_pmd_autotest"))
                ret = test_pmd_ring();
@@ -234,7 +236,7 @@ cmdline_parse_token_string_t cmd_autotest_autotest =
 #ifdef RTE_LIBRTE_KVARGS
                        "kvargs_autotest#"
 #endif
-                       "common_autotest");
+                       "common_autotest#distributor_autotest");
 cmdline_parse_inst_t cmd_autotest = {
        .f = cmd_autotest_parsed,  /* function to call */
diff --git a/app/test/test.h b/app/test/test.h
index bc001b9..4621fc9 100644
--- a/app/test/test.h
+++ b/app/test/test.h
@@ -92,6 +92,7 @@ int test_power(void);
 int test_common(void);
 int test_pmd_ring(void);
 int test_ivshmem(void);
+int test_distributor(void);
 int test_kvargs(void);

 int test_pci_run;
diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
new file mode 100644
index 0000000..0456478
--- /dev/null
+++ b/app/test/test_distributor.c
@@ -0,0 +1,261 @@
+/*
+ * <COPYRIGHT_TAG>
+ */
+
+#include "test.h"
+
+#ifdef RTE_LIBRTE_DISTRIBUTOR
+#include <unistd.h>
+#include <string.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+#include "wmmintrin.h"
+
+#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
+
+/*
+ * Stop flag polled by the worker loop in handle_work().
+ * NOTE(review): nothing in this file sets it to non-zero - confirm whether
+ * workers are ever expected to exit.
+ */
+static volatile int quit = 0;
+/* next free worker id; each worker claims one with an atomic fetch-and-add */
+static volatile unsigned worker_idx;
+
+/* per-worker packet counter, cache aligned so each entry has its own line */
+struct worker_stats{
+       volatile unsigned handled_packets;
+} __rte_cache_aligned;
+/* indexed by the worker id claimed from worker_idx */
+struct worker_stats worker_stats[RTE_MAX_LCORE];
+
+/*
+ * Delay used between polls of the shared cache line: a chain of four
+ * carry-less multiplies, each consuming the previous result. rte_pause() is
+ * not called here.
+ */
+#define do_big_pause() do { \
+               __m128i a = {0}; \
+               /* give res a defined value: it is an input of the first multiply */ \
+               volatile __m128i res = {0}; \
+               res = _mm_clmulepi64_si128(a, res, 0); \
+               res = _mm_clmulepi64_si128(a, res, 0); \
+               res = _mm_clmulepi64_si128(a, res, 0); \
+               res = _mm_clmulepi64_si128(a, res, 0); \
+               RTE_SET_USED(res); \
+} while(0)
+
+/* Short delay between polls: a single rte_pause(). */
+#define do_pause() do { \
+       rte_pause(); \
+} while(0)
+
+/*
+ * Slave-side half of the cache line ping-pong. Waits for the shared word to
+ * become non-zero, records the value and writes zero back, until the value
+ * seen is 2 (the master's stop request).
+ *
+ * The signature matches lcore_function_t so the function can be handed to
+ * rte_eal_remote_launch() without a function-pointer cast.
+ */
+static int
+flip_bit(void *arg)
+{
+       volatile uint64_t *flag = arg;
+       uint64_t old_val = 0;
+
+       while (old_val != 2) {
+               while (!*flag)
+                       do_big_pause();
+               old_val = *flag;
+               *flag = 0;
+       }
+       return 0;
+}
+
+/*
+ * Measure how long it takes to bounce one cache line between this lcore and
+ * the next enabled lcore, over (1 << ITER_POWER) round trips, and print the
+ * total and per-iteration tick counts.
+ */
+static void
+time_cache_line_switch(void)
+{
+       /* a full cache line of data; only the first 64-bit word is used */
+       uint64_t data[CACHE_LINE_SIZE / sizeof(uint64_t)];
+
+       unsigned i, slaveid = rte_get_next_lcore(rte_lcore_id(), 0, 0);
+       volatile uint64_t *pdata = &data[0];
+
+       if (slaveid >= RTE_MAX_LCORE) {
+               printf("No lcore available for cache line timing\n");
+               return;
+       }
+
+       *pdata = 1;
+       /* if the launch fails nobody will ever clear *pdata, so do not wait */
+       if (rte_eal_remote_launch(flip_bit, &data[0], slaveid) != 0) {
+               printf("Cannot launch cache line timing on lcore %u\n",
+                               slaveid);
+               return;
+       }
+       /* wait for the slave to acknowledge the first write */
+       while (*pdata)
+               rte_pause();
+
+       const uint64_t start_time = rte_rdtsc();
+       for (i = 0; i < (1<< ITER_POWER); i++) {
+               while (*pdata)
+                       do_big_pause();
+               *pdata = 1;
+       }
+       const uint64_t end_time = rte_rdtsc();
+
+       /* let the last round trip finish, then ask the slave to stop */
+       while (*pdata)
+               do_pause();
+       *pdata = 2;
+       rte_eal_wait_lcore(slaveid);
+       printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+                       end_time-start_time);
+       printf("Ticks per iteration = %"PRIu64"\n",
+                       (end_time-start_time) >> ITER_POWER);
+}
+
+/* Add up the handled_packets counters of the workers registered so far. */
+static inline unsigned
+total_packet_count(void)
+{
+       unsigned sum = 0;
+       unsigned w = 0;
+
+       while (w < worker_idx) {
+               sum += worker_stats[w].handled_packets;
+               w++;
+       }
+       return sum;
+}
+
+/* Zero the whole worker_stats table. */
+static inline void
+clear_packet_count(void)
+{
+       memset(&worker_stats, 0, sizeof(worker_stats));
+}
+
+/*
+ * Worker lcore body. Claims a worker id, then keeps asking the distributor
+ * for packets, bumping this worker's counter once per loop iteration, until
+ * quit becomes non-zero. The last packet is then handed back.
+ *
+ * arg is the struct rte_distributor to poll. Always returns 0.
+ */
+static int
+handle_work(void *arg)
+{
+       struct rte_mbuf *pkt = NULL;
+       struct rte_distributor *d = arg;
+       /* atomically claim the next id; worker_idx is shared by all workers */
+       unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+
+       /* report our own id: worker_idx may already have moved on */
+       printf("Worker %u starting\n", id);
+       pkt = rte_distributor_get_pkt(d, id);
+       while (!quit) {
+               worker_stats[id].handled_packets++;
+               pkt = rte_distributor_get_next_pkt(d, id, pkt);
+       }
+       rte_distributor_return_pkt(d, id, pkt);
+       printf("Worker %u handled %u pkts\n", id,
+                       worker_stats[id].handled_packets);
+       return 0;
+}
+
+/*
+ * Time (1 << ITER_POWER) calls of rte_distributor_process() on one burst of
+ * BURST mbufs, each carrying a distinct hash value, then wait until the
+ * workers have counted every packet and print the timings and per-worker
+ * counts.
+ *
+ * Returns 0 on success, -1 if the mbufs cannot be taken from pool p.
+ */
+static inline int
+perf_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+#define BURST 32
+       unsigned i;
+       uint64_t start, end;
+       struct rte_mbuf *bufs[BURST];
+
+       clear_packet_count();
+       if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+               printf("Error getting mbufs from pool\n");
+               return -1;
+       }
+       /* ensure we have different hash value for each pkt */
+       for (i = 0; i < BURST; i++)
+               bufs[i]->pkt.hash.rss = i;
+
+       start = rte_rdtsc();
+       for (i = 0; i < (1<<ITER_POWER); i++)
+               rte_distributor_process(d, bufs, BURST);
+       end = rte_rdtsc();
+
+       /* keep flushing backlogs until every packet has been counted */
+       do {
+               usleep(100);
+               rte_distributor_process(d, NULL, 0);
+       } while (total_packet_count() < (BURST << ITER_POWER));
+
+       printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
+       printf("Time per packet: %"PRIu64"\n",
+                       ((end - start) >> ITER_POWER)/BURST);
+       rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+       for (i = 0; i < rte_lcore_count() - 1; i++)
+               printf("Worker %u handled %u packets\n", i,
+                               worker_stats[i].handled_packets);
+       printf("Total packets: %u (%x)\n", total_packet_count(),
+                       total_packet_count());
+       printf("Perf test done\n");
+
+       return 0;
+}
+
+/*
+ * Functional check of the distributor with 32 mbufs taken from pool p.
+ * First every packet gets hash 0 and worker 0 must count all 32; then each
+ * packet gets a distinct hash and the per-worker counts are printed.
+ *
+ * Returns 0 on success and -1 on failure. The mbufs are given back to the
+ * pool on every path once they have been obtained.
+ */
+static int
+sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+{
+       struct rte_mbuf *bufs[32];
+       unsigned i;
+       int ret = 0;
+
+       clear_packet_count();
+       if (rte_mempool_get_bulk(p, (void *)bufs, 32) != 0) {
+               printf("Error getting mbufs from pool\n");
+               return -1;
+       }
+
+       /* now set all hash values in all buffers to zero, so all pkts go to the
+        * one worker thread */
+       for (i = 0; i < 32; i++)
+               bufs[i]->pkt.hash.rss = 0;
+
+       rte_distributor_process(d, bufs, 32);
+       do {
+               usleep(100);
+               rte_distributor_process(d, NULL, 0);
+       } while(total_packet_count() < 32);
+
+       for (i = 0; i < rte_lcore_count() - 1; i++)
+               printf("Worker %u handled %u packets\n", i,
+                               worker_stats[i].handled_packets);
+       printf("Sanity test with all zero hashes done.\n");
+       if (worker_stats[0].handled_packets != 32) {
+               /* do not leak the mbufs when the check fails */
+               ret = -1;
+               goto out;
+       }
+
+       /* give a different hash value to each packet, so load gets
+        * distributed */
+       clear_packet_count();
+       for (i = 0; i < 32; i++)
+               bufs[i]->pkt.hash.rss = i;
+
+       rte_distributor_process(d, bufs, 32);
+       do {
+               usleep(100);
+               rte_distributor_process(d, NULL, 0);
+       } while(total_packet_count() < 32);
+
+       for (i = 0; i < rte_lcore_count() - 1; i++)
+               printf("Worker %u handled %u packets\n", i,
+                               worker_stats[i].handled_packets);
+       printf("Sanity test with non-zero hashes done\n");
+
+out:
+       rte_mempool_put_bulk(p, (void *)bufs, 32);
+       return ret;
+}
+
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+
+/*
+ * Distributor autotest entry point. Times a cache line round trip, creates
+ * the distributor and the mbuf pool on first use (both are kept in static
+ * variables for later invocations), launches the workers on every slave
+ * lcore, then runs the sanity and performance tests.
+ *
+ * Returns 0 if everything succeeded, -1 on any setup or test failure.
+ */
+int
+test_distributor(void)
+{
+       static struct rte_distributor *d = NULL;
+       static struct rte_mempool *p = NULL;
+
+       if (rte_lcore_count() < 2) {
+               printf("ERROR: not enough cores to test distributor\n");
+               return -1;
+       }
+
+       /* first time how long it takes to round-trip a cache line */
+       time_cache_line_switch();
+
+       if (d == NULL) {
+               d = rte_distributor_create("Test_distributor", rte_socket_id(),
+                               rte_lcore_count() - 1, RTE_DISTRIBUTOR_NOFLAGS);
+               if (d == NULL) {
+                       printf("Error creating distributor\n");
+                       return -1;
+               }
+               rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
+       }
+
+       if (p == NULL) {
+               p = rte_mempool_create("MBUF_POOL", 511, MBUF_SIZE, 64,
+                               sizeof(struct rte_pktmbuf_pool_private),
+                               rte_pktmbuf_pool_init, NULL,
+                               rte_pktmbuf_init, NULL,
+                               rte_socket_id(), 0);
+               if (p == NULL) {
+                       printf("Error creating mempool\n");
+                       return -1;
+               }
+       }
+
+       /* a failing sub-test must fail the autotest */
+       if (sanity_test(d, p) < 0)
+               return -1;
+       if (perf_test(d, p) < 0)
+               return -1;
+       return 0;
+}
+
+#else
+
+#include <stdio.h>
+
+/*
+ * Stand-in used when RTE_LIBRTE_DISTRIBUTOR is not defined: prints a notice
+ * and returns 0.
+ */
+int
+test_distributor(void)
+{
+       printf("Distributor is not enabled in configuration\n");
+       return 0;
+}
+
+#endif
diff --git a/config/defconfig_i686-default-linuxapp-gcc 
b/config/defconfig_i686-default-linuxapp-gcc
index 14bd3d1..5b4261e 100644
--- a/config/defconfig_i686-default-linuxapp-gcc
+++ b/config/defconfig_i686-default-linuxapp-gcc
@@ -335,3 +335,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n

+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_i686-default-linuxapp-icc 
b/config/defconfig_i686-default-linuxapp-icc
index ec3386e..d1d4aeb 100644
--- a/config/defconfig_i686-default-linuxapp-icc
+++ b/config/defconfig_i686-default-linuxapp-icc
@@ -334,3 +334,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n

+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-bsdapp-gcc 
b/config/defconfig_x86_64-default-bsdapp-gcc
index d960e1d..329920e 100644
--- a/config/defconfig_x86_64-default-bsdapp-gcc
+++ b/config/defconfig_x86_64-default-bsdapp-gcc
@@ -300,3 +300,9 @@ CONFIG_RTE_APP_TEST=y
 CONFIG_RTE_TEST_PMD=y
 CONFIG_RTE_TEST_PMD_RECORD_CORE_CYCLES=n
 CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
+
+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-gcc 
b/config/defconfig_x86_64-default-linuxapp-gcc
index f11ffbf..772a6b3 100644
--- a/config/defconfig_x86_64-default-linuxapp-gcc
+++ b/config/defconfig_x86_64-default-linuxapp-gcc
@@ -337,3 +337,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n

+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-default-linuxapp-icc 
b/config/defconfig_x86_64-default-linuxapp-icc
index 4eaca4c..04affc8 100644
--- a/config/defconfig_x86_64-default-linuxapp-icc
+++ b/config/defconfig_x86_64-default-linuxapp-icc
@@ -333,3 +333,8 @@ CONFIG_RTE_TEST_PMD_RECORD_BURST_STATS=n
 #
 CONFIG_RTE_NIC_BYPASS=n

+#
+# Compile the distributor library
+#
+CONFIG_RTE_LIBRTE_DISTRIBUTOR=y
+
diff --git a/config/defconfig_x86_64-ivshmem-linuxapp-gcc 
b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
index 2f55a69..0002dca 100644
--- a/config/defconfig_x86_64-ivshmem-linuxapp-gcc
+++ b/config/defconfig_x86_64-ivshmem-linuxapp-gcc
@@ -46,4 +46,4 @@ CONFIG_RTE_LIBRTE_IVSHMEM_MAX_ENTRIES=128
 CONFIG_RTE_LIBRTE_IVSHMEM_MAX_METADATA_FILES=32

 # Set EAL to single file segments
-CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
\ No newline at end of file
+CONFIG_RTE_EAL_SINGLE_FILE_SEGMENTS=y
diff --git a/examples/distributor_app/Makefile 
b/examples/distributor_app/Makefile
new file mode 100644
index 0000000..9a1ebd3
--- /dev/null
+++ b/examples/distributor_app/Makefile
@@ -0,0 +1,28 @@
+# <COPYRIGHT_TAG>
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-default-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = basicfwd
+
+# all source are stored in SRCS-y
+SRCS-y := basicfwd.c
+
+CFLAGS += $(WERROR_FLAGS)
+
+# workaround for a gcc bug with noreturn attribute
+# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
+# (applied to basicfwd.o, the only object this application builds)
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+CFLAGS_basicfwd.o += -Wno-return-type
+endif
+
+EXTRA_CFLAGS += -O3 -g -Wfatal-errors
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/examples/distributor_app/basicfwd.c 
b/examples/distributor_app/basicfwd.c
new file mode 100644
index 0000000..7662d91
--- /dev/null
+++ b/examples/distributor_app/basicfwd.c
@@ -0,0 +1,248 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdint.h>
+#include <inttypes.h>
+#include <unistd.h>
+
+#include <rte_eal.h>
+#include <rte_ethdev.h>
+#include <rte_cycles.h>
+#include <rte_distributor.h>
+#include "basicfwd.h"
+
+#define RX_RING_SIZE 128
+#define RX_FREE_THRESH 32
+#define RX_PTHRESH 8
+#define RX_HTHRESH 8
+#define RX_WTHRESH 0
+
+#define TX_RING_SIZE 512
+#define TX_FREE_THRESH 32
+#define TX_PTHRESH 32
+#define TX_HTHRESH 0
+#define TX_WTHRESH 0
+#define TX_RSBIT_THRESH 32
+#define TX_Q_FLAGS (ETH_TXQ_FLAGS_NOMULTSEGS | ETH_TXQ_FLAGS_NOVLANOFFL |\
+       ETH_TXQ_FLAGS_NOXSUMSCTP | ETH_TXQ_FLAGS_NOXSUMUDP | \
+       ETH_TXQ_FLAGS_NOXSUMTCP)
+
+#define NUM_MBUFS 8191
+#define MBUF_SIZE (2048 + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
+#define MBUF_CACHE_SIZE 250
+#define BURST_SIZE 32
+
+/*
+ * Default port configuration copied by port_init(): RSS on receive over
+ * IPv4/IPv6 with TCP and UDP, no multi-queue mode on transmit, every rxmode
+ * option left at zero.
+ */
+static struct rte_eth_conf port_conf_default = {
+       .rxmode = {
+               .mq_mode = ETH_MQ_RX_RSS,
+               .max_rx_pkt_len = ETHER_MAX_LEN,
+               .split_hdr_size = 0,
+               .header_split   = 0, /**< Header Split disabled */
+               .hw_ip_checksum = 0, /**< IP checksum offload disabled */
+               .hw_vlan_filter = 0, /**< VLAN filtering disabled */
+               .jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+               .hw_strip_crc   = 0, /**< CRC not stripped by hardware */
+       },
+       .txmode = {
+               .mq_mode = ETH_MQ_TX_NONE,
+       },
+       .lpbk_mode = 0,
+       .rx_adv_conf = {
+                       .rss_conf = {
+                               .rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6 |
+                                       ETH_RSS_IPV4_TCP | ETH_RSS_IPV4_UDP |
+                                       ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
+                       }
+       },
+};
+
+/* RX queue settings passed to rte_eth_rx_queue_setup() by port_init() */
+static const struct rte_eth_rxconf rx_conf_default = {
+       .rx_thresh = {
+               .pthresh = RX_PTHRESH,
+               .hthresh = RX_HTHRESH,
+               .wthresh = RX_WTHRESH,
+       },
+       .rx_free_thresh = RX_FREE_THRESH,
+       .rx_drop_en = 0,
+};
+
+/* TX queue settings passed to rte_eth_tx_queue_setup() by port_init() */
+static struct rte_eth_txconf tx_conf_default = {
+       .tx_thresh = {
+               .pthresh = TX_PTHRESH,
+               .hthresh = TX_HTHRESH,
+               .wthresh = TX_WTHRESH,
+       },
+       .tx_free_thresh = TX_FREE_THRESH,
+       .tx_rs_thresh = TX_RSBIT_THRESH,
+       .txq_flags = TX_Q_FLAGS
+
+};
+
+#define NOFLAGS 0
+
+
+/*
+ * Initialises a given port using global settings and with the rx buffers
+ * coming from the mbuf_pool passed as parameter.
+ *
+ * One RX queue and (lcore count - 1) TX queues are set up, the port is
+ * started and, when the link is up, its MAC address is printed and
+ * promiscuous mode is enabled.
+ *
+ * Returns 0 on success (including when the link is still down after a one
+ * second retry), -1 if the port number is out of range, or the error code
+ * of the ethdev call that failed.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+       struct rte_eth_conf port_conf = port_conf_default;
+       const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
+       int retval;
+       uint16_t q;
+
+       if (port >= rte_eth_dev_count())
+               return -1;
+
+       retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
+       if (retval != 0)
+               return retval;
+
+       for (q = 0; q < rxRings; q ++) {
+               retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+                                               rte_eth_dev_socket_id(port),
+                                               &rx_conf_default,
+                                               mbuf_pool);
+               if (retval < 0)
+                       return retval;
+       }
+
+       for (q = 0; q < txRings; q ++) {
+               retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+                                               rte_eth_dev_socket_id(port),
+                                               &tx_conf_default);
+               if (retval < 0)
+                       return retval;
+       }
+
+       retval  = rte_eth_dev_start(port);
+       if (retval < 0)
+               return retval;
+
+       /* give the link one second to come up before reporting it down */
+       struct rte_eth_link link;
+       rte_eth_link_get_nowait(port, &link);
+       if (!link.link_status) {
+               sleep(1);
+               rte_eth_link_get_nowait(port, &link);
+       }
+
+       if (!link.link_status) {
+               printf("Link down on port %"PRIu8"\n", port);
+               return 0;
+       }
+
+       struct ether_addr addr;
+       rte_eth_macaddr_get(port, &addr);
+       printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
+                       " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
+                       (unsigned)port,
+                       addr.addr_bytes[0], addr.addr_bytes[1],
+                       addr.addr_bytes[2], addr.addr_bytes[3],
+                       addr.addr_bytes[4], addr.addr_bytes[5]);
+
+       rte_eth_promiscuous_enable(port);
+
+       return 0;
+}
+
+/*
+ * Distributor loop run on the calling lcore: polls RX queue 0 of every
+ * port in turn and hands each received burst to the distributor.
+ * Never returns.
+ */
+static  __attribute__((noreturn)) void
+lcore_main(struct rte_distributor *d)
+{
+       const uint8_t nb_ports = rte_eth_dev_count();
+       uint8_t port;
+       /* socket 0 is a valid NUMA node, so only negative ids are skipped */
+       for (port = 0; port < nb_ports; port++)
+               if (rte_eth_dev_socket_id(port) >= 0 &&
+                               rte_eth_dev_socket_id(port) !=
+                                               (int)rte_socket_id())
+                       printf("WARNING, port %u is on remote NUMA node to "
+                                       "polling thread."
+                                       "\n\tPerformance will not be "
+                                       "optimal.\n", port);
+
+       printf("\nCore %u distributing packets. [Ctrl+C to quit]\n",
+                       rte_lcore_id());
+       for(;;) {
+               for (port = 0; port < nb_ports; port++) {
+                       struct rte_mbuf *bufs[BURST_SIZE];
+                       const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
+                                       BURST_SIZE);
+                       if (unlikely(nb_rx == 0))
+                               continue;
+
+                       rte_distributor_process(d, bufs, nb_rx);
+               }
+       }
+}
+
+/*
+ * Worker loop: claims a worker id, collects packets from the distributor
+ * into bursts of BURST_SIZE and transmits each burst on TX queue <id> of the
+ * next port, cycling through all ports. Never returns.
+ */
+static __attribute__((noreturn)) void
+tx_main(struct rte_distributor *d)
+{
+       static volatile unsigned num_workers = 0;
+       struct rte_mbuf *to_send[BURST_SIZE];
+       unsigned nb_to_send = 0;
+       unsigned port_id = 0;
+       /* atomically claim a unique worker id, also used as the TX queue */
+       unsigned id = __sync_fetch_and_add(&num_workers, 1);
+
+       struct rte_mbuf *pkt = rte_distributor_get_pkt(d, id);
+       for (;;) {
+               to_send[nb_to_send++] = pkt;
+               if (nb_to_send == BURST_SIZE) {
+                       unsigned nb_tx = rte_eth_tx_burst(port_id++, id,
+                                       to_send, BURST_SIZE);
+                       /* packets the driver did not take are still ours:
+                        * free them instead of leaking them */
+                       while (nb_tx < BURST_SIZE)
+                               rte_pktmbuf_free(to_send[nb_tx++]);
+                       nb_to_send = 0;
+                       if (port_id == rte_eth_dev_count())
+                               port_id = 0;
+               }
+               pkt = rte_distributor_get_next_pkt(d, id, pkt);
+       }
+}
+
+/*
+ * Main function, does initialisation and calls the per-lcore functions:
+ * EAL and NIC driver init, mbuf pool creation, port setup, distributor
+ * creation, tx_main() on every slave lcore and lcore_main() on the master.
+ * Any initialisation failure terminates the application via rte_exit().
+ */
+int
+MAIN(int argc, char *argv[])
+{
+       struct rte_mempool *mbuf_pool;
+       struct rte_distributor *d;
+       unsigned nb_ports;
+       uint8_t portid;
+
+       /* init EAL */
+       int ret = rte_eal_init(argc, argv);
+       if (ret < 0)
+               rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
+       argc -= ret;
+       argv += ret;
+
+       if (rte_ixgbe_pmd_init() != 0 ||
+                       rte_eal_pci_probe() != 0)
+               rte_exit(EXIT_FAILURE,
+                               "Error with NIC driver initialization\n");
+
+       nb_ports = rte_eth_dev_count();
+       if (nb_ports < 2 || (nb_ports & 1))
+               rte_exit(EXIT_FAILURE, "Error: number of ports must be even\n");
+
+       mbuf_pool = rte_mempool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
+                                      MBUF_SIZE, MBUF_CACHE_SIZE,
+                                      sizeof(struct rte_pktmbuf_pool_private),
+                                      rte_pktmbuf_pool_init, NULL,
+                                      rte_pktmbuf_init, NULL,
+                                      rte_socket_id(), 0);
+       if (mbuf_pool == NULL)
+               rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
+
+       /* initialize all ports */
+       for (portid = 0; portid < nb_ports; portid++)
+               if (port_init(portid, mbuf_pool) != 0)
+                       rte_exit(EXIT_FAILURE,
+                                       "Cannot initialize port %"PRIu8"\n",
+                                       portid);
+
+       if (rte_lcore_count() == 1)
+               rte_exit(EXIT_FAILURE, "Error, need worker cores\n");
+
+       /* one distributor worker per slave lcore */
+       d = rte_distributor_create("PKT_DIST", rte_socket_id(),
+                       rte_lcore_count() - 1, NOFLAGS);
+       if (d == NULL)
+               rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
+
+       rte_eal_mp_remote_launch((lcore_function_t *)tx_main, d, SKIP_MASTER);
+       /* call lcore_main on master core only */
+       lcore_main(d);
+       return 0;
+}
+
diff --git a/examples/distributor_app/basicfwd.h 
b/examples/distributor_app/basicfwd.h
new file mode 100644
index 0000000..4ff20bc
--- /dev/null
+++ b/examples/distributor_app/basicfwd.h
@@ -0,0 +1,17 @@
+/*-
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _MAIN_H_
+#define _MAIN_H_
+
+
+/* the entry point is named _main when RTE_EXEC_ENV_BAREMETAL is defined */
+#ifdef RTE_EXEC_ENV_BAREMETAL
+#define MAIN _main
+#else
+#define MAIN main
+#endif
+
+/* application entry point, defined in basicfwd.c */
+int MAIN( int argc, char *argv[] );
+
+#endif /* ifndef _MAIN_H_ */
diff --git a/lib/Makefile b/lib/Makefile
index b92b392..5a0b10f 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -55,6 +55,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_METER) += librte_meter
 DIRS-$(CONFIG_RTE_LIBRTE_SCHED) += librte_sched
 DIRS-$(CONFIG_RTE_LIBRTE_ACL) += librte_acl
 DIRS-$(CONFIG_RTE_LIBRTE_KVARGS) += librte_kvargs
+DIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += librte_distributor

 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
new file mode 100644
index 0000000..794dd46
--- /dev/null
+++ b/lib/librte_distributor/Makefile
@@ -0,0 +1,21 @@
+# <COPYRIGHT_TAG>
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_distributor.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+
+# install this header file
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+
+# this lib needs eal and mbuf
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
+DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_mbuf
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_distributor/rte_distributor.c 
b/lib/librte_distributor/rte_distributor.c
new file mode 100644
index 0000000..f7aea9a
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.c
@@ -0,0 +1,201 @@
+/*
+ * <COPYRIGHT_TAG>
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <x86intrin.h>
+#include <rte_mbuf.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_tailq.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor.h"
+
+#define NO_FLAGS 0
+#define DISTRIBUTOR_PREFIX "DT_"
+
+#define RTE_DISTRIBUTOR_FLAG_BITS 4
+#define RTE_DISTRIBUTOR_NO_BUF 0
+/* the low four bits of the 64-bit buffer word carry the flags below; the
+ * mbuf pointer is stored shifted left by RTE_DISTRIBUTOR_FLAG_BITS */
+#define RTE_DISTRIBUTOR_GET_BUF (1)
+#define RTE_DISTRIBUTOR_RETURN_BUF (2)
+
+#define RTE_DISTRIBUTOR_BACKLOG_SIZE 8
+#define RTE_DISTRIBUTOR_BACKLOG_MASK 7
+
+#define RTE_DISTRIBUTOR_FLAGS_MASK (0x0F)
+
+/*
+ * Per-worker exchange word shared between the distributor and one worker:
+ * an mbuf pointer shifted left by RTE_DISTRIBUTOR_FLAG_BITS, with request
+ * flags in the low bits. Padded to three cache lines.
+ */
+union rte_distributor_buffer {
+       volatile int64_t bufptr64;
+       char pad[CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+/*
+ * Small circular queue of already-encoded buffer words waiting for one
+ * worker; slots are indexed modulo RTE_DISTRIBUTOR_BACKLOG_SIZE.
+ */
+struct rte_distributor_backlog {
+       int64_t pkts[RTE_DISTRIBUTOR_BACKLOG_SIZE];
+       unsigned start;   /* index of the oldest entry (masked on use) */
+       unsigned count;   /* number of queued entries */
+};
+
+/* Distributor instance, placed in a memzone by rte_distributor_create() */
+struct rte_distributor {
+       TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+       char name[RTE_DISTRIBUTOR_NAMESIZE];   /**< Name of the distributor. */
+       unsigned flags;                       /**< Flags supplied at creation. */
+       unsigned num_workers;                 /**< Number of workers polling */
+
+       /* id (hash | 1) of the packet each worker is processing, 0 if none */
+       uint32_t in_flight_ids[RTE_MAX_LCORE];
+       /* packets queued for a worker already handling the same id */
+       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+
+       /* one exchange word per worker */
+       union rte_distributor_buffer buffers[RTE_MAX_LCORE];
+};
+
+TAILQ_HEAD(rte_distributor_list, rte_distributor);
+
+/*
+ * Append item to the backlog bl.
+ * Returns 0 on success, -1 when the backlog already holds
+ * RTE_DISTRIBUTOR_BACKLOG_SIZE entries.
+ */
+static int
+add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
+{
+       unsigned slot;
+
+       if (bl->count == RTE_DISTRIBUTOR_BACKLOG_SIZE)
+               return -1;
+
+       /* first free slot, wrapped to the array size */
+       slot = (bl->start + bl->count) & (RTE_DISTRIBUTOR_BACKLOG_MASK);
+       bl->pkts[slot] = item;
+       bl->count++;
+       return 0;
+}
+
+/*
+ * Remove and return the oldest entry of the backlog bl. The count is
+ * decremented without an emptiness check.
+ */
+static int64_t
+backlog_pop(struct rte_distributor_backlog *bl)
+{
+       const unsigned slot = bl->start & RTE_DISTRIBUTOR_BACKLOG_MASK;
+
+       bl->start++;
+       bl->count--;
+       return bl->pkts[slot];
+}
+
+/*
+ * Hand the mbufs in mbufs[0..num_mbufs-1] out to the workers.
+ *
+ * Workers are visited round-robin. A worker whose buffer word has
+ * RTE_DISTRIBUTOR_GET_BUF set is first served from its own backlog;
+ * otherwise the next mbuf is taken. If another worker is already handling
+ * the same id (hash.rss | 1) the mbuf is queued on that worker's backlog,
+ * and retried later if that backlog is full. The loop runs until every
+ * mbuf has been handed out or queued; a final pass then serves waiting
+ * workers from their backlogs.
+ *
+ * The match mask is 64 bits wide, so at most 64 workers are supported.
+ *
+ * Returns num_mbufs.
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+               struct rte_mbuf **mbufs, const unsigned num_mbufs)
+{
+       unsigned next_idx = 0;
+       unsigned worker_offset = 0;
+
+       while (next_idx < num_mbufs) {
+               int64_t data = d->buffers[worker_offset].bufptr64;
+               if (data & RTE_DISTRIBUTOR_GET_BUF) {
+
+                       if (d->backlog[worker_offset].count)
+                               d->buffers[worker_offset].bufptr64 =
+                                       backlog_pop(&d->backlog[worker_offset]);
+
+                       else {
+                               struct rte_mbuf *next_mb = mbufs[next_idx++];
+                               /* widen to 64 bits before shifting so no
+                                * pointer bits are lost when uintptr_t is
+                                * 32 bits wide */
+                               int64_t next_value =
+                                       ((int64_t)(uintptr_t)next_mb) <<
+                                       RTE_DISTRIBUTOR_FLAG_BITS;
+
+                               /* note signed variable - arithmetic shift */
+                               int64_t oldbuf = data >>
+                                       RTE_DISTRIBUTOR_FLAG_BITS;
+                               /* the worker gave a packet back, so it no
+                                * longer has an id in flight */
+                               if (oldbuf)
+                                       d->in_flight_ids[worker_offset] = 0;
+
+                               uint64_t match = 0;
+                               uint32_t newid = (next_mb->pkt.hash.rss | 1);
+                               unsigned i;
+                               /* one bit per worker already handling newid;
+                                * shift in 64 bits to stay defined for
+                                * worker indexes of 32 and above */
+                               for (i = 0; i < d->num_workers; i++)
+                                       match |= ((uint64_t)
+                                               !(d->in_flight_ids[i] ^ newid)
+                                               << i);
+                               if (!match) {
+                                       d->buffers[worker_offset].bufptr64 =
+                                                       next_value;
+                                       d->in_flight_ids[worker_offset] = newid;
+                               } else {
+                                       unsigned worker =
+                                               __builtin_ctzll(match);
+                                       /* backlog full: put the mbuf back
+                                        * and retry on a later pass */
+                                       if (add_to_backlog(&d->backlog[worker],
+                                                       next_value) < 0)
+                                               next_idx--;
+                               }
+                       }
+               }
+               if (++worker_offset == d->num_workers)
+                       worker_offset = 0;
+       }
+       /* to finish, check all workers for backlog and schedule work for them
+        * if they are ready */
+       for (worker_offset = 0; worker_offset < d->num_workers; worker_offset++)
+               if (d->backlog[worker_offset].count &&
+                               (d->buffers[worker_offset].bufptr64 &
+                                               RTE_DISTRIBUTOR_GET_BUF))
+                       d->buffers[worker_offset].bufptr64 =
+                               backlog_pop(&d->backlog[worker_offset]);
+
+       return num_mbufs;
+}
+
+/*
+ * Request a packet for worker worker_id and spin until the distributor has
+ * replaced the request with a packet. Returns that packet.
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+               unsigned worker_id)
+{
+       union rte_distributor_buffer *slot = &d->buffers[worker_id];
+       int64_t shifted;
+
+       slot->bufptr64 |= RTE_DISTRIBUTOR_GET_BUF;
+       for (;;) {
+               if (!(slot->bufptr64 & RTE_DISTRIBUTOR_GET_BUF))
+                       break;
+               rte_pause();
+       }
+       /* bufptr64 is signed, so this is an arithmetic shift */
+       shifted = slot->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
+       return (struct rte_mbuf *)((uintptr_t)shifted);
+}
+
+/*
+ * Hand oldpkt back to the distributor and request the next packet for
+ * worker worker_id in a single write, then spin until the distributor has
+ * replaced the request with a packet. Returns that packet.
+ */
+struct rte_mbuf *
+rte_distributor_get_next_pkt(struct rte_distributor *d,
+               unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+       union rte_distributor_buffer *buf = &d->buffers[worker_id];
+       /* widen to 64 bits before shifting so no pointer bits are lost when
+        * uintptr_t is 32 bits wide */
+       int64_t req = (((int64_t)(uintptr_t)oldpkt) <<
+                       RTE_DISTRIBUTOR_FLAG_BITS) | RTE_DISTRIBUTOR_GET_BUF;
+       buf->bufptr64 = req;
+       while (buf->bufptr64 & RTE_DISTRIBUTOR_GET_BUF)
+               rte_pause();
+       /* since bufptr64 is a signed value, this should be an arithmetic
+        * shift */
+       int64_t ret = buf->bufptr64 >> RTE_DISTRIBUTOR_FLAG_BITS;
+       return (struct rte_mbuf *)((uintptr_t)ret);
+}
+
+/*
+ * Hand oldpkt back to the distributor without requesting another packet,
+ * by publishing it with RTE_DISTRIBUTOR_RETURN_BUF set.
+ *
+ * The call does not wait for an acknowledgement: rte_distributor_process()
+ * only reacts to RTE_DISTRIBUTOR_GET_BUF and never clears the return flag,
+ * so waiting for it to clear would spin forever.
+ *
+ * Always returns 0.
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d,
+               unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+       union rte_distributor_buffer *buf = &d->buffers[worker_id];
+       /* widen to 64 bits before shifting so no pointer bits are lost when
+        * uintptr_t is 32 bits wide */
+       uint64_t req = (((int64_t)(uintptr_t)oldpkt) <<
+                       RTE_DISTRIBUTOR_FLAG_BITS) | RTE_DISTRIBUTOR_RETURN_BUF;
+       buf->bufptr64 = req;
+       return 0;
+}
+
+/*
+ * Create a distributor called name for num_workers workers, backed by a
+ * memzone named DISTRIBUTOR_PREFIX + name reserved on socket_id, and add it
+ * to the distributor tail queue.
+ *
+ * Returns the new distributor, or NULL with rte_errno set to:
+ *   EINVAL          - name is NULL, or num_workers is 0 or larger than
+ *                     RTE_MAX_LCORE (the size of the per-worker arrays)
+ *   E_RTE_NO_TAILQ  - the distributor tail queue is not initialised
+ *   ENOMEM          - the memzone could not be reserved
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+               unsigned num_workers, unsigned flags)
+{
+       struct rte_distributor *d;
+       struct rte_distributor_list *distributor_list;
+       char mz_name[RTE_MEMZONE_NAMESIZE];
+       const struct rte_memzone *mz;
+
+       /* compilation-time checks */
+       RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
+       RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+
+       /* the per-worker arrays hold RTE_MAX_LCORE entries and the process
+        * loop cannot make progress without at least one worker */
+       if (name == NULL || num_workers == 0 ||
+                       num_workers > RTE_MAX_LCORE) {
+               rte_errno = EINVAL;
+               return NULL;
+       }
+
+       /* check that we have an initialised tail queue; done before the
+        * reservation so that a failure here leaves no memzone behind */
+       if ((distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR,
+                       rte_distributor_list)) == NULL) {
+               rte_errno = E_RTE_NO_TAILQ;
+               return NULL;
+       }
+
+       rte_snprintf(mz_name, sizeof(mz_name), DISTRIBUTOR_PREFIX"%s", name);
+       mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+       if (mz == NULL) {
+               rte_errno = ENOMEM;
+               return NULL;
+       }
+
+       d = mz->addr;
+       rte_snprintf(d->name, sizeof(d->name), "%s", name);
+       d->flags = flags;
+       d->num_workers = num_workers;
+       TAILQ_INSERT_TAIL(distributor_list, d, next);
+
+       return d;
+}
+
diff --git a/lib/librte_distributor/rte_distributor.h 
b/lib/librte_distributor/rte_distributor.h
new file mode 100644
index 0000000..b8021a0
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor.h
@@ -0,0 +1,45 @@
+/*
+ * <COPYRIGHT_TAG>
+ */
+
+#ifndef _RTE_DISTRIBUTE_H_
+#define _RTE_DISTRIBUTE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_mbuf.h>
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32
+
+#define RTE_DISTRIBUTOR_NOFLAGS 0
+
+/* opaque distributor handle; the layout is private to the library */
+struct rte_distributor;
+
+/**
+ * Create a distributor.
+ *
+ * @param name         name of the distributor
+ * @param socket_id    socket on which the backing memory is reserved
+ * @param num_workers  number of workers that will poll the distributor
+ * @param flags        stored with the distributor, e.g.
+ *                     RTE_DISTRIBUTOR_NOFLAGS
+ * @return the new distributor, or NULL with rte_errno set on failure
+ */
+struct rte_distributor *
+rte_distributor_create(const char *name, unsigned socket_id,
+               unsigned num_workers, unsigned flags);
+
+/**
+ * Hand a burst of mbufs out to the workers. Called by the distributing
+ * thread; may be called with num_mbufs == 0 to flush queued packets to
+ * workers that are waiting.
+ *
+ * @return num_mbufs
+ */
+int
+rte_distributor_process(struct rte_distributor *d,
+               struct rte_mbuf **mbufs, const unsigned num_mbufs);
+
+/**
+ * Called by a worker to request its first packet; spins until the
+ * distributor supplies one.
+ *
+ * @param worker_id  index of the calling worker
+ * @return the packet assigned to this worker
+ */
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d, unsigned worker_id);
+
+/**
+ * Called by a worker to hand back oldpkt and request the next packet;
+ * spins until the distributor supplies one.
+ *
+ * @return the next packet assigned to this worker
+ */
+struct rte_mbuf *
+rte_distributor_get_next_pkt(struct rte_distributor *d,
+               unsigned worker_id, struct rte_mbuf *oldpkt);
+
+/**
+ * Called by a worker to hand back a packet without requesting another.
+ *
+ * @return 0
+ */
+int
+rte_distributor_return_pkt(struct rte_distributor *d, unsigned worker_id,
+               struct rte_mbuf *mbuf);
+
+/******************************************/
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_eal/common/include/rte_tailq_elem.h 
b/lib/librte_eal/common/include/rte_tailq_elem.h
index 2de4010..fdd2faf 100644
--- a/lib/librte_eal/common/include/rte_tailq_elem.h
+++ b/lib/librte_eal/common/include/rte_tailq_elem.h
@@ -82,6 +82,8 @@ rte_tailq_elem(RTE_TAILQ_PM, "RTE_PM")

 rte_tailq_elem(RTE_TAILQ_ACL, "RTE_ACL")

+rte_tailq_elem(RTE_TAILQ_DISTRIBUTOR, "RTE_DISTRIBUTOR")
+
 rte_tailq_end(RTE_TAILQ_NUM)

 #undef rte_tailq_elem
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 9c70ce0..64c0f6e 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -70,6 +70,10 @@ LDLIBS += -lrte_ivshmem
 endif
 endif

+ifeq ($(CONFIG_RTE_LIBRTE_DISTRIBUTOR),y)
+LDLIBS += -lrte_distributor
+endif
+
 ifeq ($(CONFIG_RTE_LIBRTE_E1000_PMD),y)
 LDLIBS += -lrte_pmd_e1000
 endif

Reply via email to