User applications are encouraged to set the newly introduced union field
mbuf->hash.usr as the flow id, which is uint32_t.
With the introduction of in_flight_bitmask, all 32 bits of the tag can
be used.

Furthermore, this patch fixes an integer overflow when finding
matched tags.

Note that currently librte_distributor supports up to 64 worker
threads. If more workers are needed, the size of in_flight_bitmask and
the algorithm of finding matched tag must be revised.

Signed-off-by: Qinglai Xiao <jigsaw at gmail.com>
---
 app/test/test_distributor.c              |   18 ++++++------
 app/test/test_distributor_perf.c         |    4 +-
 lib/librte_distributor/rte_distributor.c |   45 +++++++++++++++++++++--------
 lib/librte_distributor/rte_distributor.h |    3 ++
 lib/librte_mbuf/rte_mbuf.h               |    1 +
 5 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index ce06436..9e8c06d 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -120,7 +120,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool 
*p)
        /* now set all hash values in all buffers to zero, so all pkts go to the
         * one worker thread */
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = 0;
+               bufs[i]->hash.usr = 0;

        rte_distributor_process(d, bufs, BURST);
        rte_distributor_flush(d);
@@ -142,7 +142,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool 
*p)
        if (rte_lcore_count() >= 3) {
                clear_packet_count();
                for (i = 0; i < BURST; i++)
-                       bufs[i]->hash.rss = (i & 1) << 8;
+                       bufs[i]->hash.usr = (i & 1) << 8;

                rte_distributor_process(d, bufs, BURST);
                rte_distributor_flush(d);
@@ -167,7 +167,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool 
*p)
         * so load gets distributed */
        clear_packet_count();
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = i;
+               bufs[i]->hash.usr = i;

        rte_distributor_process(d, bufs, BURST);
        rte_distributor_flush(d);
@@ -199,7 +199,7 @@ sanity_test(struct rte_distributor *d, struct rte_mempool 
*p)
                return -1;
        }
        for (i = 0; i < BIG_BATCH; i++)
