This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9fd11c4 ARROW-9344: [C++][Flight] Measure latency quantiles
9fd11c4 is described below
commit 9fd11c4e64d05ccb3a11ae891af7e57c815b9379
Author: Yibo Cai <[email protected]>
AuthorDate: Fri Jul 31 11:44:21 2020 -0400
ARROW-9344: [C++][Flight] Measure latency quantiles
Measure the average, median, 95% quantile, 99% quantile, and maximum latency.
Closes #7863 from cyb70289/flight
Authored-by: Yibo Cai <[email protected]>
Signed-off-by: David Li <[email protected]>
---
cpp/src/arrow/flight/flight_benchmark.cc | 65 ++++++++++++++++++++++++--------
1 file changed, 50 insertions(+), 15 deletions(-)
diff --git a/cpp/src/arrow/flight/flight_benchmark.cc
b/cpp/src/arrow/flight/flight_benchmark.cc
index dc8ab04..6180e05 100644
--- a/cpp/src/arrow/flight/flight_benchmark.cc
+++ b/cpp/src/arrow/flight/flight_benchmark.cc
@@ -21,6 +21,12 @@
#include <string>
#include <vector>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+
#include <gflags/gflags.h>
#include "arrow/api.h"
@@ -47,6 +53,7 @@ DEFINE_int32(records_per_batch, 4096, "Total records per
batch within stream");
DEFINE_bool(test_put, false, "Test DoPut instead of DoGet");
namespace perf = arrow::flight::perf;
+namespace acc = boost::accumulators;
namespace arrow {
@@ -62,20 +69,40 @@ struct PerformanceResult {
};
struct PerformanceStats {
- PerformanceStats() {}
+ using accumulator_type = acc::accumulator_set<
+ double, acc::stats<acc::tag::extended_p_square_quantile(acc::quadratic),
+ acc::tag::mean, acc::tag::max>>;
+
+ PerformanceStats() : latencies(acc::extended_p_square_probabilities =
quantiles) {}
std::mutex mutex;
int64_t total_batches = 0;
int64_t total_records = 0;
int64_t total_bytes = 0;
- uint64_t total_nanos = 0;
+ const std::array<double, 3> quantiles = {0.5, 0.95, 0.99};
+ accumulator_type latencies;
- void Update(int64_t total_batches, int64_t total_records, int64_t
total_bytes,
- uint64_t total_nanos) {
+ void Update(int64_t total_batches, int64_t total_records, int64_t
total_bytes) {
std::lock_guard<std::mutex> lock(this->mutex);
this->total_batches += total_batches;
this->total_records += total_records;
this->total_bytes += total_bytes;
- this->total_nanos += total_nanos;
+ }
+
+ // Invoked per batch in the test loop. Holding a lock looks not scalable.
+ // Tested with 1 ~ 8 threads, no noticeable overhead is observed.
+ // A better approach may be calculate per-thread quantiles and merge.
+ void AddLatency(uint64_t elapsed_nanos) {
+ std::lock_guard<std::mutex> lock(this->mutex);
+ latencies(elapsed_nanos);
+ }
+
+ // ns -> us
+ uint64_t max_latency() const { return acc::max(latencies) / 1000; }
+
+ uint64_t mean_latency() const { return acc::mean(latencies) / 1000; }
+
+ uint64_t quantile_latency(double q) const {
+ return acc::quantile(latencies, acc::quantile_probability = q) / 1000;
}
};
@@ -93,7 +120,8 @@ Status WaitForReady(FlightClient* client) {
arrow::Result<PerformanceResult> RunDoGetTest(FlightClient* client,
const perf::Token& token,
- const FlightEndpoint& endpoint) {
+ const FlightEndpoint& endpoint,
+ PerformanceStats& stats) {
std::unique_ptr<FlightStreamReader> reader;
RETURN_NOT_OK(client->DoGet(endpoint.ticket, &reader));
@@ -108,8 +136,11 @@ arrow::Result<PerformanceResult>
RunDoGetTest(FlightClient* client,
int64_t num_bytes = 0;
int64_t num_records = 0;
int64_t num_batches = 0;
+ StopWatch timer;
while (true) {
+ timer.Start();
RETURN_NOT_OK(reader->Next(&batch));
+ stats.AddLatency(timer.Stop());
if (!batch.data) {
break;
}
@@ -135,7 +166,8 @@ arrow::Result<PerformanceResult> RunDoGetTest(FlightClient*
client,
arrow::Result<PerformanceResult> RunDoPutTest(FlightClient* client,
const perf::Token& token,
- const FlightEndpoint& endpoint) {
+ const FlightEndpoint& endpoint,
+ PerformanceStats& stats) {
std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;
std::shared_ptr<Schema> schema =
@@ -166,6 +198,7 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient*
client,
int64_t records_sent = 0;
const int64_t total_records = token.definition().records_per_stream();
+ StopWatch timer;
while (records_sent < total_records) {
if (records_sent + length > total_records) {
const int last_length = total_records - records_sent;
@@ -175,7 +208,9 @@ arrow::Result<PerformanceResult> RunDoPutTest(FlightClient*
client,
num_bytes += last_length * bytes_per_record;
records_sent += last_length;
} else {
+ timer.Start();
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+ stats.AddLatency(timer.Stop());
num_records += length;
// Hard-coded
num_bytes += length * bytes_per_record;
@@ -221,13 +256,10 @@ Status RunPerformanceTest(FlightClient* client, bool
test_put) {
perf::Token token;
token.ParseFromString(endpoint.ticket.ticket);
- StopWatch timer;
- timer.Start();
- const auto& result = test_loop(client.get(), token, endpoint);
- uint64_t elapsed_nanos = timer.Stop();
+ const auto& result = test_loop(client.get(), token, endpoint, stats);
if (result.ok()) {
const PerformanceResult& perf = result.ValueOrDie();
- stats.Update(perf.num_batches, perf.num_records, perf.num_bytes,
elapsed_nanos);
+ stats.Update(perf.num_batches, perf.num_records, perf.num_bytes);
}
return result.status();
};
@@ -281,9 +313,12 @@ Status RunPerformanceTest(FlightClient* client, bool
test_put) {
// Calculate throughput(IOPS) and latency vs batch size
std::cout << "Throughput: " << (static_cast<double>(stats.total_batches) /
time_elapsed)
<< " batches/s" << std::endl;
- std::cout << "Latency: "
- << (static_cast<double>(stats.total_nanos) / 1000 /
stats.total_batches)
- << " usec/batch" << std::endl;
+ std::cout << "Latency mean: " << stats.mean_latency() << " us" << std::endl;
+ for (auto q : stats.quantiles) {
+ std::cout << "Latency quantile=" << q << ": " << stats.quantile_latency(q)
<< " us"
+ << std::endl;
+ }
+ std::cout << "Latency max: " << stats.max_latency() << " us" << std::endl;
return Status::OK();
}