On Thu, Dec 22, 2016 at 04:37:04AM +0000, David Hunt wrote:
> Now sends bursts of up to 8 mbufs to each worker, and tracks
> the in-flight flow-ids (atomic scheduling)
>
> New file with a new API, similar to the old API except with _burst
> at the end of the function names
>
> Signed-off-by: David Hunt <david.h...@intel.com>
> +
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **pkts,
> +		struct rte_mbuf **oldpkt, unsigned int return_count)
> +{
> +	unsigned int count;
> +	uint64_t retries = 0;
> +
> +	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
> +
> +	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	while (count == 0) {
> +		rte_pause();
> +		retries++;
> +		if (retries > 1000) {
> +			retries = 0;

This write to retries has no effect, as it comes immediately before the
return.

> +			return 0;
> +		}
> +		uint64_t t = __rdtsc()+100;

Use the rte_ version of __rdtsc(), i.e. rte_rdtsc().
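Something like the below (an untested sketch, assuming rte_rdtsc() from
<rte_cycles.h>) would address both points:

	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
	while (count == 0) {
		rte_pause();
		if (++retries > 1000)
			return 0;

		/* back off for ~100 cycles before polling again */
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();

		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
	}
	return count;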
> +
> +		while (__rdtsc() < t)
> +			rte_pause();
> +
> +		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	}
> +	return count;
> +}
> +
> +int
> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
> +{
> +	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
> +	unsigned int i;
> +
> +	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> +		/* Switch off the return bit first */
> +		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> +
> +	for (i = num; i-- > 0; )
> +		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
> +			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> +
> +	/* set the GET_BUF bit even if we got no returns */
> +	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> +
> +	return 0;
> +}
> +
> +#if RTE_MACHINE_CPUFLAG_SSE2
> +static inline void

Move the SSE version of the code to a separate file so that other SIMD
arch-specific versions (e.g. NEON) can be incorporated later.

> +find_match_sse2(struct rte_distributor_burst *d,
> +			uint16_t *data_ptr,
> +			uint16_t *output_ptr)
> +{
> +	/* Setup */
> +	__m128i incoming_fids;
> +	__m128i inflight_fids;
> +	__m128i preflight_fids;
> +	__m128i wkr;
> +	__m128i mask1;
> +	__m128i mask2;
> +	__m128i output;
> +	struct rte_distributor_backlog *bl;
> +
> +	/*
> +	 * Function overview:
> +	 * 1. Load the incoming flow ids into an xmm reg
> +	 * 2. Loop through all worker IDs
> +	 * 2a. Load the current inflights for that worker into an xmm reg
> +	 * 2b. Load the current backlog for that worker into an xmm reg
> +	 * 2c. use cmpestrm to intersect flow_ids with backlog and inflights
> +	 * 2d. Add any matches to the output
> +	 * 3. Write the output xmm (matching worker ids).
> +	 */
> +
> +	output = _mm_set1_epi16(0);
> +	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
> +
> +	for (uint16_t i = 0; i < d->num_workers; i++) {
> +		bl = &d->backlog[i];
> +
> +		inflight_fids =
> +			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
> +		preflight_fids =
> +			_mm_load_si128((__m128i *)(bl->tags));
> +
> +		/*
> +		 * Any incoming_fid that exists anywhere in inflight_fids will
> +		 * have 0xffff in same position of the mask as the incoming fid
> +		 * Example (shortened to bytes for brevity):
> +		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
> +		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
> +		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
> +		 */
> +
> +		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +
> +		mask1 = _mm_or_si128(mask1, mask2);
> +		/*
> +		 * Now mask contains 0xffff where there's a match.
> +		 * Next we need to store the worker_id in the relevant position
> +		 * in the output.
> +		 */
> +
> +		wkr = _mm_set1_epi16(i+1);
> +		mask1 = _mm_and_si128(mask1, wkr);
> +		output = _mm_or_si128(mask1, output);
> +	}
> +
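For reference, the intersection cmpestrm computes above is, in scalar
terms, roughly the following. This is an illustrative sketch only (the
patch's own non-SSE fallback is find_match_scalar(); the sketch assumes
in_flight_tags[i] and bl->tags are arrays of RTE_DIST_BURST_SIZE
uint16_t tags, as implied by the 128-bit loads):

	static inline void
	find_match_scalar_sketch(struct rte_distributor_burst *d,
			uint16_t *data_ptr, uint16_t *output_ptr)
	{
		struct rte_distributor_backlog *bl;
		uint16_t i, j, w;

		/* start with "no match" for every incoming flow id */
		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			output_ptr[j] = 0;

		for (i = 0; i < d->num_workers; i++) {
			bl = &d->backlog[i];
			for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
				for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
					/* flow already on worker i, in flight
					 * or backlogged? record worker ID + 1
					 */
					if (data_ptr[j] == d->in_flight_tags[i][w] ||
							data_ptr[j] == bl->tags[w])
						output_ptr[j] = (uint16_t)(i + 1);
		}
	}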
> +/* process a set of packets to distribute them to workers */
> +int
> +rte_distributor_process_burst(struct rte_distributor_burst *d,
> +		struct rte_mbuf **mbufs, unsigned int num_mbufs)
> +{
> +	unsigned int next_idx = 0;
> +	static unsigned int wkr;
> +	struct rte_mbuf *next_mb = NULL;
> +	int64_t next_value = 0;
> +	uint16_t new_tag = 0;
> +	uint16_t flows[8] __rte_cache_aligned;

The constant 8 is also used further down in the function. Please
replace it with a macro.

> +	//static int iter=0;

Please remove the test code commented out with // across the patch.

> +
> +	if (unlikely(num_mbufs == 0)) {
> +		/* Flush out all non-full cache-lines to workers. */
> +		for (unsigned int wid = 0; wid < d->num_workers; wid++) {
> +			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
> +				release(d, wid);
> +				handle_returns(d, wid);
> +			}
> +		}
> +		return 0;
> +	}
> +
> +	while (next_idx < num_mbufs) {
> +		uint16_t matches[8];
> +		int pkts;
> +
> +		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +			d->bufs[wkr].count = 0;
> +
> +		for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> +			if (mbufs[next_idx + i]) {
> +				/* flows have to be non-zero */
> +				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
> +			} else
> +				flows[i] = 0;
> +		}
> +
> +		switch (d->dist_match_fn) {
> +#ifdef RTE_MACHINE_CPUFLAG_SSE2

Is this conditional compilation flag really required? i.e.
RTE_DIST_MATCH_SSE will not be enabled in the non-SSE case.

> +		case RTE_DIST_MATCH_SSE:
> +			find_match_sse2(d, &flows[0], &matches[0]);
> +			break;
> +#endif
> +		default:
> +			find_match_scalar(d, &flows[0], &matches[0]);
> +		}
> +
> +		/*
> +		 * The matches array now contains the intended worker ID (+1)
> +		 * of the incoming packets. Any zeroes need to be assigned
> +		 * workers.
> +		 */
> +
> +		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> +			pkts = num_mbufs - next_idx;
> +		else
> +			pkts = RTE_DIST_BURST_SIZE;
> +
> +		for (int j = 0; j < pkts; j++) {
> +
> +			next_mb = mbufs[next_idx++];
> +			next_value = (((int64_t)(uintptr_t)next_mb) <<
> +				RTE_DISTRIB_FLAG_BITS);
> +			/*
> +			 * The user is advised to set the tag value for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User-defined tags are used to identify flows
> +			 * or sessions.
> +			 */
> +			/* flows MUST be non-zero */
> +			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
> +
> +			/*
> +			 * Using the next line will cause the find_match
> +			 * function to be optimised out, making this function
> +			 * do parallel (non-atomic) distribution
> +			 */
> +			//matches[j] = 0;

Test code commented out with // again, please remove.
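For what it's worth, a minimal worker loop against the burst API as
posted would look something like the sketch below (hedged: worker_thread
and the lcore-to-worker-id mapping are hypothetical, and buf[] is sized
for the 8-mbuf bursts described in the commit message):

	static int
	worker_thread(void *arg)
	{
		struct rte_distributor_burst *d = arg;
		struct rte_mbuf *buf[8];
		unsigned int num = 0;
		unsigned int id = rte_lcore_id() - 1; /* hypothetical mapping */

		for (;;) {
			/* hand back the previous burst, fetch the next one */
			num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
			/* ... process the num mbufs in buf[] ... */
		}
		return 0;
	}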