<snip> > > Hi Honnappa, > > Many thanks for the review! > > I'll write my answers here not inline as it would be easier to read them in > one > place, I think. > So first of all I agree with you in 2 things: > 1) all uses of statistics must be atomic and lack of that caused most of the > problems > 2) it would be better to replace barrier and memset in > clear_packet_count() with atomic stores as you suggested > > So I will apply both of above. > > However I wasn't not fully convinced on changing acquire/release to relaxed. > It wood be perfectly ok if it would look like in this Herb Sutter's example: > https://youtu.be/KeLBd2[] EJLOU?t=4170 > But in his case the counters are cleared before worker threads start and are > printout after they are completed. > > In case of the dpdk distributor tests both worker and main cores are running > at the same time. In the sanity_test, the statistics are cleared and verified > few times for different hashes of packages. The worker cores are not > stopped at this time and they continue their loops in handle procedure. > Verification made in main core is an exchange of data as the current > statistics > indicate how the test will result. Agree. The key point we have to note is that the data that is exchanged between the two threads is already atomic (handled_packets is atomic).
> > So as I wasn't convinced, I run some tests with both both relaxed and > acquire/release modes and they both fail :( The failures caused by statistics > errors to number of tests ratio for > 200000 tests was: > for relaxed: 0,000790562 > for acq/rel: 0,000091321 > > > That's why I'm going to modify tests in such way, that they would: > 1) clear statistics > 2) launch worker threads > 3) run test > 4) wait for workers procedures to complete > 5) check stats, verify results and print them out > > This way worker main core will use (clear or verify) stats only when there are > no worker threads. This would make things simpler and allowing to focus on > testing the distributor not tests. And of course relaxed mode would be > enough! Agree, this would be the only way to ensure that the main thread sees the correct statistics (just like in the video) > > > Best regards > Lukasz > > > W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze: > > <snip> > > > >> Statistics of handled packets are cleared and read on main lcore, > >> while they are increased in workers handlers on different lcores. > >> > >> Without synchronization occasionally showed invalid values. > >> This patch uses atomic acquire/release mechanisms to synchronize. > > In general, load-acquire and store-release memory orderings are required > while synchronizing data (that cannot be updated atomically) between > threads. In the situation, making counters atomic is enough. > > > >> Fixes: c3eabff124e6 ("distributor: add unit tests") > >> Cc: bruce.richard...@intel.com > >> Cc: sta...@dpdk.org > >> > >> Signed-off-by: Lukasz Wojciechowski > >> <l.wojciec...@partner.samsung.com> > >> Acked-by: David Hunt <david.h...@intel.com> > >> --- > >> app/test/test_distributor.c | 39 ++++++++++++++++++++++++----------- > -- > >> 1 file changed, 26 insertions(+), 13 deletions(-) > >> > >> diff --git a/app/test/test_distributor.c > >> b/app/test/test_distributor.c index > >> 35b25463a..0e49e3714 100644 > >> --- a/app/test/test_distributor.c > >> +++ b/app/test/test_distributor.c > >> @@ -43,7 +43,8 @@ total_packet_count(void) { > >> unsigned i, count = 0; > >> for (i = 0; i < worker_idx; i++) > >> - count += worker_stats[i].handled_packets; > >> + count += > >> __atomic_load_n(&worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE); > > RELAXED memory order is sufficient. For ex: the worker threads are not > 'releasing' any data that is not atomically updated to the main thread. > > > >> return count; > >> } > >> > >> @@ -52,6 +53,7 @@ static inline void > >> clear_packet_count(void) > >> { > >> memset(&worker_stats, 0, sizeof(worker_stats)); > >> + rte_atomic_thread_fence(__ATOMIC_RELEASE); > > Ideally, the counters should be set to 0 atomically rather than using a > memset. > > > >> } > >> > >> /* this is the basic worker function for sanity test @@ -72,13 > >> +74,13 @@ handle_work(void *arg) > >> num = rte_distributor_get_pkt(db, id, buf, buf, num); > >> while (!quit) { > >> __atomic_fetch_add(&worker_stats[id].handled_packets, > >> num, > >> - __ATOMIC_RELAXED); > >> + __ATOMIC_ACQ_REL); > > Using the __ATOMIC_ACQ_REL order does not mean anything to the main > thread. The main thread might still see the updates from different threads in > different order. > > > >> count += num; > >> num = rte_distributor_get_pkt(db, id, > >> buf, buf, num); > >> } > >> __atomic_fetch_add(&worker_stats[id].handled_packets, num, > >> - __ATOMIC_RELAXED); > >> + __ATOMIC_ACQ_REL); > > Same here, do not see why this change is required. > > > >> count += num; > >> rte_distributor_return_pkt(db, id, buf, num); > >> return 0; > >> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct > >> rte_mempool *p) > >> > >> for (i = 0; i < rte_lcore_count() - 1; i++) > >> printf("Worker %u handled %u packets\n", i, > >> - worker_stats[i].handled_packets); > >> + __atomic_load_n(&worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE)); > > __ATOMIC_RELAXED is enough. > > > >> printf("Sanity test with all zero hashes done.\n"); > >> > >> /* pick two flows and check they go correctly */ @@ -159,7 +162,9 > >> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p) > >> > >> for (i = 0; i < rte_lcore_count() - 1; i++) > >> printf("Worker %u handled %u packets\n", i, > >> - worker_stats[i].handled_packets); > >> + __atomic_load_n( > >> + &worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE)); > > __ATOMIC_RELAXED is enough > > > >> printf("Sanity test with two hash values done\n"); > >> } > >> > >> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct > >> rte_mempool *p) > >> > >> for (i = 0; i < rte_lcore_count() - 1; i++) > >> printf("Worker %u handled %u packets\n", i, > >> - worker_stats[i].handled_packets); > >> + __atomic_load_n(&worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE)); > > __ATOMIC_RELAXED is enough > > > >> printf("Sanity test with non-zero hashes done\n"); > >> > >> rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15 > >> +286,17 @@ handle_work_with_free_mbufs(void *arg) > >> buf[i] = NULL; > >> num = rte_distributor_get_pkt(d, id, buf, buf, num); > >> while (!quit) { > >> - worker_stats[id].handled_packets += num; > >> count += num; > >> + __atomic_fetch_add(&worker_stats[id].handled_packets, > >> num, > >> + __ATOMIC_ACQ_REL); > > IMO, the problem would be the non-atomic update of the statistics. So, > > __ATOMIC_RELAXED is enough > > > >> for (i = 0; i < num; i++) > >> rte_pktmbuf_free(buf[i]); > >> num = rte_distributor_get_pkt(d, > >> id, buf, buf, num); > >> } > >> - worker_stats[id].handled_packets += num; > >> count += num; > >> + __atomic_fetch_add(&worker_stats[id].handled_packets, num, > >> + __ATOMIC_ACQ_REL); > > Same here, the problem is non-atomic update of the statistics, > __ATOMIC_RELAXED is enough. > > Similarly, for changes below, __ATOMIC_RELAXED is enough. > > > >> rte_distributor_return_pkt(d, id, buf, num); > >> return 0; > >> } > >> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg) > >> /* wait for quit single globally, or for worker zero, wait > >> * for zero_quit */ > >> while (!quit && !(id == zero_id && zero_quit)) { > >> - worker_stats[id].handled_packets += num; > >> count += num; > >> + __atomic_fetch_add(&worker_stats[id].handled_packets, > >> num, > >> + __ATOMIC_ACQ_REL); > >> for (i = 0; i < num; i++) > >> rte_pktmbuf_free(buf[i]); > >> num = rte_distributor_get_pkt(d, > >> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg) > >> > >> total += num; > >> } > >> - worker_stats[id].handled_packets += num; > >> count += num; > >> returned = rte_distributor_return_pkt(d, id, buf, num); > >> > >> + __atomic_fetch_add(&worker_stats[id].handled_packets, num, > >> + __ATOMIC_ACQ_REL); > >> if (id == zero_id) { > >> /* for worker zero, allow it to restart to pick up last packet > >> * when all workers are shutting down. > >> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg) > >> id, buf, buf, num); > >> > >> while (!quit) { > >> - worker_stats[id].handled_packets += num; > >> count += num; > >> rte_pktmbuf_free(pkt); > >> num = rte_distributor_get_pkt(d, id, buf, buf, num); > >> + > >> __atomic_fetch_add(&worker_stats[id].handled_packets, > >> + num, __ATOMIC_ACQ_REL); > >> } > >> returned = rte_distributor_return_pkt(d, > >> id, buf, num); > >> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct > >> worker_params *wp, > >> > >> for (i = 0; i < rte_lcore_count() - 1; i++) > >> printf("Worker %u handled %u packets\n", i, > >> - worker_stats[i].handled_packets); > >> + __atomic_load_n(&worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE)); > >> > >> if (total_packet_count() != BURST * 2) { > >> printf("Line %d: Error, not all packets flushed. " > >> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct > >> worker_params *wp, > >> zero_quit = 0; > >> for (i = 0; i < rte_lcore_count() - 1; i++) > >> printf("Worker %u handled %u packets\n", i, > >> - worker_stats[i].handled_packets); > >> + __atomic_load_n(&worker_stats[i].handled_packets, > >> + __ATOMIC_ACQUIRE)); > >> > >> if (total_packet_count() != BURST) { > >> printf("Line %d: Error, not all packets flushed. " > >> -- > >> 2.17.1 > > -- > Lukasz Wojciechowski > Principal Software Engineer > > Samsung R&D Institute Poland > Samsung Electronics > Office +48 22 377 88 25 > l.wojciec...@partner.samsung.com