Hi Honnappa, W dniu 16.10.2020 o 07:12, Honnappa Nagarahalli pisze: > <snip> > >> Sanity tests with mbuf alloc and shutdown tests assume that mbufs passed >> to worker cores are freed in handlers. >> Such packets should not be returned to the distributor's main core. The only >> packets that should be returned are the packets send after completion of >> the tests in quit_workers function. >> >> This patch stops returning mbufs to distributor's core. >> In case of shutdown tests it is impossible to determine how worker and >> distributor threads would synchronize. >> Packets used by tests should be freed and packets used during >> quit_workers() shouldn't. That's why returning mbufs to mempool is moved >> to test procedure run on distributor thread from worker threads. >> >> Additionally this patch cleans up unused variables. >> >> Fixes: c0de0eb82e40 ("distributor: switch over to new API") >> Cc: david.h...@intel.com >> Cc: sta...@dpdk.org >> >> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com> >> Acked-by: David Hunt <david.h...@intel.com> >> --- >> app/test/test_distributor.c | 96 ++++++++++++++++++------------------- >> 1 file changed, 47 insertions(+), 49 deletions(-) >> >> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index >> 838459392..06e01ff9d 100644 >> --- a/app/test/test_distributor.c >> +++ b/app/test/test_distributor.c >> @@ -44,7 +44,7 @@ total_packet_count(void) >> unsigned i, count = 0; >> for (i = 0; i < worker_idx; i++) >> count += >> __atomic_load_n(&worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE); >> + __ATOMIC_RELAXED); > I think it is better to make this and other statistics changes below in > commit 6/16. It will be in line with the commit log as well. I changed the order of patches to avoid duplicated changes in the code. > >> return count; >> } >> >> @@ -55,7 +55,7 @@ clear_packet_count(void) >> unsigned int i; >> for (i = 0; i < RTE_MAX_LCORE; i++) >> __atomic_store_n(&worker_stats[i].handled_packets, 0, >> - __ATOMIC_RELEASE); >> + __ATOMIC_RELAXED); >> } >> >> /* this is the basic worker function for sanity test @@ -67,20 +67,18 @@ >> handle_work(void *arg) >> struct rte_mbuf *buf[8] __rte_cache_aligned; >> struct worker_params *wp = arg; >> struct rte_distributor *db = wp->dist; >> - unsigned int count = 0, num; >> + unsigned int num; >> unsigned int id = __atomic_fetch_add(&worker_idx, 1, >> __ATOMIC_RELAXED); >> >> num = rte_distributor_get_pkt(db, id, buf, NULL, 0); >> while (!quit) { >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> - __ATOMIC_ACQ_REL); >> - count += num; >> + __ATOMIC_RELAXED); >> num = rte_distributor_get_pkt(db, id, >> buf, buf, num); >> } >> __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> - __ATOMIC_ACQ_REL); >> - count += num; >> + __ATOMIC_RELAXED); >> rte_distributor_return_pkt(db, id, buf, num); >> return 0; >> } >> @@ -136,7 +134,7 @@ sanity_test(struct worker_params *wp, struct >> rte_mempool *p) >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> __atomic_load_n(&worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE)); >> + __ATOMIC_RELAXED)); >> printf("Sanity test with all zero hashes done.\n"); >> >> /* pick two flows and check they go correctly */ @@ -163,7 +161,7 >> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) >> printf("Worker %u handled %u packets\n", i, >> __atomic_load_n( >> &worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE)); >> + __ATOMIC_RELAXED)); >> printf("Sanity test with two hash values done\n"); >> } >> >> @@ -190,7 +188,7 @@ sanity_test(struct worker_params *wp, struct >> rte_mempool *p) >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> __atomic_load_n(&worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE)); >> + __ATOMIC_RELAXED)); >> printf("Sanity test with non-zero hashes done\n"); >> >> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -276,23 >> +274,20 @@ handle_work_with_free_mbufs(void *arg) >> struct rte_mbuf *buf[8] __rte_cache_aligned; >> struct worker_params *wp = arg; >> struct rte_distributor *d = wp->dist; >> - unsigned int count = 0; >> unsigned int i; >> unsigned int num; >> unsigned int id = __atomic_fetch_add(&worker_idx, 1, >> __ATOMIC_RELAXED); >> >> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); >> while (!quit) { >> - count += num; >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> - __ATOMIC_ACQ_REL); >> + __ATOMIC_RELAXED); >> for (i = 0; i < num; i++) >> rte_pktmbuf_free(buf[i]); >> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); >> } >> - count += num; >> __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> - __ATOMIC_ACQ_REL); >> + __ATOMIC_RELAXED); >> rte_distributor_return_pkt(d, id, buf, num); >> return 0; >> } >> @@ -318,7 +313,6 @@ sanity_test_with_mbuf_alloc(struct worker_params >> *wp, struct rte_mempool *p) >> rte_distributor_process(d, NULL, 0); >> for (j = 0; j < BURST; j++) { >> bufs[j]->hash.usr = (i+j) << 1; >> - rte_mbuf_refcnt_set(bufs[j], 1); >> } >> >> rte_distributor_process(d, bufs, BURST); @@ -342,15 +336,10 >> @@ sanity_test_with_mbuf_alloc(struct worker_params *wp, struct >> rte_mempool *p) static int handle_work_for_shutdown_test(void *arg) { >> - struct rte_mbuf *pkt = NULL; >> struct rte_mbuf *buf[8] __rte_cache_aligned; >> struct worker_params *wp = arg; >> struct rte_distributor *d = wp->dist; >> - unsigned int count = 0; >> unsigned int num; >> - unsigned int total = 0; >> - unsigned int i; >> - unsigned int returned = 0; >> unsigned int zero_id = 0; >> unsigned int zero_unset; >> const unsigned int id = __atomic_fetch_add(&worker_idx, 1, @@ - >> 368,11 +357,8 @@ handle_work_for_shutdown_test(void *arg) >> /* wait for quit single globally, or for worker zero, wait >> * for zero_quit */ >> while (!quit && !(id == zero_id && zero_quit)) { >> - count += num; >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> - __ATOMIC_ACQ_REL); >> - for (i = 0; i < num; i++) >> - rte_pktmbuf_free(buf[i]); >> + __ATOMIC_RELAXED); >> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); >> >> if (num > 0) { >> @@ -381,15 +367,12 @@ handle_work_for_shutdown_test(void *arg) >> false, __ATOMIC_ACQ_REL, >> __ATOMIC_ACQUIRE); >> } >> zero_id = __atomic_load_n(&zero_idx, >> __ATOMIC_ACQUIRE); >> - >> - total += num; >> } >> - count += num; >> - returned = rte_distributor_return_pkt(d, id, buf, num); >> - >> __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> - __ATOMIC_ACQ_REL); >> + __ATOMIC_RELAXED); >> if (id == zero_id) { >> + rte_distributor_return_pkt(d, id, NULL, 0); >> + >> /* for worker zero, allow it to restart to pick up last packet >> * when all workers are shutting down. >> */ >> @@ -400,15 +383,11 @@ handle_work_for_shutdown_test(void *arg) >> >> while (!quit) { >> >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> - num, __ATOMIC_ACQ_REL); >> - count += num; >> - rte_pktmbuf_free(pkt); >> + num, __ATOMIC_RELAXED); >> num = rte_distributor_get_pkt(d, id, buf, NULL, 0); >> } >> - returned = rte_distributor_return_pkt(d, >> - id, buf, num); >> - printf("Num returned = %d\n", returned); >> } >> + rte_distributor_return_pkt(d, id, buf, num); >> return 0; >> } >> >> @@ -424,7 +403,9 @@ sanity_test_with_worker_shutdown(struct >> worker_params *wp, { >> struct rte_distributor *d = wp->dist; >> struct rte_mbuf *bufs[BURST]; >> - unsigned i; >> + struct rte_mbuf *bufs2[BURST]; >> + unsigned int i; >> + unsigned int failed = 0; >> >> printf("=== Sanity test of worker shutdown ===\n"); >> >> @@ -450,16 +431,17 @@ sanity_test_with_worker_shutdown(struct >> worker_params *wp, >> */ >> >> /* get more buffers to queue up, again setting them to the same >> flow */ >> - if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) { >> + if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) { >> printf("line %d: Error getting mbufs from pool\n", __LINE__); >> + rte_mempool_put_bulk(p, (void *)bufs, BURST); >> return -1; >> } >> for (i = 0; i < BURST; i++) >> - bufs[i]->hash.usr = 1; >> + bufs2[i]->hash.usr = 1; >> >> /* get worker zero to quit */ >> zero_quit = 1; >> - rte_distributor_process(d, bufs, BURST); >> + rte_distributor_process(d, bufs2, BURST); >> >> /* flush the distributor */ >> rte_distributor_flush(d); >> @@ -468,15 +450,21 @@ sanity_test_with_worker_shutdown(struct >> worker_params *wp, >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> __atomic_load_n(&worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE)); >> + __ATOMIC_RELAXED)); >> >> if (total_packet_count() != BURST * 2) { >> printf("Line %d: Error, not all packets flushed. " >> "Expected %u, got %u\n", >> __LINE__, BURST * 2, total_packet_count()); >> - return -1; >> + failed = 1; >> } >> >> + rte_mempool_put_bulk(p, (void *)bufs, BURST); >> + rte_mempool_put_bulk(p, (void *)bufs2, BURST); >> + >> + if (failed) >> + return -1; >> + >> printf("Sanity test with worker shutdown passed\n\n"); >> return 0; >> } >> @@ -490,7 +478,8 @@ test_flush_with_worker_shutdown(struct >> worker_params *wp, { >> struct rte_distributor *d = wp->dist; >> struct rte_mbuf *bufs[BURST]; >> - unsigned i; >> + unsigned int i; >> + unsigned int failed = 0; >> >> printf("=== Test flush fn with worker shutdown (%s) ===\n", wp- >>> name); >> @@ -522,15 +511,20 @@ test_flush_with_worker_shutdown(struct >> worker_params *wp, >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> __atomic_load_n(&worker_stats[i].handled_packets, >> - __ATOMIC_ACQUIRE)); >> + __ATOMIC_RELAXED)); >> >> if (total_packet_count() != BURST) { >> printf("Line %d: Error, not all packets flushed. " >> "Expected %u, got %u\n", >> __LINE__, BURST, total_packet_count()); >> - return -1; >> + failed = 1; >> } >> >> + rte_mempool_put_bulk(p, (void *)bufs, BURST); >> + >> + if (failed) >> + return -1; >> + >> printf("Flush test with worker shutdown passed\n\n"); >> return 0; >> } >> @@ -596,7 +590,10 @@ quit_workers(struct worker_params *wp, struct >> rte_mempool *p) >> const unsigned num_workers = rte_lcore_count() - 1; >> unsigned i; >> struct rte_mbuf *bufs[RTE_MAX_LCORE]; >> - rte_mempool_get_bulk(p, (void *)bufs, num_workers); >> + if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) { >> + printf("line %d: Error getting mbufs from pool\n", __LINE__); >> + return; >> + } >> >> zero_quit = 0; >> quit = 1; >> @@ -604,11 +601,12 @@ quit_workers(struct worker_params *wp, struct >> rte_mempool *p) >> bufs[i]->hash.usr = i << 1; >> rte_distributor_process(d, bufs, num_workers); >> >> - rte_mempool_put_bulk(p, (void *)bufs, num_workers); >> - >> rte_distributor_process(d, NULL, 0); >> rte_distributor_flush(d); >> rte_eal_mp_wait_lcore(); >> + >> + rte_mempool_put_bulk(p, (void *)bufs, num_workers); >> + >> quit = 0; >> worker_idx = 0; >> zero_idx = RTE_MAX_LCORE; >> -- >> 2.17.1
-- Lukasz Wojciechowski Principal Software Engineer Samsung R&D Institute Poland Samsung Electronics Office +48 22 377 88 25 l.wojciec...@partner.samsung.com