<snip> > > Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed > to worker cores are freed in handlers. > Such packets should not be returned to the distributor's main core. The only > packets that should be returned are the packets sent after completion of > the tests in quit_workers function. > > This patch stops returning mbufs to distributor's core. > In case of shutdown tests it is impossible to determine how worker and > distributor threads would synchronize. > Packets used by tests should be freed and packets used during > quit_workers() shouldn't. That's why returning mbufs to mempool is moved > to test procedure run on distributor thread from worker threads. > > Additionally this patch cleans up unused variables. > > Fixes: c0de0eb82e40 ("distributor: switch over to new API") > Cc: david.h...@intel.com > Cc: sta...@dpdk.org > > Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com> > Acked-by: David Hunt <david.h...@intel.com> > --- > app/test/test_distributor.c | 96 ++++++++++++++++++------------------- > 1 file changed, 47 insertions(+), 49 deletions(-) > > diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index > 838459392..06e01ff9d 100644 > --- a/app/test/test_distributor.c > +++ b/app/test/test_distributor.c > @@ -44,7 +44,7 @@ total_packet_count(void) > unsigned i, count = 0; > for (i = 0; i < worker_idx; i++) > count += > __atomic_load_n(&worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE); > + __ATOMIC_RELAXED); I think it is better to make this and other statistics changes below in commit 6/16. It will be in line with the commit log as well.
> return count; > } > > @@ -55,7 +55,7 @@ clear_packet_count(void) > unsigned int i; > for (i = 0; i < RTE_MAX_LCORE; i++) > __atomic_store_n(&worker_stats[i].handled_packets, 0, > - __ATOMIC_RELEASE); > + __ATOMIC_RELAXED); > } > > /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@ > handle_work(void *arg) > struct rte_mbuf *buf[8] __rte_cache_aligned; > struct worker_params *wp = arg; > struct rte_distributor *db = wp->dist; > - unsigned int count = 0, num; > + unsigned int num; > unsigned int id = __atomic_fetch_add(&worker_idx, 1, > __ATOMIC_RELAXED); > > num = rte_distributor_get_pkt(db, id, buf, NULL, 0); > while (!quit) { > __atomic_fetch_add(&worker_stats[id].handled_packets, > num, > - __ATOMIC_ACQ_REL); > - count += num; > + __ATOMIC_RELAXED); > num = rte_distributor_get_pkt(db, id, > buf, buf, num); > } > __atomic_fetch_add(&worker_stats[id].handled_packets, num, > - __ATOMIC_ACQ_REL); > - count += num; > + __ATOMIC_RELAXED); > rte_distributor_return_pkt(db, id, buf, num); > return 0; > } > @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct > rte_mempool *p) > for (i = 0; i < rte_lcore_count() - 1; i++) > printf("Worker %u handled %u packets\n", i, > __atomic_load_n(&worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE)); > + __ATOMIC_RELAXED)); > printf("Sanity test with all zero hashes done.\n"); > > /* pick two flows and check they go correctly */ @@ -163,7 +161,7 > @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) > printf("Worker %u handled %u packets\n", i, > __atomic_load_n( > &worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE)); > + __ATOMIC_RELAXED)); > printf("Sanity test with two hash values done\n"); > } > > @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct > rte_mempool *p) > for (i = 0; i < rte_lcore_count() - 1; i++) > printf("Worker %u handled %u packets\n", i, > __atomic_load_n(&worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE)); > + __ATOMIC_RELAXED)); > 
printf("Sanity test with non-zero hashes done\n"); > > rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23 > +274,20 @@ handle_work_with_free_mbufs(void *arg) > struct rte_mbuf *buf[8] __rte_cache_aligned; > struct worker_params *wp = arg; > struct rte_distributor *d = wp->dist; > - unsigned int count = 0; > unsigned int i; > unsigned int num; > unsigned int id = __atomic_fetch_add(&worker_idx, 1, > __ATOMIC_RELAXED); > > num = rte_distributor_get_pkt(d, id, buf, NULL, 0); > while (!quit) { > - count += num; > __atomic_fetch_add(&worker_stats[id].handled_packets, > num, > - __ATOMIC_ACQ_REL); > + __ATOMIC_RELAXED); > for (i = 0; i < num; i++) > rte_pktmbuf_free(buf[i]); > num = rte_distributor_get_pkt(d, id, buf, NULL, 0); > } > - count += num; > __atomic_fetch_add(&worker_stats[id].handled_packets, num, > - __ATOMIC_ACQ_REL); > + __ATOMIC_RELAXED); > rte_distributor_return_pkt(d, id, buf, num); > return 0; > } > @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params > *wp, struct rte_mempool *p) > rte_distributor_process(d, NULL, 0); > for (j = 0; j < BURST; j++) { > bufs[j]->hash.usr = (i+j) << 1; > - rte_mbuf_refcnt_set(bufs[j], 1); > } > > rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10 > @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct > rte_mempool *p) static int handle_work_for_shutdown_test(void *arg) { > - struct rte_mbuf *pkt = NULL; > struct rte_mbuf *buf[8] __rte_cache_aligned; > struct worker_params *wp = arg; > struct rte_distributor *d = wp->dist; > - unsigned int count = 0; > unsigned int num; > - unsigned int total = 0; > - unsigned int i; > - unsigned int returned = 0; > unsigned int zero_id = 0; > unsigned int zero_unset; > const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ - > 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg) > /* wait for quit single globally, or for worker zero, wait > * for zero_quit */ > while (!quit && !(id == zero_id && zero_quit)) { > - count += num; > 
__atomic_fetch_add(&worker_stats[id].handled_packets, > num, > - __ATOMIC_ACQ_REL); > - for (i = 0; i < num; i++) > - rte_pktmbuf_free(buf[i]); > + __ATOMIC_RELAXED); > num = rte_distributor_get_pkt(d, id, buf, NULL, 0); > > if (num > 0) { > @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg) > false, __ATOMIC_ACQ_REL, > __ATOMIC_ACQUIRE); > } > zero_id = __atomic_load_n(&zero_idx, > __ATOMIC_ACQUIRE); > - > - total += num; > } > - count += num; > - returned = rte_distributor_return_pkt(d, id, buf, num); > - > __atomic_fetch_add(&worker_stats[id].handled_packets, num, > - __ATOMIC_ACQ_REL); > + __ATOMIC_RELAXED); > if (id == zero_id) { > + rte_distributor_return_pkt(d, id, NULL, 0); > + > /* for worker zero, allow it to restart to pick up last packet > * when all workers are shutting down. > */ > @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg) > > while (!quit) { > > __atomic_fetch_add(&worker_stats[id].handled_packets, > - num, __ATOMIC_ACQ_REL); > - count += num; > - rte_pktmbuf_free(pkt); > + num, __ATOMIC_RELAXED); > num = rte_distributor_get_pkt(d, id, buf, NULL, 0); > } > - returned = rte_distributor_return_pkt(d, > - id, buf, num); > - printf("Num returned = %d\n", returned); > } > + rte_distributor_return_pkt(d, id, buf, num); > return 0; > } > > @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct > worker_params *wp, { > struct rte_distributor *d = wp->dist; > struct rte_mbuf *bufs[BURST]; > - unsigned i; > + struct rte_mbuf *bufs2[BURST]; > + unsigned int i; > + unsigned int failed = 0; > > printf("=== Sanity test of worker shutdown ===\n"); > > @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct > worker_params *wp, > */ > > /* get more buffers to queue up, again setting them to the same > flow */ > - if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { > + if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) { > printf("line %d: Error getting mbufs from pool\n", __LINE__); > + rte_mempool_put_bulk(p, 
(void *)bufs, BURST); > return -1; > } > for (i = 0; i < BURST; i++) > - bufs[i]->hash.usr = 1; > + bufs2[i]->hash.usr = 1; > > /* get worker zero to quit */ > zero_quit = 1; > - rte_distributor_process(d, bufs, BURST); > + rte_distributor_process(d, bufs2, BURST); > > /* flush the distributor */ > rte_distributor_flush(d); > @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct > worker_params *wp, > for (i = 0; i < rte_lcore_count() - 1; i++) > printf("Worker %u handled %u packets\n", i, > __atomic_load_n(&worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE)); > + __ATOMIC_RELAXED)); > > if (total_packet_count() != BURST * 2) { > printf("Line %d: Error, not all packets flushed. " > "Expected %u, got %u\n", > __LINE__, BURST * 2, total_packet_count()); > - return -1; > + failed = 1; > } > > + rte_mempool_put_bulk(p, (void *)bufs, BURST); > + rte_mempool_put_bulk(p, (void *)bufs2, BURST); > + > + if (failed) > + return -1; > + > printf("Sanity test with worker shutdown passed\n\n"); > return 0; > } > @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct > worker_params *wp, { > struct rte_distributor *d = wp->dist; > struct rte_mbuf *bufs[BURST]; > - unsigned i; > + unsigned int i; > + unsigned int failed = 0; > > printf("=== Test flush fn with worker shutdown (%s) ===\n", wp- > >name); > > @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct > worker_params *wp, > for (i = 0; i < rte_lcore_count() - 1; i++) > printf("Worker %u handled %u packets\n", i, > __atomic_load_n(&worker_stats[i].handled_packets, > - __ATOMIC_ACQUIRE)); > + __ATOMIC_RELAXED)); > > if (total_packet_count() != BURST) { > printf("Line %d: Error, not all packets flushed. 
" > "Expected %u, got %u\n", > __LINE__, BURST, total_packet_count()); > - return -1; > + failed = 1; > } > > + rte_mempool_put_bulk(p, (void *)bufs, BURST); > + > + if (failed) > + return -1; > + > printf("Flush test with worker shutdown passed\n\n"); > return 0; > } > @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct > rte_mempool *p) > const unsigned num_workers = rte_lcore_count() - 1; > unsigned i; > struct rte_mbuf *bufs[RTE_MAX_LCORE]; > - rte_mempool_get_bulk(p, (void *)bufs, num_workers); > + if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { > + printf("line %d: Error getting mbufs from pool\n", __LINE__); > + return; > + } > > zero_quit = 0; > quit = 1; > @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct > rte_mempool *p) > bufs[i]->hash.usr = i << 1; > rte_distributor_process(d, bufs, num_workers); > > - rte_mempool_put_bulk(p, (void *)bufs, num_workers); > - > rte_distributor_process(d, NULL, 0); > rte_distributor_flush(d); > rte_eal_mp_wait_lcore(); > + > + rte_mempool_put_bulk(p, (void *)bufs, num_workers); > + > quit = 0; > worker_idx = 0; > zero_idx = RTE_MAX_LCORE; > -- > 2.17.1