<snip>

> 
> Hi Honnappa,
> 
> Many thanks for the review!
> 
> I'll write my answers here not inline as it would be easier to read them in 
> one
> place, I think.
> So first of all I agree with you in 2 things:
> 1) all uses of statistics must be atomic and lack of that caused most of the
> problems
> 2) it would be better to replace barrier and memset in
> clear_packet_count() with atomic stores as you suggested
> 
> So I will apply both of above.
> 
> However I wasn't not fully convinced on changing acquire/release to relaxed.
> It wood be perfectly ok if it would look like in this Herb Sutter's example:
> https://youtu.be/KeLBd2[]  EJLOU?t=4170
> But in his case the counters are cleared before worker threads start and are
> printout after they are completed.
> 
> In case of the dpdk distributor tests both worker and main cores are running
> at the same time. In the sanity_test, the statistics are cleared and verified
> few times for different hashes of packages. The worker cores are not
> stopped at this time and they continue their loops in handle procedure.
> Verification made in main core is an exchange of data as the current 
> statistics
> indicate how the test will result.
Agree. The key point we have to note is that the data that is exchanged between 
the two threads is already atomic (handled_packets is atomic).

> 
> So as I wasn't convinced, I run some tests with both both relaxed and
> acquire/release modes and they both fail :( The failures caused by statistics
> errors to number of tests ratio for
> 200000 tests was:
> for relaxed: 0,000790562
> for acq/rel: 0,000091321
> 
> 
> That's why I'm going to modify tests in such way, that they would:
> 1) clear statistics
> 2) launch worker threads
> 3) run test
> 4) wait for workers procedures to complete
> 5) check stats, verify results and print them out
> 
> This way worker main core will use (clear or verify) stats only when there are
> no worker threads. This would make things simpler and allowing to focus on
> testing the distributor not tests. And of course relaxed mode would be
> enough!
Agree, this would be the only way to ensure that the main thread sees the 
correct statistics (just like in the video)

