This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fd11c4  ARROW-9344: [C++][Flight] Measure latency quantiles
9fd11c4 is described below

commit 9fd11c4e64d05ccb3a11ae891af7e57c815b9379
Author: Yibo Cai <[email protected]>
AuthorDate: Fri Jul 31 11:44:21 2020 -0400

    ARROW-9344: [C++][Flight] Measure latency quantiles
    
    Measure the average, median, 95th- and 99th-percentile, and maximum latency.
    
    Closes #7863 from cyb70289/flight
    
    Authored-by: Yibo Cai <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/flight/flight_benchmark.cc | 65 ++++++++++++++++++++++++--------
 1 file changed, 50 insertions(+), 15 deletions(-)

diff --git a/cpp/src/arrow/flight/flight_benchmark.cc b/cpp/src/arrow/flight/flight_benchmark.cc
index dc8ab04..6180e05 100644
--- a/cpp/src/arrow/flight/flight_benchmark.cc
+++ b/cpp/src/arrow/flight/flight_benchmark.cc
@@ -21,6 +21,12 @@
 #include <string>
 #include <vector>
 
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+
 #include <gflags/gflags.h>
 
 #include "arrow/api.h"
@@ -47,6 +53,7 @@ DEFINE_int32(records_per_batch, 4096, "Total records per batch within stream");
 DEFINE_bool(test_put, false, "Test DoPut instead of DoGet");
 
 namespace perf = arrow::flight::perf;
+namespace acc = boost::accumulators;
 
 namespace arrow {
 
@@ -62,20 +69,40 @@ struct PerformanceResult {
 };
 
 struct PerformanceStats {
-  PerformanceStats() {}
+  using accumulator_type = acc::accumulator_set<
+      double, acc::stats<acc::tag::extended_p_square_quantile(acc::quadratic),
+                         acc::tag::mean, acc::tag::max>>;
+
+  PerformanceStats() : latencies(acc::extended_p_square_probabilities = 
quantiles) {}
   std::mutex mutex;
   int64_t total_batches = 0;
   int64_t total_records = 0;
   int64_t total_bytes = 0;
-  uint64_t total_nanos = 0;
+  const std::array<double, 3> quantiles = {0.5, 0.95, 0.99};
+  accumulator_type latencies;
 
-  void Update(int64_t total_batches, int64_t total_records, int64_t 
total_bytes,
-              uint64_t total_nanos) {
+  void Update(int64_t total_batches, int64_t total_records, int64_t 
total_bytes) {
     std::lock_guard<std::mutex> lock(this->mutex);
     this->total_batches += total_batches;
     this->total_records += total_records;
     this->total_bytes += total_bytes;
-    this->total_nanos += total_nanos;
+  }
+
+  // Invoked per batch in the test loop. Holding a lock looks not scalable.
+  // Tested with 1 ~ 8 threads, no noticeable overhead is observed.
+  // A better approach may be calculate per-thread quantiles and merge.
+  void AddLatency(uint64_t elapsed_nanos) {
+    std::lock_guard<std::mutex> lock(this->mutex);
+    latencies(elapsed_nanos);
+  }
+
+  // ns -> us
+  uint64_t max_latency() const { return acc::max(latencies) / 1000; }
+
+  uint64_t mean_latency() const { return acc::mean(latencies) / 1000; }
+
+  uint64_t quantile_latency(double q) const {
+    return acc::quantile(latencies, acc::quantile_probability = q) / 1000;
   }
 };
 
@@ -93,7 +120,8 @@ Status WaitForReady(FlightClient* client) {
 
 arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
                                               const perf::Token& token,
-                                              const FlightEndpoint& endpoint) {
+                                              const FlightEndpoint& endpoint,
+                                              PerformanceStats& stats) {
   std::unique_ptr<FlightStreamReader> reader;
   RETURN_NOT_OK(client->DoGet(endpoint.ticket, &reader));
 
@@ -108,8 +136,11 @@ arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
   int64_t num_bytes = 0;
   int64_t num_records = 0;
   int64_t num_batches = 0;
+  StopWatch timer;
   while (true) {
+    timer.Start();
     RETURN_NOT_OK(reader->Next(&batch));
+    stats.AddLatency(timer.Stop());
     if (!batch.data) {
       break;
     }
@@ -135,7 +166,8 @@ arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
 
 arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
                                               const perf::Token& token,
-                                              const FlightEndpoint& endpoint) {
+                                              const FlightEndpoint& endpoint,
+                                              PerformanceStats& stats) {
   std::unique_ptr<FlightStreamWriter> writer;
   std::unique_ptr<FlightMetadataReader> reader;
   std::shared_ptr<Schema> schema =
@@ -166,6 +198,7 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
 
   int64_t records_sent = 0;
   const int64_t total_records = token.definition().records_per_stream();
+  StopWatch timer;
   while (records_sent < total_records) {
     if (records_sent + length > total_records) {
       const int last_length = total_records - records_sent;
@@ -175,7 +208,9 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
       num_bytes += last_length * bytes_per_record;
       records_sent += last_length;
     } else {
+      timer.Start();
       RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+      stats.AddLatency(timer.Stop());
       num_records += length;
       // Hard-coded
       num_bytes += length * bytes_per_record;
@@ -221,13 +256,10 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
     perf::Token token;
     token.ParseFromString(endpoint.ticket.ticket);
 
-    StopWatch timer;
-    timer.Start();
-    const auto& result = test_loop(client.get(), token, endpoint);
-    uint64_t elapsed_nanos = timer.Stop();
+    const auto& result = test_loop(client.get(), token, endpoint, stats);
     if (result.ok()) {
       const PerformanceResult& perf = result.ValueOrDie();
-      stats.Update(perf.num_batches, perf.num_records, perf.num_bytes, 
elapsed_nanos);
+      stats.Update(perf.num_batches, perf.num_records, perf.num_bytes);
     }
     return result.status();
   };
@@ -281,9 +313,12 @@ Status RunPerformanceTest(FlightClient* client, bool test_put) {
   // Calculate throughput(IOPS) and latency vs batch size
   std::cout << "Throughput: " << (static_cast<double>(stats.total_batches) / 
time_elapsed)
             << " batches/s" << std::endl;
-  std::cout << "Latency: "
-            << (static_cast<double>(stats.total_nanos) / 1000 / 
stats.total_batches)
-            << " usec/batch" << std::endl;
+  std::cout << "Latency mean: " << stats.mean_latency() << " us" << std::endl;
+  for (auto q : stats.quantiles) {
+    std::cout << "Latency quantile=" << q << ": " << stats.quantile_latency(q) 
<< " us"
+              << std::endl;
+  }
+  std::cout << "Latency max: " << stats.max_latency() << " us" << std::endl;
 
   return Status::OK();
 }

Reply via email to