On 16.10.2020 at 14:43, Lukasz Wojciechowski wrote:
> Hi Honnappa,
>
> Thank you for your answer.
> In the current v7 version I followed your advice and used the RELAXED memory
> model, and it works without any issues. I guess after fixing the other issues
> found since v4 the distributor works more stably.
> I didn't have time to rearrange all the tests in the way I proposed, but I
> guess if they work like this it's not a top priority.
>
> Can you give an ack on the series? I believe David Marchand is waiting for
> your opinion before processing it.

I'm sorry I didn't see your other comments. I'll try to fix them today.

> Best regards
> Lukasz
>
> On 16.10.2020 at 07:43, Honnappa Nagarahalli wrote:
>> <snip>
>>
>>> Hi Honnappa,
>>>
>>> Many thanks for the review!
>>>
>>> I'll write my answers here rather than inline, as I think it will be
>>> easier to read them in one place.
>>> First of all, I agree with you on two things:
>>> 1) all uses of the statistics must be atomic, and the lack of that caused
>>> most of the problems
>>> 2) it would be better to replace the barrier and memset in
>>> clear_packet_count() with atomic stores, as you suggested
>>>
>>> So I will apply both of the above.
>>>
>>> However, I wasn't fully convinced about changing acquire/release to
>>> relaxed. It would be perfectly fine if it looked like this Herb Sutter
>>> example:
>>> https://youtu.be/KeLBd2EJLOU?t=4170
>>> But in his case the counters are cleared before the worker threads start
>>> and are printed out after they have completed.
>>>
>>> In the case of the DPDK distributor tests, both the worker and main cores
>>> run at the same time. In the sanity_test, the statistics are cleared and
>>> verified a few times for different hashes of packets. The worker cores
>>> are not stopped at that time and continue their loops in the handle
>>> procedure. The verification made on the main core is an exchange of data,
>>> as the current statistics indicate how the test will turn out.
>> Agree.
>> The key point we have to note is that the data that is exchanged
>> between the two threads is already atomic (handled_packets is atomic).
>>
>>> Since I wasn't convinced, I ran some tests with both the relaxed and
>>> acquire/release modes, and they both fail :( Over 200000 tests, the ratio
>>> of failures caused by statistics errors to the number of tests was:
>>> for relaxed: 0.000790562
>>> for acq/rel: 0.000091321
>>>
>>> That's why I'm going to modify the tests in such a way that they:
>>> 1) clear the statistics
>>> 2) launch the worker threads
>>> 3) run the test
>>> 4) wait for the worker procedures to complete
>>> 5) check the stats, verify the results and print them out
>>>
>>> This way the main core will use (clear or verify) the stats only when
>>> there are no worker threads running. This makes things simpler and lets
>>> us focus on testing the distributor, not the tests. And of course relaxed
>>> mode would be enough!
>> Agree, this would be the only way to ensure that the main thread sees the
>> correct statistics (just like in the video).
>>
>>> Best regards
>>> Lukasz
>>>
>>>
>>> On 29.09.2020 at 07:49, Honnappa Nagarahalli wrote:
>>>> <snip>
>>>>
>>>>> Statistics of handled packets are cleared and read on main lcore,
>>>>> while they are increased in workers handlers on different lcores.
>>>>>
>>>>> Without synchronization occasionally showed invalid values.
>>>>> This patch uses atomic acquire/release mechanisms to synchronize.
>>>> In general, load-acquire and store-release memory orderings are required
>>>> while synchronizing data (that cannot be updated atomically) between
>>>> threads. In this situation, making the counters atomic is enough.
>>>>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>>>>> Cc: bruce.richard...@intel.com
>>>>> Cc: sta...@dpdk.org
>>>>>
>>>>> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
>>>>> Acked-by: David Hunt <david.h...@intel.com>
>>>>> ---
>>>>>  app/test/test_distributor.c | 39 +++++++++++++++++++++++-------------
>>>>>  1 file changed, 26 insertions(+), 13 deletions(-)
>>>>>
>>>>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
>>>>> index 35b25463a..0e49e3714 100644
>>>>> --- a/app/test/test_distributor.c
>>>>> +++ b/app/test/test_distributor.c
>>>>> @@ -43,7 +43,8 @@ total_packet_count(void) {
>>>>>  	unsigned i, count = 0;
>>>>>  	for (i = 0; i < worker_idx; i++)
>>>>> -		count += worker_stats[i].handled_packets;
>>>>> +		count += __atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE);
>>>> RELAXED memory order is sufficient. For ex: the worker threads are not
>>>> 'releasing' any data that is not atomically updated to the main thread.
>>>>>  	return count;
>>>>>  }
>>>>>
>>>>> @@ -52,6 +53,7 @@ static inline void
>>>>>  clear_packet_count(void)
>>>>>  {
>>>>>  	memset(&worker_stats, 0, sizeof(worker_stats));
>>>>> +	rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>>> Ideally, the counters should be set to 0 atomically rather than using a
>>>> memset.
>>>>>  }
>>>>>
>>>>>  /* this is the basic worker function for sanity test
>>>>> @@ -72,13 +74,13 @@ handle_work(void *arg)
>>>>>  	num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>>  	while (!quit) {
>>>>>  		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> -				__ATOMIC_RELAXED);
>>>>> +				__ATOMIC_ACQ_REL);
>>>> Using the __ATOMIC_ACQ_REL order does not mean anything to the main
>>>> thread. The main thread might still see the updates from different
>>>> threads in different order.
>>>>>  		count += num;
>>>>>  		num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>>>>  	}
>>>>>  	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> -			__ATOMIC_RELAXED);
>>>>> +			__ATOMIC_ACQ_REL);
>>>> Same here, do not see why this change is required.
>>>>
>>>>>  	count += num;
>>>>>  	rte_distributor_return_pkt(db, id, buf, num);
>>>>>  	return 0;
>>>>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>>>>> rte_mempool *p)
>>>>>
>>>>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>  		printf("Worker %u handled %u packets\n", i,
>>>>> -			worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough.
>>>>
>>>>>  	printf("Sanity test with all zero hashes done.\n");
>>>>>
>>>>>  	/* pick two flows and check they go correctly */
>>>>> @@ -159,7 +162,9 @@ sanity_test(struct worker_params *wp, struct
>>>>> rte_mempool *p)
>>>>>
>>>>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>  		printf("Worker %u handled %u packets\n", i,
>>>>> -			worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(
>>>>> +				&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>  	printf("Sanity test with two hash values done\n");
>>>>>  }
>>>>>
>>>>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>>>>> rte_mempool *p)
>>>>>
>>>>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>  		printf("Worker %u handled %u packets\n", i,
>>>>> -			worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE));
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>  	printf("Sanity test with non-zero hashes done\n");
>>>>>
>>>>>  	rte_mempool_put_bulk(p, (void *)bufs, BURST);
>>>>> @@ -280,15 +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>>>>  		buf[i] = NULL;
>>>>>  	num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>>  	while (!quit) {
>>>>> -		worker_stats[id].handled_packets += num;
>>>>>  		count += num;
>>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +				__ATOMIC_ACQ_REL);
>>>> IMO, the problem would be the non-atomic update of the statistics. So,
>>>> __ATOMIC_RELAXED is enough
>>>>
>>>>>  		for (i = 0; i < num; i++)
>>>>>  			rte_pktmbuf_free(buf[i]);
>>>>>  		num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>>  	}
>>>>> -	worker_stats[id].handled_packets += num;
>>>>>  	count += num;
>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +			__ATOMIC_ACQ_REL);
>>>> Same here, the problem is the non-atomic update of the statistics;
>>>> __ATOMIC_RELAXED is enough.
>>>> Similarly, for the changes below, __ATOMIC_RELAXED is enough.
>>>>
>>>>>  	rte_distributor_return_pkt(d, id, buf, num);
>>>>>  	return 0;
>>>>>  }
>>>>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>>>>  	/* wait for quit single globally, or for worker zero, wait
>>>>>  	 * for zero_quit */
>>>>>  	while (!quit && !(id == zero_id && zero_quit)) {
>>>>> -		worker_stats[id].handled_packets += num;
>>>>>  		count += num;
>>>>> +		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +				__ATOMIC_ACQ_REL);
>>>>>  		for (i = 0; i < num; i++)
>>>>>  			rte_pktmbuf_free(buf[i]);
>>>>>  		num = rte_distributor_get_pkt(d,
>>>>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>>
>>>>>  		total += num;
>>>>>  	}
>>>>> -	worker_stats[id].handled_packets += num;
>>>>>  	count += num;
>>>>>  	returned = rte_distributor_return_pkt(d, id, buf, num);
>>>>>
>>>>> +	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
>>>>> +			__ATOMIC_ACQ_REL);
>>>>>  	if (id == zero_id) {
>>>>>  		/* for worker zero, allow it to restart to pick up last packet
>>>>>  		 * when all workers are shutting down.
>>>>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>>>>  			id, buf, buf, num);
>>>>>
>>>>>  		while (!quit) {
>>>>> -			worker_stats[id].handled_packets += num;
>>>>>  			count += num;
>>>>>  			rte_pktmbuf_free(pkt);
>>>>>  			num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>>>> +			__atomic_fetch_add(&worker_stats[id].handled_packets,
>>>>> +					num, __ATOMIC_ACQ_REL);
>>>>>  		}
>>>>>  		returned = rte_distributor_return_pkt(d,
>>>>> 				id, buf, num);
>>>>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>>>>> worker_params *wp,
>>>>>
>>>>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>  		printf("Worker %u handled %u packets\n", i,
>>>>> -			worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE));
>>>>>
>>>>>  	if (total_packet_count() != BURST * 2) {
>>>>>  		printf("Line %d: Error, not all packets flushed. "
>>>>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>>>>> worker_params *wp,
>>>>>  	zero_quit = 0;
>>>>>  	for (i = 0; i < rte_lcore_count() - 1; i++)
>>>>>  		printf("Worker %u handled %u packets\n", i,
>>>>> -			worker_stats[i].handled_packets);
>>>>> +			__atomic_load_n(&worker_stats[i].handled_packets,
>>>>> +				__ATOMIC_ACQUIRE));
>>>>>
>>>>>  	if (total_packet_count() != BURST) {
>>>>>  		printf("Line %d: Error, not all packets flushed. "
>>>>> --
>>>>> 2.17.1
>>> --
>>> Lukasz Wojciechowski
>>> Principal Software Engineer
>>>
>>> Samsung R&D Institute Poland
>>> Samsung Electronics
>>> Office +48 22 377 88 25
>>> l.wojciec...@partner.samsung.com
--
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciec...@partner.samsung.com