Hi Honnappa,

Many thanks for the review!
I'll write my answers here rather than inline, as I think it will be easier to read them all in one place.

First of all, I agree with you on two things:
1) all uses of the statistics must be atomic, and the lack of that caused most of the problems
2) it would be better to replace the barrier and memset in clear_packet_count() with atomic stores, as you suggested
So I will apply both of the above.

However, I wasn't fully convinced about changing acquire/release to relaxed. It would be perfectly fine if the code looked like Herb Sutter's example: https://youtu.be/KeLBd2EJLOU?t=4170
But in his case the counters are cleared before the worker threads start and printed out after they have completed. In the DPDK distributor tests, the worker and main cores run at the same time. In the sanity_test, the statistics are cleared and verified several times for different hashes of packets. The worker cores are not stopped at that time; they continue their loops in the handle procedure. The verification done on the main core is an exchange of data, as the current statistics determine how the test will turn out.

Since I wasn't convinced, I ran some tests with both the relaxed and acquire/release modes, and they both fail :( The ratio of failures caused by statistics errors to the number of tests, over 200000 runs, was:
for relaxed: 0.000790562
for acq/rel: 0.000091321

That's why I'm going to modify the tests so that they:
1) clear statistics
2) launch worker threads
3) run the test
4) wait for the worker procedures to complete
5) check stats, verify results and print them out
This way the main core will use (clear or verify) the stats only when no worker threads are running. This makes things simpler and lets us focus on testing the distributor rather than the tests. And of course relaxed mode will then be enough!
Best regards
Lukasz

On 29.09.2020 07:49, Honnappa Nagarahalli wrote:
> <snip>
>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
>> This patch uses atomic acquire/release mechanisms to synchronize.
> In general, load-acquire and store-release memory orderings are required
> while synchronizing data (that cannot be updated atomically) between threads.
> In this situation, making the counters atomic is enough.
>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richard...@intel.com
>> Cc: sta...@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
>> Acked-by: David Hunt <david.h...@intel.com>
>> ---
>>  app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>  1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>> index 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void)
>>  {
>>  	unsigned i, count = 0;
>>  	for (i = 0; i < worker_idx; i++)
>> -		count += worker_stats[i].handled_packets;
>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>> +				__ATOMIC_ACQUIRE);
> RELAXED memory order is sufficient. For ex: the worker threads are not
> 'releasing' any data that is not atomically updated to the main thread.
>
>>  	return count;
>>  }
>>
>> @@ -52,6 +53,7 @@ static inline void
>>  clear_packet_count(void)
>>  {
>>  	memset(&worker_stats, 0, sizeof(worker_stats));
>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
> Ideally, the counters should be set to 0 atomically rather than using a
> memset.
>
>>  }
>>
>>  /* this is the basic worker function for sanity test
>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>  	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>  	while (!quit) {
>>  		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -				__ATOMIC_RELAXED);
>> +				__ATOMIC_ACQ_REL);
> Using the __ATOMIC_ACQ_REL order does not mean anything to the main thread.
> The main thread might still see the updates from different threads in
> different order.
>
>>  		count += num;
>>  		num = rte_distributor_get_pkt(db, id,
>>  				buf, buf, num);
>>  	}
>>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -			__ATOMIC_RELAXED);
>> +			__ATOMIC_ACQ_REL);
> Same here, do not see why this change is required.
>
>>  	count += num;
>>  	rte_distributor_return_pkt(db, id, buf, num);
>>  	return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>  		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +				__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough.
>
>>  	printf("Sanity test with all zero hashes done.\n");
>>
>>  	/* pick two flows and check they go correctly */
>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>  		for (i = 0; i < rte_lcore_count() - 1; i++)
>>  			printf("Worker %u handled %u packets\n", i,
>> -					worker_stats[i].handled_packets);
>> +					__atomic_load_n(
>> +					&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>  		printf("Sanity test with two hash values done\n");
>>  	}
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>  		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +				__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
> __ATOMIC_RELAXED is enough
>
>>  	printf("Sanity test with non-zero hashes done\n");
>>
>>  	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>  		buf[i] = NULL;
>>  	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>  	while (!quit) {
>> -		worker_stats[id].handled_packets += num;
>>  		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +				__ATOMIC_ACQ_REL);
> IMO, the problem would be the non-atomic update of the statistics. So,
> __ATOMIC_RELAXED is enough
>
>>  		for (i = 0; i < num; i++)
>>  			rte_pktmbuf_free(buf[i]);
>>  		num = rte_distributor_get_pkt(d,
>>  				id, buf, buf, num);
>>  	}
>> -	worker_stats[id].handled_packets += num;
>>  	count += num;
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
> Same here, the problem is non-atomic update of the statistics,
> __ATOMIC_RELAXED is enough.
> Similarly, for changes below, __ATOMIC_RELAXED is enough.
>
>>  	rte_distributor_return_pkt(d, id, buf, num);
>>  	return 0;
>>  }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>  	/* wait for quit single globally, or for worker zero, wait
>>  	 * for zero_quit */
>>  	while (!quit && !(id == zero_id && zero_quit)) {
>> -		worker_stats[id].handled_packets += num;
>>  		count += num;
>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +				__ATOMIC_ACQ_REL);
>>  		for (i = 0; i < num; i++)
>>  			rte_pktmbuf_free(buf[i]);
>>  		num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>  		total += num;
>>  	}
>> -	worker_stats[id].handled_packets += num;
>>  	count += num;
>>  	returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +			__ATOMIC_ACQ_REL);
>>  	if (id == zero_id) {
>>  		/* for worker zero, allow it to restart to pick up last packet
>>  		 * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>  				id, buf, buf, num);
>>
>>  		while (!quit) {
>> -			worker_stats[id].handled_packets += num;
>>  			count += num;
>>  			rte_pktmbuf_free(pkt);
>>  			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>> +					num, __ATOMIC_ACQ_REL);
>>  		}
>>  		returned = rte_distributor_return_pkt(d,
>>  				id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>
>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>  		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +				__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>  	if (total_packet_count() != BURST * 2) {
>>  		printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>>  	zero_quit = 0;
>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>  		printf("Worker %u handled %u packets\n", i,
>> -				worker_stats[i].handled_packets);
>> +				__atomic_load_n(&worker_stats[i].handled_packets,
>> +					__ATOMIC_ACQUIRE));
>>
>>  	if (total_packet_count() != BURST) {
>>  		printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer
Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciec...@partner.samsung.com