W dniu 23.09.2020 o 06:30, Honnappa Nagarahalli pisze:
> <snip>
>
>> Statistics of handled packets are cleared and read on main lcore, while they
>> are increased in workers handlers on different lcores.
>>
>> Without synchronization occasionally showed invalid values.
> What exactly do you mean by invalid values? Can you elaborate?

I mean values that shouldn't be there which are obviously not related to 
number of packets handled by workers.

I reverted the patch and run stress tests. Failures without this patch 
look like these:

=== Test flush fn with worker shutdown (burst) ===
Worker 0 handled 0 packets
Worker 1 handled 0 packets
Worker 2 handled 0 packets
Worker 3 handled 0 packets
Worker 4 handled 32 packets
Worker 5 handled 0 packets
Worker 6 handled 6 packets
Line 519: Error, not all packets flushed. Expected 32, got 38
Test Failed

or:

=== Sanity test of worker shutdown ===
Worker 0 handled 0 packets
Worker 1 handled 0 packets
Worker 2 handled 0 packets
Worker 3 handled 0 packets
Worker 4 handled 0 packets
Worker 5 handled 64 packets
Worker 6 handled 149792 packets
Line 466: Error, not all packets flushed. Expected 64, got 149856
Test Failed

The 6 or 149792 packets reported by worker 6 were never sent to or 
processed by the workers.

