This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 085b1806d IMPALA-11568: Fix SamplingTimeSeriesCounter not correctly
cleared
085b1806d is described below
commit 085b1806da6a1941200288a2f9a243e389e10820
Author: stiga-huang <[email protected]>
AuthorDate: Tue Feb 6 10:57:18 2024 +0800
IMPALA-11568: Fix SamplingTimeSeriesCounter not correctly cleared
TimeSeriesCounters are scheduled in PeriodicCounterUpdater at the time
when they are added. TimeSeriesCounterToJsonTest stops and clears them
to avoid the update thread interfere with the rest of the test. However,
the samples in SamplingTimeSeriesCounter are not cleared since it
doesn't implement the Clear() method of TimeSeriesCounter. This leads to
non-determined results depend on when the counter is stopped.
This patch adds a Reset() method for SamplingTimeSeriesCounter to clear
the samples. It's only used in the test. A Reset() method in
StreamingSampler is also added for this purpose.
Test:
- Ran TimeSeriesCounterToJsonTest 100 times. It used to fail in 10 runs.
Change-Id: I8bc2c94ad96899522b288708fcaf7413d4b99fa1
Reviewed-on: http://gerrit.cloudera.org:8080/21001
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Wenzhe Zhou <[email protected]>
---
be/src/util/runtime-profile-counters.h | 4 ++++
be/src/util/runtime-profile-test.cc | 13 +++++++++----
be/src/util/runtime-profile.cc | 4 ++++
be/src/util/streaming-sampler.h | 13 +++++++++++++
4 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/be/src/util/runtime-profile-counters.h
b/be/src/util/runtime-profile-counters.h
index ad4261026..796186c41 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -816,6 +816,7 @@ class RuntimeProfile::SamplingTimeSeriesCounter
: public RuntimeProfile::TimeSeriesCounter {
private:
friend class RuntimeProfile;
+ friend class ToJson_TimeSeriesCounterToJsonTest_Test;
SamplingTimeSeriesCounter(
const std::string& name, TUnit::type unit, SampleFunction fn, int
initial_period)
@@ -824,6 +825,9 @@ class RuntimeProfile::SamplingTimeSeriesCounter
virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
virtual const int64_t* GetSamplesLocked( int* num_samples, int* period)
const override;
+ /// Reset the underlying StreamingCounterSampler to the initial state. Used
in tests.
+ void Reset();
+
StreamingCounterSampler samples_;
};
diff --git a/be/src/util/runtime-profile-test.cc
b/be/src/util/runtime-profile-test.cc
index 3a446e897..b2ee1beb2 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -1839,11 +1839,14 @@ TEST(ToJson, TimeSeriesCounterToJsonTest) {
FLAGS_status_report_interval_ms = 50000;
RuntimeProfile::TimeSeriesCounter* counter =
profile->AddChunkedTimeSeriesCounter("TimeSeriesCounter", TUnit::UNIT,
sample_fn);
- RuntimeProfile::TimeSeriesCounter* counter2 =
- profile->AddSamplingTimeSeriesCounter("SamplingCounter", TUnit::UNIT,
sample_fn);
+ auto counter2 = static_cast<RuntimeProfile::SamplingTimeSeriesCounter*>(
+ profile->AddSamplingTimeSeriesCounter("SamplingCounter", TUnit::UNIT,
sample_fn));
// Stop counter updates from interfering with the rest of the test.
StopAndClearCounter(profile, counter);
+ // ChunkedTimeSeriesCounters are stopped and cleared above.
+ // But SamplingTimeSeriesCounter needs explicitly Reset() to back to the
initial state.
+ counter2->Reset();
// Reset value after previous values have been retrieved.
value = 0;
@@ -1853,15 +1856,17 @@ TEST(ToJson, TimeSeriesCounterToJsonTest) {
for (int i = 0; i < 80; ++i) counter2->AddSample(test_period);
profile->ToJson(&doc);
+ EXPECT_EQ(doc["contents"]["time_series_counters"][1]["counter_name"],
+ "TimeSeriesCounter");
EXPECT_STR_CONTAINS(
doc["contents"]["time_series_counters"][1]["data"].GetString(),
"0,1,2,3,4");
-
EXPECT_STR_CONTAINS(
doc["contents"]["time_series_counters"][1]["data"].GetString(),
"60,61,62,63");
+ EXPECT_EQ(doc["contents"]["time_series_counters"][0]["counter_name"],
+ "SamplingCounter");
EXPECT_STR_CONTAINS(
doc["contents"]["time_series_counters"][0]["data"].GetString(),
"0,2,4,6");
-
EXPECT_STR_CONTAINS(
doc["contents"]["time_series_counters"][0]["data"].GetString(),
"72,74,76,78");
}
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 0b5ec53ca..96f57fd65 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -2071,6 +2071,10 @@ const int64_t*
RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked(
return samples_.GetSamples(num_samples, period);
}
+void RuntimeProfile::SamplingTimeSeriesCounter::Reset() {
+ samples_.Reset();
+}
+
RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter(
const string& name, TUnit::type unit, SampleFunction fn)
: TimeSeriesCounter(name, unit, fn)
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index cbfd2f204..19fef6f0b 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -40,11 +40,22 @@ class StreamingSampler {
StreamingSampler(int initial_period)
: samples_collected_(0) ,
period_(initial_period),
+ initial_period_(initial_period),
current_sample_sum_(0),
current_sample_count_(0),
current_sample_total_time_(0) {
}
+ /// Resets everything back to the state when this instance was just created.
+ void Reset() {
+ samples_collected_ = 0;
+ current_sample_sum_ = 0;
+ current_sample_count_ = 0;
+ current_sample_total_time_ = 0;
+ // Reset 'period_' to the value got from the constructor.
+ period_ = initial_period_;
+ }
+
/// Add a sample to the sampler. 'ms' is the time elapsed since the last
time this
/// was called.
/// The input value is accumulated into current_*. If the total time elapsed
@@ -93,6 +104,8 @@ class StreamingSampler {
/// Storage period in ms.
int period_;
+ /// Keeps the initial period got from the constructor. Used to reset
'period_'.
+ int initial_period_;
/// The sum of input samples that makes up the next stored sample.
T current_sample_sum_;