On Thu, Dec 22, 2016 at 04:37:04AM +0000, David Hunt wrote:
> Now sends bursts of up to 8 mbufs to each worker, and tracks
> the in-flight flow-ids (atomic scheduling)
> 
> New file with a new api, similar to the old API except with _burst
> at the end of the function names
> 
> Signed-off-by: David Hunt <david.h...@intel.com>
> +
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +             unsigned int worker_id, struct rte_mbuf **pkts,
> +             struct rte_mbuf **oldpkt, unsigned int return_count)
> +{
> +     unsigned int count;
> +     uint64_t retries = 0;
> +
> +     rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
> +
> +     count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +     while (count == 0) {
> +             rte_pause();
> +             retries++;
> +             if (retries > 1000) {
> +                     retries = 0;

This write to retries appears to have no effect, as it comes just before
the return.

> +                     return 0;
> +             }
> +             uint64_t t = __rdtsc()+100;

Use the rte_ version of __rdtsc, i.e. rte_rdtsc().

> +
> +             while (__rdtsc() < t)
> +                     rte_pause();
> +
> +             count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +     }
> +     return count;
> +}
> +
> +int
> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
> +             unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
> +{
> +     struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
> +     unsigned int i;
> +
> +     for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> +             /* Switch off the return bit first */
> +             buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> +
> +     for (i = num; i-- > 0; )
> +             buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
> +                     RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> +
> +     /* set the GET_BUF but even if we got no returns */
> +     buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> +
> +     return 0;
> +}
> +
> +#if RTE_MACHINE_CPUFLAG_SSE2
> +static inline void

Move the SSE version of the code to a separate file so that other SIMD
arch-specific versions, such as NEON, can be incorporated later.

> +find_match_sse2(struct rte_distributor_burst *d,
> +                     uint16_t *data_ptr,
> +                     uint16_t *output_ptr)
> +{
> +     /* Setup */
> +     __m128i incoming_fids;
> +     __m128i inflight_fids;
> +     __m128i preflight_fids;
> +     __m128i wkr;
> +     __m128i mask1;
> +     __m128i mask2;
> +     __m128i output;
> +     struct rte_distributor_backlog *bl;
> +
> +     /*
> +      * Function overview:
> +      * 2. Loop through all worker ID's
> +      *  2a. Load the current inflights for that worker into an xmm reg
> +      *  2b. Load the current backlog for that worker into an xmm reg
> +      *  2c. use cmpestrm to intersect flow_ids with backlog and inflights
> +      *  2d. Add any matches to the output
> +      * 3. Write the output xmm (matching worker ids).
> +      */
> +
> +
> +     output = _mm_set1_epi16(0);
> +     incoming_fids = _mm_load_si128((__m128i *)data_ptr);
> +
> +     for (uint16_t i = 0; i < d->num_workers; i++) {
> +             bl = &d->backlog[i];
> +
> +             inflight_fids =
> +                     _mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
> +             preflight_fids =
> +                     _mm_load_si128((__m128i *)(bl->tags));
> +
> +             /*
> +              * Any incoming_fid that exists anywhere in inflight_fids will
> +              * have 0xffff in same position of the mask as the incoming fid
> +              * Example (shortened to bytes for brevity):
> +              * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
> +              * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
> +              * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
> +              */
> +
> +             mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
> +                     _SIDD_UWORD_OPS |
> +                     _SIDD_CMP_EQUAL_ANY |
> +                     _SIDD_UNIT_MASK);
> +             mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
> +                     _SIDD_UWORD_OPS |
> +                     _SIDD_CMP_EQUAL_ANY |
> +                     _SIDD_UNIT_MASK);
> +
> +             mask1 = _mm_or_si128(mask1, mask2);
> +             /*
> +              * Now mask contains 0xffff where there's a match.
> +              * Next we need to store the worker_id in the relevant position
> +              * in the output.
> +              */
> +
> +             wkr = _mm_set1_epi16(i+1);
> +             mask1 = _mm_and_si128(mask1, wkr);
> +             output = _mm_or_si128(mask1, output);
> +     }
> +
> +/* process a set of packets to distribute them to workers */
> +int
> +rte_distributor_process_burst(struct rte_distributor_burst *d,
> +             struct rte_mbuf **mbufs, unsigned int num_mbufs)
> +{
> +     unsigned int next_idx = 0;
> +     static unsigned int wkr;
> +     struct rte_mbuf *next_mb = NULL;
> +     int64_t next_value = 0;
> +     uint16_t new_tag = 0;
> +     uint16_t flows[8] __rte_cache_aligned;

The constant 8 is also used further down in the function. Please replace it with a macro.

> +     //static int iter=0;

Please remove the test code commented out with // throughout the patch.

> +
> +     if (unlikely(num_mbufs == 0)) {
> +             /* Flush out all non-full cache-lines to workers. */
> +             for (unsigned int wid = 0 ; wid < d->num_workers; wid++) {
> +                     if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
> +                             release(d, wid);
> +                             handle_returns(d, wid);
> +                     }
> +             }
> +             return 0;
> +     }
> +
> +     while (next_idx < num_mbufs) {
> +             uint16_t matches[8];
> +             int pkts;
> +
> +             if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +                     d->bufs[wkr].count = 0;
> +
> +             for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> +                     if (mbufs[next_idx + i]) {
> +                             /* flows have to be non-zero */
> +                             flows[i] = mbufs[next_idx + i]->hash.usr | 1;
> +                     } else
> +                             flows[i] = 0;
> +             }
> +
> +             switch (d->dist_match_fn) {
> +#ifdef RTE_MACHINE_CPUFLAG_SSE2

Is this conditional compilation flag really required? i.e.
RTE_DIST_MATCH_SSE will not be enabled in the non-SSE case.

> +             case RTE_DIST_MATCH_SSE:
> +                     find_match_sse2(d, &flows[0], &matches[0]);
> +                     break;
> +#endif
> +             default:
> +                     find_match_scalar(d, &flows[0], &matches[0]);
> +             }
> +
> +             /*
> +              * Matches array now contain the intended worker ID (+1) of
> +              * the incoming packets. Any zeroes need to be assigned
> +              * workers.
> +              */
> +
> +             if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> +                     pkts = num_mbufs - next_idx;
> +             else
> +                     pkts = RTE_DIST_BURST_SIZE;
> +
> +             for (int j = 0; j < pkts; j++) {
> +
> +                     next_mb = mbufs[next_idx++];
> +                     next_value = (((int64_t)(uintptr_t)next_mb) <<
> +                                     RTE_DISTRIB_FLAG_BITS);
> +                     /*
> +                      * User is advocated to set tag vaue for each
> +                      * mbuf before calling rte_distributor_process.
> +                      * User defined tags are used to identify flows,
> +                      * or sessions.
> +                      */
> +                     /* flows MUST be non-zero */
> +                     new_tag = (uint16_t)(next_mb->hash.usr) | 1;
> +
> +                     /*
> +                      * Using the next line will cause the find_match
> +                      * function to be optimised out, making this function
> +                      * do parallel (non-atomic) distribution
> +                      */
> +                     //matches[j] = 0;

Test code commented out with // (please remove).

Reply via email to