> 
> 
> Best regards
> Lukasz
> 
> 
> W dniu 29.09.2020 o 07:49, Honnappa Nagarahalli pisze:
> > <snip>
> >
> >> Statistics of handled packets are cleared and read on main lcore,
> >> while they are increased in workers handlers on different lcores.
> >>
> >> Without synchronization occasionally showed invalid values.
> >> This patch uses atomic acquire/release mechanisms to synchronize.
> > In general, load-acquire and store-release memory orderings are required
> while synchronizing data (that cannot be updated atomically) between
> threads. In the situation, making counters atomic is enough.
> >
> >> Fixes: c3eabff124e6 ("distributor: add unit tests")
> >> Cc: bruce.richard...@intel.com
> >> Cc: sta...@dpdk.org
> >>
> >> Signed-off-by: Lukasz Wojciechowski
> >> <l.wojciec...@partner.samsung.com>
> >> Acked-by: David Hunt <david.h...@intel.com>
> >> ---
> >>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-----------
> --
> >>   1 file changed, 26 insertions(+), 13 deletions(-)
> >>
> >> diff --git a/app/test/test_distributor.c
> >> b/app/test/test_distributor.c index
> >> 35b25463a..0e49e3714 100644
> >> --- a/app/test/test_distributor.c
> >> +++ b/app/test/test_distributor.c
> >> @@ -43,7 +43,8 @@ total_packet_count(void)  {
> >>    unsigned i, count = 0;
> >>    for (i = 0; i < worker_idx; i++)
> >> -          count += worker_stats[i].handled_packets;
> >> +          count +=
> >> __atomic_load_n(&worker_stats[i].handled_packets,
> >> +                          __ATOMIC_ACQUIRE);
> > RELAXED memory order is sufficient. For ex: the worker threads are not
> 'releasing' any data that is not atomically updated to the main thread.
> >
> >>    return count;
> >>   }
> >>
> >> @@ -52,6 +53,7 @@ static inline void
> >>   clear_packet_count(void)
> >>   {
> >>    memset(&worker_stats, 0, sizeof(worker_stats));
> >> +  rte_atomic_thread_fence(__ATOMIC_RELEASE);
> > Ideally, the counters should be set to 0 atomically rather than using a
> memset.
> >
> >>   }
> >>
> >>   /* this is the basic worker function for sanity test @@ -72,13
> >> +74,13 @@ handle_work(void *arg)
> >>    num = rte_distributor_get_pkt(db, id, buf, buf, num);
> >>    while (!quit) {
> >>            __atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> -                          __ATOMIC_RELAXED);
> >> +                          __ATOMIC_ACQ_REL);
> > Using the __ATOMIC_ACQ_REL order does not mean anything to the main
> thread. The main thread might still see the updates from different threads in
> different order.
> >
> >>            count += num;
> >>            num = rte_distributor_get_pkt(db, id,
> >>                            buf, buf, num);
> >>    }
> >>    __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> -                  __ATOMIC_RELAXED);
> >> +                  __ATOMIC_ACQ_REL);
> > Same here, do not see why this change is required.
> >
> >>    count += num;
> >>    rte_distributor_return_pkt(db, id, buf, num);
> >>    return 0;
> >> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
> >> rte_mempool *p)
> >>
> >>    for (i = 0; i < rte_lcore_count() - 1; i++)
> >>            printf("Worker %u handled %u packets\n", i,
> >> -                          worker_stats[i].handled_packets);
> >> +                  __atomic_load_n(&worker_stats[i].handled_packets,
> >> +                                  __ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough.
> >
> >>    printf("Sanity test with all zero hashes done.\n");
> >>
> >>    /* pick two flows and check they go correctly */ @@ -159,7 +162,9
> >> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
> >>
> >>            for (i = 0; i < rte_lcore_count() - 1; i++)
> >>                    printf("Worker %u handled %u packets\n", i,
> >> -                                  worker_stats[i].handled_packets);
> >> +                          __atomic_load_n(
> >> +                                  &worker_stats[i].handled_packets,
> >> +                                  __ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough
> >
> >>            printf("Sanity test with two hash values done\n");
> >>    }
> >>
> >> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
> >> rte_mempool *p)
> >>
> >>    for (i = 0; i < rte_lcore_count() - 1; i++)
> >>            printf("Worker %u handled %u packets\n", i,
> >> -                          worker_stats[i].handled_packets);
> >> +                  __atomic_load_n(&worker_stats[i].handled_packets,
> >> +                                  __ATOMIC_ACQUIRE));
> > __ATOMIC_RELAXED is enough
> >
> >>    printf("Sanity test with non-zero hashes done\n");
> >>
> >>    rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
> >> +286,17 @@ handle_work_with_free_mbufs(void *arg)
> >>            buf[i] = NULL;
> >>    num = rte_distributor_get_pkt(d, id, buf, buf, num);
> >>    while (!quit) {
> >> -          worker_stats[id].handled_packets += num;
> >>            count += num;
> >> +          __atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> +                          __ATOMIC_ACQ_REL);
> > IMO, the problem would be the non-atomic update of the statistics. So,
> > __ATOMIC_RELAXED is enough
> >
> >>            for (i = 0; i < num; i++)
> >>                    rte_pktmbuf_free(buf[i]);
> >>            num = rte_distributor_get_pkt(d,
> >>                            id, buf, buf, num);
> >>    }
> >> -  worker_stats[id].handled_packets += num;
> >>    count += num;
> >> +  __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> +                  __ATOMIC_ACQ_REL);
> > Same here, the problem is non-atomic update of the statistics,
> __ATOMIC_RELAXED is enough.
> > Similarly, for changes below, __ATOMIC_RELAXED is enough.
> >
> >>    rte_distributor_return_pkt(d, id, buf, num);
> >>    return 0;
> >>   }
> >> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
> >>    /* wait for quit single globally, or for worker zero, wait
> >>     * for zero_quit */
> >>    while (!quit && !(id == zero_id && zero_quit)) {
> >> -          worker_stats[id].handled_packets += num;
> >>            count += num;
> >> +          __atomic_fetch_add(&worker_stats[id].handled_packets,
> >> num,
> >> +                          __ATOMIC_ACQ_REL);
> >>            for (i = 0; i < num; i++)
> >>                    rte_pktmbuf_free(buf[i]);
> >>            num = rte_distributor_get_pkt(d,
> >> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
> >>
> >>            total += num;
> >>    }
> >> -  worker_stats[id].handled_packets += num;
> >>    count += num;
> >>    returned = rte_distributor_return_pkt(d, id, buf, num);
> >>
> >> +  __atomic_fetch_add(&worker_stats[id].handled_packets, num,
> >> +                  __ATOMIC_ACQ_REL);
> >>    if (id == zero_id) {
> >>            /* for worker zero, allow it to restart to pick up last packet
> >>             * when all workers are shutting down.
> >> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
> >>                            id, buf, buf, num);
> >>
> >>            while (!quit) {
> >> -                  worker_stats[id].handled_packets += num;
> >>                    count += num;
> >>                    rte_pktmbuf_free(pkt);
> >>                    num = rte_distributor_get_pkt(d, id, buf, buf, num);
> >> +
> >>    __atomic_fetch_add(&worker_stats[id].handled_packets,
> >> +                                  num, __ATOMIC_ACQ_REL);
> >>            }
> >>            returned = rte_distributor_return_pkt(d,
> >>                            id, buf, num);
> >> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
> >> worker_params *wp,
> >>
> >>    for (i = 0; i < rte_lcore_count() - 1; i++)
> >>            printf("Worker %u handled %u packets\n", i,
> >> -                          worker_stats[i].handled_packets);
> >> +                  __atomic_load_n(&worker_stats[i].handled_packets,
> >> +                                  __ATOMIC_ACQUIRE));
> >>
> >>    if (total_packet_count() != BURST * 2) {
> >>            printf("Line %d: Error, not all packets flushed. "
> >> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
> >> worker_params *wp,
> >>    zero_quit = 0;
> >>    for (i = 0; i < rte_lcore_count() - 1; i++)
> >>            printf("Worker %u handled %u packets\n", i,
> >> -                          worker_stats[i].handled_packets);
> >> +                  __atomic_load_n(&worker_stats[i].handled_packets,
> >> +                                  __ATOMIC_ACQUIRE));
> >>
> >>    if (total_packet_count() != BURST) {
> >>            printf("Line %d: Error, not all packets flushed. "
> >> --
> >> 2.17.1
> 
> --
> Lukasz Wojciechowski
> Principal Software Engineer
> 
> Samsung R&D Institute Poland
> Samsung Electronics
> Office +48 22 377 88 25
> l.wojciec...@partner.samsung.com

Reply via email to