Hi Honnappa,

I pushed v5 of the patches today.
However, all I fixed in this patch is replacing the memset with a loop of atomic operations. I didn't move clearing and checking of the statistics out of the test yet (to make sure no worker is running). After fixing a few more distributor issues, I am now unable to catch even a single failure using the ACQ/REL memory models. I'll test it with RELAXED, maybe tomorrow, to see if it works as well.

Best regards
Lukasz

On 02.10.2020 at 13:25, Lukasz Wojciechowski wrote:
> Hi Honnappa,
>
> Many thanks for the review!
>
> I'll write my answers here rather than inline, as I think it will be
> easier to read them in one place.
> First of all, I agree with you on 2 things:
> 1) all uses of the statistics must be atomic, and the lack of that caused
> most of the problems
> 2) it would be better to replace the barrier and memset in
> clear_packet_count() with atomic stores, as you suggested
>
> So I will apply both of the above.
>
> However, I wasn't fully convinced about changing acquire/release to
> relaxed. It would be perfectly OK if it looked like this Herb Sutter
> example:
> https://youtu.be/KeLBd2EJLOU?t=4170
> But in his case the counters are cleared before the worker threads start
> and are printed out after they are completed.
>
> In the case of the DPDK distributor tests, both the worker and main cores
> run at the same time. In the sanity_test, the statistics are cleared and
> verified several times for different hashes of packets. The worker cores
> are not stopped at this time and continue their loops in the handle
> procedure. The verification done on the main core is thus an exchange of
> data, as the current statistics indicate how the test will turn out.
>
> So, as I wasn't convinced, I ran some tests with both the relaxed and the
> acquire/release modes, and they both fail :(
> The ratio of failures caused by statistics errors to the number of tests,
> over 200000 tests, was:
> for relaxed: 0.000790562
> for acq/rel: 0.000091321
>
>
> That's why I'm going to modify the tests so that they:
> 1) clear the statistics
> 2) launch the worker threads
> 3) run the test
> 4) wait for the worker procedures to complete
> 5) check the stats, verify the results and print them out
>
> This way the main core will use (clear or verify) the stats only when
> there are no worker threads running. This would make things simpler and
> allow us to focus on testing the distributor rather than the tests. And
> of course relaxed mode would be enough!
>
>
> Best regards
> Lukasz
>
>
> On 29.09.2020 at 07:49, Honnappa Nagarahalli wrote:
>> <snip>
>>
>>> Statistics of handled packets are cleared and read on the main lcore,
>>> while they are increased in the workers' handlers on different lcores.
>>>
>>> Without synchronization, they occasionally showed invalid values.
>>> This patch uses atomic acquire/release mechanisms to synchronize.
>> In general, load-acquire and store-release memory orderings are required
>> while synchronizing data (that cannot be updated atomically) between
>> threads. In this situation, making the counters atomic is enough.
>>
>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>> Cc: bruce.richard...@intel.com
>>> Cc: sta...@dpdk.org
>>>
>>> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
>>> Acked-by: David Hunt <david.h...@intel.com>
>>> ---
>>>  app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>>  1 file changed, 26 insertions(+), 13 deletions(-)
>>>
>>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>>> index 35b25463a..0e49e3714 100644
>>> --- a/app/test/test_distributor.c
>>> +++ b/app/test/test_distributor.c
>>> @@ -43,7 +43,8 @@ total_packet_count(void)
>>> {
>>> 	unsigned i, count = 0;
>>> 	for (i = 0; i < worker_idx; i++)
>>> -		count += worker_stats[i].handled_packets;
>>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>>> +			__ATOMIC_ACQUIRE);
>> RELAXED memory order is sufficient. For ex: the worker threads are not
>> 'releasing' any data that is not atomically updated to the main thread.
>>
>>> 	return count;
>>> }
>>>
>>> @@ -52,6 +53,7 @@ static inline void
>>> clear_packet_count(void)
>>> {
>>> 	memset(&worker_stats, 0, sizeof(worker_stats));
>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>> Ideally, the counters should be set to 0 atomically rather than using a
>> memset.
>>
>>> }
>>>
>>> /* this is the basic worker function for sanity test
>>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>> 	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>> 	while (!quit) {
>>> 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> -			__ATOMIC_RELAXED);
>>> +			__ATOMIC_ACQ_REL);
>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main
>> thread. The main thread might still see the updates from different
>> threads in different order.
>>
>>> 		count += num;
>>> 		num = rte_distributor_get_pkt(db, id,
>>> 				buf, buf, num);
>>> 	}
>>> 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> -		__ATOMIC_RELAXED);
>>> +		__ATOMIC_ACQ_REL);
>> Same here, I do not see why this change is required.
>>
>>> 	count += num;
>>> 	rte_distributor_return_pkt(db, id, buf, num);
>>> 	return 0;
>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>> 	for (i = 0; i < rte_lcore_count() - 1; i++)
>>> 		printf("Worker %u handled %u packets\n", i,
>>> -			worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough.
>>
>>> 	printf("Sanity test with all zero hashes done.\n");
>>>
>>> 	/* pick two flows and check they go correctly */
>>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>> 	for (i = 0; i < rte_lcore_count() - 1; i++)
>>> 		printf("Worker %u handled %u packets\n", i,
>>> -			worker_stats[i].handled_packets);
>>> +			__atomic_load_n(
>>> +				&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough.
>>
>>> 	printf("Sanity test with two hash values done\n");
>>> }
>>>
>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>>
>>> 	for (i = 0; i < rte_lcore_count() - 1; i++)
>>> 		printf("Worker %u handled %u packets\n", i,
>>> -			worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE));
>> __ATOMIC_RELAXED is enough.
>>
>>> 	printf("Sanity test with non-zero hashes done\n");
>>>
>>> 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>> 		buf[i] = NULL;
>>> 	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>> 	while (!quit) {
>>> -		worker_stats[id].handled_packets += num;
>>> 		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>> IMO, the problem would be the non-atomic update of the statistics. So,
>> __ATOMIC_RELAXED is enough.
>>
>>> 		for (i = 0; i < num; i++)
>>> 			rte_pktmbuf_free(buf[i]);
>>> 		num = rte_distributor_get_pkt(d,
>>> 				id, buf, buf, num);
>>> 	}
>>> -	worker_stats[id].handled_packets += num;
>>> 	count += num;
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +		__ATOMIC_ACQ_REL);
>> Same here, the problem is the non-atomic update of the statistics;
>> __ATOMIC_RELAXED is enough.
>> Similarly, for the changes below, __ATOMIC_RELAXED is enough.
>>
>>> 	rte_distributor_return_pkt(d, id, buf, num);
>>> 	return 0;
>>> }
>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>> 	/* wait for quit single globally, or for worker zero, wait
>>> 	 * for zero_quit */
>>> 	while (!quit && !(id == zero_id && zero_quit)) {
>>> -		worker_stats[id].handled_packets += num;
>>> 		count += num;
>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +			__ATOMIC_ACQ_REL);
>>> 		for (i = 0; i < num; i++)
>>> 			rte_pktmbuf_free(buf[i]);
>>> 		num = rte_distributor_get_pkt(d,
>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>
>>> 		total += num;
>>> 	}
>>> -	worker_stats[id].handled_packets += num;
>>> 	count += num;
>>> 	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>
>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>> +		__ATOMIC_ACQ_REL);
>>> 	if (id == zero_id) {
>>> 		/* for worker zero, allow it to restart to pick up last packet
>>> 		 * when all workers are shutting down.
>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>> 			id, buf, buf, num);
>>>
>>> 		while (!quit) {
>>> -			worker_stats[id].handled_packets += num;
>>> 			count += num;
>>> 			rte_pktmbuf_free(pkt);
>>> 			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>>> +				num, __ATOMIC_ACQ_REL);
>>> 		}
>>> 		returned = rte_distributor_return_pkt(d,
>>> 				id, buf, num);
>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct worker_params *wp,
>>>
>>> 	for (i = 0; i < rte_lcore_count() - 1; i++)
>>> 		printf("Worker %u handled %u packets\n", i,
>>> -			worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE));
>>>
>>> 	if (total_packet_count() != BURST * 2) {
>>> 		printf("Line %d: Error, not all packets flushed. "
>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct worker_params *wp,
>>> 	zero_quit = 0;
>>> 	for (i = 0; i < rte_lcore_count() - 1; i++)
>>> 		printf("Worker %u handled %u packets\n", i,
>>> -			worker_stats[i].handled_packets);
>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>> +				__ATOMIC_ACQUIRE));
>>>
>>> 	if (total_packet_count() != BURST) {
>>> 		printf("Line %d: Error, not all packets flushed. "
>>> --
>>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer
Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciec...@partner.samsung.com