-               many_bufs[i]->hash.rss = i << 2;
+               many_bufs[i]->hash.usr = i << 2;

        for (i = 0; i < BIG_BATCH/BURST; i++) {
                rte_distributor_process(d, &many_bufs[i*BURST], BURST);
@@ -280,7 +280,7 @@ sanity_test_with_mbuf_alloc(struct rte_distributor *d, 
struct rte_mempool *p)
                while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
                        rte_distributor_process(d, NULL, 0);
                for (j = 0; j < BURST; j++) {
-                       bufs[j]->hash.rss = (i+j) << 1;
+                       bufs[j]->hash.usr = (i+j) << 1;
                        rte_mbuf_refcnt_set(bufs[j], 1);
                }

@@ -359,7 +359,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
        /* now set all hash values in all buffers to zero, so all pkts go to the
         * one worker thread */
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = 0;
+               bufs[i]->hash.usr = 0;

        rte_distributor_process(d, bufs, BURST);
        /* at this point, we will have processed some packets and have a full
@@ -372,7 +372,7 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
                return -1;
        }
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = 0;
+               bufs[i]->hash.usr = 0;

        /* get worker zero to quit */
        zero_quit = 1;
@@ -416,7 +416,7 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
        /* now set all hash values in all buffers to zero, so all pkts go to the
         * one worker thread */
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = 0;
+               bufs[i]->hash.usr = 0;

        rte_distributor_process(d, bufs, BURST);
        /* at this point, we will have processed some packets and have a full
@@ -488,7 +488,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool 
*p)
        zero_quit = 0;
        quit = 1;
        for (i = 0; i < num_workers; i++)
-               bufs[i]->hash.rss = i << 1;
+               bufs[i]->hash.usr = i << 1;
        rte_distributor_process(d, bufs, num_workers);

        rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index b04864c..48ee344 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -159,7 +159,7 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
        }
        /* ensure we have different hash value for each pkt */
        for (i = 0; i < BURST; i++)
-               bufs[i]->hash.rss = i;
+               bufs[i]->hash.usr = i;

        start = rte_rdtsc();
        for (i = 0; i < (1<<ITER_POWER); i++)
@@ -198,7 +198,7 @@ quit_workers(struct rte_distributor *d, struct rte_mempool 
*p)

        quit = 1;
        for (i = 0; i < num_workers; i++)
-               bufs[i]->hash.rss = i << 1;
+               bufs[i]->hash.usr = i << 1;
        rte_distributor_process(d, bufs, num_workers);

        rte_mempool_put_bulk(p, (void *)bufs, num_workers);
diff --git a/lib/librte_distributor/rte_distributor.c 
b/lib/librte_distributor/rte_distributor.c
index 656ee5c..a84ea97 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -92,7 +92,13 @@ struct rte_distributor {
        unsigned num_workers;                 /**< Number of workers polling */

        uint32_t in_flight_tags[RTE_MAX_LCORE];
-               /**< Tracks the tag being processed per core, 0 == no pkt */
+               /**< Tracks the tag being processed per core */
+       uint64_t in_flight_bitmask;
+               /**< on/off bits for in-flight tags.
+                * Note that if RTE_MAX_LCORE is larger than 64 then
+                * the bitmask has to expand.
+                */
+
        struct rte_distributor_backlog backlog[RTE_MAX_LCORE];

        union rte_distributor_buffer bufs[RTE_MAX_LCORE];
@@ -189,6 +195,7 @@ static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
        d->in_flight_tags[wkr] = 0;
+       d->in_flight_bitmask &= ~(1UL << wkr);
        d->bufs[wkr].bufptr64 = 0;
        if (unlikely(d->backlog[wkr].count != 0)) {
                /* On return of a packet, we need to move the
@@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned 
wkr)
                        pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
                                        RTE_DISTRIB_FLAG_BITS));
                }
-               /* recursive call */
+               /* recursive call.
+                * Note that the tags were set before first level call
+                * to rte_distributor_process.
+                */
                rte_distributor_process(d, pkts, i);
                bl->count = bl->start = 0;
        }
@@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
                        else {
                                d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
                                d->in_flight_tags[wkr] = 0;
+                               d->in_flight_bitmask &= ~(1UL << wkr);
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -284,17 +295,21 @@ rte_distributor_process(struct rte_distributor *d,
                        next_value = (((int64_t)(uintptr_t)next_mb)
                                        << RTE_DISTRIB_FLAG_BITS);
                        /*
-                        * Set the low bit on the tag, so we can guarantee that
-                        * we never store a tag value of zero. That means we can
-                        * use the zero-value to indicate that no packet is
-                        * being processed by a worker.
+                        * The user is advised to set a tag value for each
+                        * mbuf before calling rte_distributor_process.
+                        * User defined tags are used to identify flows,
+                        * or sessions.
                         */
-                       new_tag = (next_mb->hash.rss | 1);
+                       new_tag = next_mb->hash.usr;

-                       uint32_t match = 0;
+                       /*
+                        * Note that if RTE_MAX_LCORE is larger than 64 then
+                        * the size of match has to be expanded.
+                        */
+                       uint64_t match = 0;
                        unsigned i;
                        /*
-                        * to scan for a match use "xor" and "not" to get a 0/1
+                        * To scan for a match use "xor" and "not" to get a 0/1
                         * value, then use shifting to merge to single "match"
                         * variable, where a one-bit indicates a match for the
                         * worker given by the bit-position
@@ -303,9 +318,11 @@ rte_distributor_process(struct rte_distributor *d,
                                match |= (!(d->in_flight_tags[i] ^ new_tag)
                                        << i);

+                       /* Only turned-on bits are considered as match */
+                       match &= d->in_flight_bitmask;
                        if (match) {
                                next_mb = NULL;
-                               unsigned worker = __builtin_ctz(match);
+                               unsigned worker = __builtin_ctzl(match);
                                if (add_to_backlog(&d->backlog[worker],
                                                next_value) < 0)
                                        next_idx--;
@@ -322,6 +339,7 @@ rte_distributor_process(struct rte_distributor *d,
                        else {
                                d->bufs[wkr].bufptr64 = next_value;
                                d->in_flight_tags[wkr] = new_tag;
+                               d->in_flight_bitmask |= (1UL << wkr);
                                next_mb = NULL;
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -379,11 +397,12 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-       unsigned wkr, total_outstanding = 0;
+       unsigned wkr, total_outstanding;

+       total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
        for (wkr = 0; wkr < d->num_workers; wkr++)
-               total_outstanding += d->backlog[wkr].count +
-                               !!(d->in_flight_tags[wkr]);
+               total_outstanding += d->backlog[wkr].count;
+
        return total_outstanding;
 }

diff --git a/lib/librte_distributor/rte_distributor.h 
b/lib/librte_distributor/rte_distributor.h
index ec0d74a..a29c506 100644
--- a/lib/librte_distributor/rte_distributor.h
+++ b/lib/librte_distributor/rte_distributor.h
@@ -87,6 +87,9 @@ rte_distributor_create(const char *name, unsigned socket_id,
  * Process a set of packets by distributing them among workers that request
  * packets. The distributor will ensure that no two packets that have the
  * same flow id, or tag, in the mbuf will be procesed at the same time.
+ * The user is advised to set a tag for each mbuf. If the user does not set
+ * the tag, its value may vary depending on the driver implementation
+ * and configuration.
  *
  * This is not multi-thread safe and should only be called on a single lcore.
  *
diff --git a/lib/librte_mbuf/rte_mbuf.h b/lib/librte_mbuf/rte_mbuf.h
index e8f9bfc..f5f8658 100644
--- a/lib/librte_mbuf/rte_mbuf.h
+++ b/lib/librte_mbuf/rte_mbuf.h
@@ -185,6 +185,7 @@ struct rte_mbuf {
                        uint16_t id;
                } fdir;           /**< Filter identifier if FDIR enabled */
                uint32_t sched;   /**< Hierarchical scheduler */
+               uint32_t usr;     /**< User defined tags. See 
@rte_distributor_process */
        } hash;                   /**< hash information */

        /* second cache line - fields only used in slow path or on TX */
-- 
1.7.1

Reply via email to