>> This patch uses atomic acquire/release mechanisms to synchronize.
>>
>> Fixes: c3eabff124e6 ("distributor: add unit tests")
>> Cc: bruce.richard...@intel.com
>> Cc: sta...@dpdk.org
>>
>> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
>> ---
>>   app/test/test_distributor.c | 39 ++++++++++++++++++++++++-------------
>>   1 file changed, 26 insertions(+), 13 deletions(-)
>>
>> diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c index
>> 35b25463a..0e49e3714 100644
>> --- a/app/test/test_distributor.c
>> +++ b/app/test/test_distributor.c
>> @@ -43,7 +43,8 @@ total_packet_count(void)  {
>>      unsigned i, count = 0;
>>      for (i = 0; i < worker_idx; i++)
>> -            count += worker_stats[i].handled_packets;
>> +            count +=
>> __atomic_load_n(&worker_stats[i].handled_packets,
>> +                            __ATOMIC_ACQUIRE);
>>      return count;
>>   }
>>
>> @@ -52,6 +53,7 @@ static inline void
>>   clear_packet_count(void)
>>   {
>>      memset(&worker_stats, 0, sizeof(worker_stats));
>> +    rte_atomic_thread_fence(__ATOMIC_RELEASE);
>>   }
>>
>>   /* this is the basic worker function for sanity test @@ -72,13 +74,13 @@
>> handle_work(void *arg)
>>      num = rte_distributor_get_pkt(db, id, buf, buf, num);
>>      while (!quit) {
>>              __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> -                            __ATOMIC_RELAXED);
>> +                            __ATOMIC_ACQ_REL);
>>              count += num;
>>              num = rte_distributor_get_pkt(db, id,
>>                              buf, buf, num);
>>      }
>>      __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> -                    __ATOMIC_RELAXED);
>> +                    __ATOMIC_ACQ_REL);
>>      count += num;
>>      rte_distributor_return_pkt(db, id, buf, num);
>>      return 0;
>> @@ -134,7 +136,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>>      for (i = 0; i < rte_lcore_count() - 1; i++)
>>              printf("Worker %u handled %u packets\n", i,
>> -                            worker_stats[i].handled_packets);
>> +                    __atomic_load_n(&worker_stats[i].handled_packets,
>> +                                    __ATOMIC_ACQUIRE));
>>      printf("Sanity test with all zero hashes done.\n");
>>
>>      /* pick two flows and check they go correctly */ @@ -159,7 +162,9
>> @@ sanity_test(struct worker_params *wp, struct rte_mempool *p)
>>
>>              for (i = 0; i < rte_lcore_count() - 1; i++)
>>                      printf("Worker %u handled %u packets\n", i,
>> -                                    worker_stats[i].handled_packets);
>> +                            __atomic_load_n(
>> +                                    &worker_stats[i].handled_packets,
>> +                                    __ATOMIC_ACQUIRE));
>>              printf("Sanity test with two hash values done\n");
>>      }
>>
>> @@ -185,7 +190,8 @@ sanity_test(struct worker_params *wp, struct
>> rte_mempool *p)
>>
>>      for (i = 0; i < rte_lcore_count() - 1; i++)
>>              printf("Worker %u handled %u packets\n", i,
>> -                            worker_stats[i].handled_packets);
>> +                    __atomic_load_n(&worker_stats[i].handled_packets,
>> +                                    __ATOMIC_ACQUIRE));
>>      printf("Sanity test with non-zero hashes done\n");
>>
>>      rte_mempool_put_bulk(p, (void *)bufs, BURST); @@ -280,15
>> +286,17 @@ handle_work_with_free_mbufs(void *arg)
>>              buf[i] = NULL;
>>      num = rte_distributor_get_pkt(d, id, buf, buf, num);
>>      while (!quit) {
>> -            worker_stats[id].handled_packets += num;
>>              count += num;
>> +            __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> +                            __ATOMIC_ACQ_REL);
>>              for (i = 0; i < num; i++)
>>                      rte_pktmbuf_free(buf[i]);
>>              num = rte_distributor_get_pkt(d,
>>                              id, buf, buf, num);
>>      }
>> -    worker_stats[id].handled_packets += num;
>>      count += num;
>> +    __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +                    __ATOMIC_ACQ_REL);
>>      rte_distributor_return_pkt(d, id, buf, num);
>>      return 0;
>>   }
>> @@ -363,8 +371,9 @@ handle_work_for_shutdown_test(void *arg)
>>      /* wait for quit single globally, or for worker zero, wait
>>       * for zero_quit */
>>      while (!quit && !(id == zero_id && zero_quit)) {
>> -            worker_stats[id].handled_packets += num;
>>              count += num;
>> +            __atomic_fetch_add(&worker_stats[id].handled_packets,
>> num,
>> +                            __ATOMIC_ACQ_REL);
>>              for (i = 0; i < num; i++)
>>                      rte_pktmbuf_free(buf[i]);
>>              num = rte_distributor_get_pkt(d,
>> @@ -379,10 +388,11 @@ handle_work_for_shutdown_test(void *arg)
>>
>>              total += num;
>>      }
>> -    worker_stats[id].handled_packets += num;
>>      count += num;
>>      returned = rte_distributor_return_pkt(d, id, buf, num);
>>
>> +    __atomic_fetch_add(&worker_stats[id].handled_packets, num,
>> +                    __ATOMIC_ACQ_REL);
>>      if (id == zero_id) {
>>              /* for worker zero, allow it to restart to pick up last packet
>>               * when all workers are shutting down.
>> @@ -394,10 +404,11 @@ handle_work_for_shutdown_test(void *arg)
>>                              id, buf, buf, num);
>>
>>              while (!quit) {
>> -                    worker_stats[id].handled_packets += num;
>>                      count += num;
>>                      rte_pktmbuf_free(pkt);
>>                      num = rte_distributor_get_pkt(d, id, buf, buf, num);
>> +
>>      __atomic_fetch_add(&worker_stats[id].handled_packets,
>> +                                    num, __ATOMIC_ACQ_REL);
>>              }
>>              returned = rte_distributor_return_pkt(d,
>>                              id, buf, num);
>> @@ -461,7 +472,8 @@ sanity_test_with_worker_shutdown(struct
>> worker_params *wp,
>>
>>      for (i = 0; i < rte_lcore_count() - 1; i++)
>>              printf("Worker %u handled %u packets\n", i,
>> -                            worker_stats[i].handled_packets);
>> +                    __atomic_load_n(&worker_stats[i].handled_packets,
>> +                                    __ATOMIC_ACQUIRE));
>>
>>      if (total_packet_count() != BURST * 2) {
>>              printf("Line %d: Error, not all packets flushed. "
>> @@ -514,7 +526,8 @@ test_flush_with_worker_shutdown(struct
>> worker_params *wp,
>>      zero_quit = 0;
>>      for (i = 0; i < rte_lcore_count() - 1; i++)
>>              printf("Worker %u handled %u packets\n", i,
>> -                            worker_stats[i].handled_packets);
>> +                    __atomic_load_n(&worker_stats[i].handled_packets,
>> +                                    __ATOMIC_ACQUIRE));
>>
>>      if (total_packet_count() != BURST) {
>>              printf("Line %d: Error, not all packets flushed. "
>> --
>> 2.17.1

-- 
Lukasz Wojciechowski
Principal Software Engineer

Samsung R&D Institute Poland
Samsung Electronics
Office +48 22 377 88 25
l.wojciec...@partner.samsung.com

Reply via email to