W dniu 23.09.2020 o 06:30, Honnappa Nagarahalli pisze: > <snip> > >> Statistics of handled packets are cleared and read on main lcore, while they >> are increased in workers handlers on different lcores. >> >> Without synchronization occasionally showed invalid values. > What exactly do you mean by invalid values? Can you elaborate?
I mean values that shouldn't be there which are obviously not related to number of packets handled by workers. I reverted the patch and run stress tests. Failures without this patch look like these: === Test flush fn with worker shutdown (burst) === Worker 0 handled 0 packets Worker 1 handled 0 packets Worker 2 handled 0 packets Worker 3 handled 0 packets Worker 4 handled 32 packets Worker 5 handled 0 packets Worker 6 handled 6 packets Line 519: Error, not all packets flushed. Expected 32, got 38 Test Failed or: === Sanity test of worker shutdown === Worker 0 handled 0 packets Worker 1 handled 0 packets Worker 2 handled 0 packets Worker 3 handled 0 packets Worker 4 handled 0 packets Worker 5 handled 64 packets Worker 6 handled 149792 packets Line 466: Error, not all packets flushed. Expected 64, got 149856 Test Failed The 6 or 149792 packets reported by worker 6 were never sent to or processed by the workers. >> This patch uses atomic acquire/release mechanisms to synchronize. >> >> Fixes: c3eabff124e6 ("distributor: add unit tests") >> Cc: bruce.richard...@intel.com >> Cc: sta...@dpdk.org >> >> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com> >> --- >> app/test/test_distributor.c | 39 ++++++++++++++++++++++++------------- >> 1 file changed, 26 insertions(+), 13 deletions(-) >> >> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index >> 35b25463a..0e49e3714 100644 >> --- a/app/test/test_distributor.c >> +++ b/app/test/test_distributor.c >> @@ -43,7 +43,8 @@ total_packet_count(void) { >> unsigned i, count = 0; >> for (i = 0; i < worker_idx; i++) >> - count += worker_stats[i].handled_packets; >> + count += >> __atomic_load_n(&worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE); >> return count; >> } >> >> @@ -52,6 +53,7 @@ static inline void >> clear_packet_count(void) >> { >> memset(&worker_stats, 0, sizeof(worker_stats)); >> + rte_atomic_thread_fence(__ATOMIC_RELEASE); >> } >> >> /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@ >> handle_work(void *arg) >> num = rte_distributor_get_pkt(db, id, buf, buf, num); >> while (!quit) { >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> - __ATOMIC_RELAXED); >> + __ATOMIC_ACQ_REL); >> count += num; >> num = rte_distributor_get_pkt(db, id, >> buf, buf, num); >> } >> __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> - __ATOMIC_RELAXED); >> + __ATOMIC_ACQ_REL); >> count += num; >> rte_distributor_return_pkt(db, id, buf, num); >> return 0; >> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct >> rte_mempool *p) >> >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> - worker_stats[i].handled_packets); >> + __atomic_load_n(&worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE)); >> printf("Sanity test with all zero hashes done.\n"); >> >> /* pick two flows and check they go correctly */ @@ -159,7 +162,9 >> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) >> >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> - worker_stats[i].handled_packets); >> + __atomic_load_n( >> + &worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE)); >> printf("Sanity test with two hash values done\n"); >> } >> >> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct >> rte_mempool *p) >> >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> - worker_stats[i].handled_packets); >> + __atomic_load_n(&worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE)); >> printf("Sanity test with non-zero hashes done\n"); >> >> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15 >> +286,17 @@ handle_work_with_free_mbufs(void *arg) >> buf[i] = NULL; >> num = rte_distributor_get_pkt(d, id, buf, buf, num); >> while (!quit) { >> - worker_stats[id].handled_packets += num; >> count += num; >> + __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> + __ATOMIC_ACQ_REL); >> for (i = 0; i < num; i++) >> rte_pktmbuf_free(buf[i]); >> num = rte_distributor_get_pkt(d, >> id, buf, buf, num); >> } >> - worker_stats[id].handled_packets += num; >> count += num; >> + __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> + __ATOMIC_ACQ_REL); >> rte_distributor_return_pkt(d, id, buf, num); >> return 0; >> } >> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg) >> /* wait for quit single globally, or for worker zero, wait >> * for zero_quit */ >> while (!quit && !(id == zero_id && zero_quit)) { >> - worker_stats[id].handled_packets += num; >> count += num; >> + __atomic_fetch_add(&worker_stats[id].handled_packets, >> num, >> + __ATOMIC_ACQ_REL); >> for (i = 0; i < num; i++) >> rte_pktmbuf_free(buf[i]); >> num = rte_distributor_get_pkt(d, >> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg) >> >> total += num; >> } >> - worker_stats[id].handled_packets += num; >> count += num; >> returned = rte_distributor_return_pkt(d, id, buf, num); >> >> + __atomic_fetch_add(&worker_stats[id].handled_packets, num, >> + __ATOMIC_ACQ_REL); >> if (id == zero_id) { >> /* for worker zero, allow it to restart to pick up last packet >> * when all workers are shutting down. >> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg) >> id, buf, buf, num); >> >> while (!quit) { >> - worker_stats[id].handled_packets += num; >> count += num; >> rte_pktmbuf_free(pkt); >> num = rte_distributor_get_pkt(d, id, buf, buf, num); >> + >> __atomic_fetch_add(&worker_stats[id].handled_packets, >> + num, __ATOMIC_ACQ_REL); >> } >> returned = rte_distributor_return_pkt(d, >> id, buf, num); >> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct >> worker_params *wp, >> >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> - worker_stats[i].handled_packets); >> + __atomic_load_n(&worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE)); >> >> if (total_packet_count() != BURST * 2) { >> printf("Line %d: Error, not all packets flushed. " >> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct >> worker_params *wp, >> zero_quit = 0; >> for (i = 0; i < rte_lcore_count() - 1; i++) >> printf("Worker %u handled %u packets\n", i, >> - worker_stats[i].handled_packets); >> + __atomic_load_n(&worker_stats[i].handled_packets, >> + __ATOMIC_ACQUIRE)); >> >> if (total_packet_count() != BURST) { >> printf("Line %d: Error, not all packets flushed. " >> -- >> 2.17.1 -- Lukasz Wojciechowski Principal Software Engineer Samsung R&D Institute Poland Samsung Electronics Office +48 22 377 88 25 l.wojciec...@partner.samsung.com