This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 97cd30c60 IMPALA-12426: Workload Management Supporting Changes
97cd30c60 is described below

commit 97cd30c607a274d25f5636a21461fc7441d5e63e
Author: jasonmfehr <[email protected]>
AuthorDate: Wed Feb 21 13:41:08 2024 -0800

    IMPALA-12426: Workload Management Supporting Changes
    
    Contains several disparate pieces of functionality that support the
    overall workload management work.
    
    1. TNetworkAddressComparator
      An unused existing comparator for the Thrift class TNetworkAddress
      already existed in the thrift-util.h file. This comparator has been
      moved to the network-util.h file where it now resides in the same
      place as other utility functions that operator on TNetworkAddress
      instances.
    
      The existing comparator did not consider the uds address. It only
      considered hostname and port. The new comparator considers all three.
    
      Testing is accomplished by porting the existing ctests and adding
      additional ctests.
    
    2. StringStreamPop
      This new class extends a std::basic_stringstream<char> to add a
      function that enables removing a character from the end.
    
      Testing is accomplished using new ctests.
    
    3. Ticker
      This new header-only class notifies a condition variable at periodic
      intervals. It is a lightweight that sleeps until the configured
      duration has passed at which point it wakes up and notifies the
      condition variable. It also enables consumers to offload spurious
      wakeup guards to this class.
    
      Ctests have been added to test the functionality of this new class.
    
    4. TUniqueId Empty Utility Function
      A new function UUIDEmpty returns true if a provided TUniqueID does
      not contain a UUID or false otherwise.
    
      Ctests have been added to test this new function.
    
    5. run_clang_tidy.sh
      Additional informational outputs have been added to this script to
      enable tracking the status of the script and to easily identify
      errors found by clang tidy.
    
    6. Summary Util Test
      A ctest was developed for testing the text table generation code that
      generates the exec summary portion of the query profile. This ctest
      was developed as part of an idea that did not ultimately pan out.
      Rather than throwing away that test code, it has been added as a new
      ctest.
    
    Change-Id: Iee23334ec56a18b192a75d052468bf59159b6424
    Reviewed-on: http://gerrit.cloudera.org:8080/21048
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/thrift-util-test.cc   |  14 ----
 be/src/rpc/thrift-util.cc        |   7 --
 be/src/rpc/thrift-util.h         |   4 --
 be/src/util/CMakeLists.txt       |   8 ++-
 be/src/util/network-util-test.cc | 100 +++++++++++++++++++++++++++
 be/src/util/network-util.cc      |  21 ++++++
 be/src/util/network-util.h       |   6 ++
 be/src/util/string-util-test.cc  |  73 ++++++++++++++++++++
 be/src/util/string-util.cc       |   7 ++
 be/src/util/string-util.h        |  15 +++-
 be/src/util/summary-util-test.cc | 113 +++++++++++++++++++++++++++++++
 be/src/util/ticker-test.cc       | 133 ++++++++++++++++++++++++++++++++++++
 be/src/util/ticker.h             | 143 +++++++++++++++++++++++++++++++++++++++
 be/src/util/uid-util-test.cc     |  23 +++++++
 be/src/util/uid-util.h           |   5 ++
 bin/run_clang_tidy.sh            |   3 +-
 16 files changed, 647 insertions(+), 28 deletions(-)

diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc
index 4d1a580b6..427cb3d42 100644
--- a/be/src/rpc/thrift-util-test.cc
+++ b/be/src/rpc/thrift-util-test.cc
@@ -129,18 +129,4 @@ TEST(ThriftUtil, SerDeBuffer100MB) {
   }
 }
 
-TEST(ThriftUtil, TNetworkAddressComparator) {
-  EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
-                                        MakeNetworkAddress("zzzz", 500)));
-  EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("zzzz", 500),
-                                         MakeNetworkAddress("aaaa", 500)));
-  EXPECT_TRUE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
-                                        MakeNetworkAddress("aaaa", 501)));
-  EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 501),
-                                         MakeNetworkAddress("aaaa", 500)));
-
-  EXPECT_FALSE(TNetworkAddressComparator(MakeNetworkAddress("aaaa", 500),
-                                         MakeNetworkAddress("aaaa", 500)));
-}
-
 }
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index ce88b9b9b..973544cca 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -253,13 +253,6 @@ void PrintTColumnValue(std::ostream& out, const 
TColumnValue& colval) {
   }
 }
 
-bool TNetworkAddressComparator(const TNetworkAddress& a, const 
TNetworkAddress& b) {
-  int cmp = a.hostname.compare(b.hostname);
-  if (cmp < 0) return true;
-  if (cmp == 0) return a.port < b.port;
-  return false;
-}
-
 bool IsReadTimeoutTException(const TTransportException& e) {
   // String taken from TSocket::read() Thrift's TSocket.cpp and TSSLSocket.cpp.
   // Specifically, "THRIFT_EAGAIN (timed out)" from TSocket.cpp,
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 37a782b39..0945c1278 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -175,10 +175,6 @@ Status WaitForServer(const std::string& host, int port, 
int num_retries,
 /// Print a TColumnValue. If null, print "NULL".
 void PrintTColumnValue(std::ostream& out, const TColumnValue& colval);
 
-/// Compares two TNetworkAddresses alphanumerically by their host:port
-/// string representation
-bool TNetworkAddressComparator(const TNetworkAddress& a, const 
TNetworkAddress& b);
-
 /// Returns true if the TTransportException corresponds to a TCP socket read 
timeout.
 bool IsReadTimeoutTException(const 
apache::thrift::transport::TTransportException& e);
 
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 3dbbbbdea..ab913f027 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -191,6 +191,7 @@ add_library(UtilTests STATIC
   lru-multi-cache-test.cc
   metrics-test.cc
   min-max-filter-test.cc
+  network-util-test.cc
   openssl-util-test.cc
   os-info-test.cc
   os-util-test.cc
@@ -208,11 +209,13 @@ add_library(UtilTests STATIC
   simple-logger-test.cc
   string-parser-test.cc
   string-util-test.cc
+  summary-util-test.cc
   symbols-util-test.cc
   sys-info-test.cc
   system-state-info-test.cc
   tagged-ptr-test.cc
   thread-pool-test.cc
+  ticker-test.cc
   time-test.cc
   tuple-row-compare-test.cc
   uid-util-test.cc
@@ -250,6 +253,7 @@ ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
 # minidump-test is flaky when the jvm pause monitor is running. So it can't be 
unified.
 ADD_BE_LSAN_TEST(minidump-test)
+ADD_UNIFIED_BE_LSAN_TEST(network-util-test "NetworkUtil.*")
 ADD_UNIFIED_BE_LSAN_TEST(openssl-util-test "OpenSSLUtilTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(os-info-test "OsInfo.*")
 ADD_UNIFIED_BE_LSAN_TEST(os-util-test "OsUtil.*")
@@ -267,11 +271,13 @@ ADD_UNIFIED_BE_LSAN_TEST(rle-test "BitArray.*:RleTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(runtime-profile-test 
"CountersTest.*:TimerCounterTest.*:TimeSeriesCounterTest.*:VariousNumbers/TimeSeriesCounterResampleTest.*:ToThrift.*:ToJson.*")
 ADD_UNIFIED_BE_LSAN_TEST(simple-logger-test "SimpleLoggerTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(string-parser-test 
"StringToInt.*:StringToIntWithBase.*:StringToFloat.*:StringToBool.*:StringToDate.*")
-ADD_UNIFIED_BE_LSAN_TEST(string-util-test 
"TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(string-util-test 
"TruncateDownTest.*:TruncateUpTest.*:CommaSeparatedContainsTest.*:FindUtf8PosForwardTest.*:FindUtf8PosBackwardTest.*:RandomFindUtf8PosTest.*:StringStreamPopTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(summary-util-test "PrintTableTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(symbols-util-test "SymbolsUtil.*")
 ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
diff --git a/be/src/util/network-util-test.cc b/be/src/util/network-util-test.cc
new file mode 100644
index 000000000..c255466cc
--- /dev/null
+++ b/be/src/util/network-util-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gen-cpp/Types_types.h"
+
+#include "testutil/gtest-util.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+// NetAddrComp Tests: These tests assert the TNetworkAddressComparator sorts 
two
+// TNetworkAddress objects correctly based on their host, port, and uds 
address fields.
+
+// Assert where host fields are different.
+TEST(NetworkUtil, NetAddrCompHostnameDiff) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("aaaa");
+  first.__set_uds_address("uds");
+  first.__set_port(0);
+
+  second.__set_hostname("bbbb");
+  second.__set_uds_address("uds");
+  second.__set_port(0);
+
+  ASSERT_TRUE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where host fields are equal but port is different.
+TEST(NetworkUtil, NetAddrCompPortDiff) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("host");
+  first.__set_port(0);
+  first.__set_uds_address("");
+
+  second.__set_hostname("host");
+  second.__set_port(1);
+  second.__set_uds_address("");
+
+  ASSERT_TRUE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where host and port fields are equal but uds address is different.
+TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("host");
+  first.__set_port(0);
+  first.__set_uds_address("aaaa");
+
+  second.__set_hostname("host");
+  second.__set_port(0);
+  second.__set_uds_address("bbbb");
+
+  ASSERT_TRUE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+}
+
+// Assert where all three comparison fields are equal.
+TEST(NetworkUtil, NetAddrUDSAddrSame) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("host");
+  first.__set_port(0);
+  first.__set_uds_address("uds");
+
+  second.__set_hostname("host");
+  second.__set_port(0);
+  second.__set_uds_address("uds");
+
+  ASSERT_FALSE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+}
+
+} // namespace impala
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index f0c352abe..80b2cde0a 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -252,6 +252,27 @@ NetworkAddressPB FromTNetworkAddress(const 
TNetworkAddress& address) {
   return address_pb;
 }
 
+bool TNetworkAddressComparator::operator()(const TNetworkAddress& a,
+    const TNetworkAddress& b) const {
+    const int host_compare = a.hostname.compare(b.hostname);
+
+    if (host_compare < 0) {
+      return true;
+    } else if(host_compare > 0) {
+      return false;
+    }
+
+    // Hostnames were the same, compare on port
+    if (a.port < b.port) {
+      return true;
+    } else if (a.port > b.port) {
+      return false;
+    }
+
+    // Hostnames and ports were the same, compare on uds address.
+    return a.uds_address.compare(b.uds_address) < 0;
+}
+
 /// Pick a random port in the range of ephemeral ports
 /// https://tools.ietf.org/html/rfc6335
 int FindUnusedEphemeralPort() {
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 600bb7396..359cc11f7 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -99,6 +99,12 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& 
address);
 Status NetworkAddressPBToSockaddr(
     const NetworkAddressPB& address, bool use_uds, kudu::Sockaddr* sockaddr);
 
+/// Custom comparator to sort network addresses first by host (alphabetically) 
and then by
+/// by port (numerically) and finally by uds address (alphabetically).
+struct TNetworkAddressComparator {
+  bool operator()(const TNetworkAddress& a, const TNetworkAddress& b) const;
+};
+
 /// Returns a ephemeral port that is currently unused. Returns -1 on an error 
or if
 /// a free ephemeral port can't be found after 100 tries.
 int FindUnusedEphemeralPort();
diff --git a/be/src/util/string-util-test.cc b/be/src/util/string-util-test.cc
index 6d88b8812..6e2c6b07b 100644
--- a/be/src/util/string-util-test.cc
+++ b/be/src/util/string-util-test.cc
@@ -269,5 +269,78 @@ TEST(RandomFindUtf8PosTest, Basic) {
   }
 }
 
+// StringStreamPopTest: These tests assert the functionality of the 
StringStreamPop class.
+
+// Assert the most common use case where the last character is popped and a 
new character
+// is written to the stream.
+TEST(StringStreamPopTest, NotEmptyPopOnce) {
+  StringStreamPop fixture;
+  fixture << "this is a tes,";
+  fixture.move_back();
+  fixture << "t";
+  EXPECT_EQ("this is a test", fixture.str());
+}
+
+// Asssert where the stream only contains a single character that is popped 
before another
+// character is written to the stream.
+TEST(StringStreamPopTest, OneCharPop) {
+  StringStreamPop fixture;
+  fixture << "t";
+  fixture.move_back();
+  fixture << "v";
+  EXPECT_EQ("v", fixture.str());
+}
+
+// Assert where the last two characters of a non-empty stream are popped.
+TEST(StringStreamPopTest, NotEmptyPopTwice) {
+  StringStreamPop fixture;
+  fixture << "this is a second te,,";
+  fixture.move_back();
+  fixture.move_back();
+  fixture << "st";
+  EXPECT_EQ("this is a second test", fixture.str());
+}
+
+// Assert where an empty stream has it's last (nonexistant) character popped.
+TEST(StringStreamPopTest, EmptyPopOnce) {
+  StringStreamPop fixture;
+  fixture.move_back();
+  EXPECT_TRUE(fixture.str().empty());
+}
+
+// Assert where an empty stream has it's last (nonexistant) character popped 
twice.
+TEST(StringStreamPopTest, EmptyPopTwice) {
+  StringStreamPop fixture;
+  fixture.move_back();
+  fixture.move_back();
+  EXPECT_TRUE(fixture.str().empty());
+}
+
+// Assert the move_back functionality does not actually remove the character.
+TEST(StringStreamPopTest, PopOnceBeforeAppend) {
+  StringStreamPop fixture;
+  fixture.move_back();
+  fixture << "a";
+  fixture.move_back();
+
+  // This assertion is correct because the move_back() function only moves the 
write
+  // pointer, it does not modify the internal buffer.
+  EXPECT_EQ("a", fixture.str());
+}
+
+// Assert the StringStreamPop class behavior matches the behavior of the 
stringstream
+// class.
+TEST(StringStreamPopTest, CompareWithStringstream) {
+  StringStreamPop fixture;
+  stringstream expected;
+
+  expected << "C++ is" << " an " << "invisible found" << "ation of " << 
"everything!";
+  fixture  << "C++ is" << " an " << "invisible found" << "ation of " << 
"everything?";
+  fixture.move_back();
+  fixture << '!';
+
+  EXPECT_EQ(expected.str(), fixture.str());
+}
+
 }
 
diff --git a/be/src/util/string-util.cc b/be/src/util/string-util.cc
index 5394b1ffd..3e0a92213 100644
--- a/be/src/util/string-util.cc
+++ b/be/src/util/string-util.cc
@@ -143,4 +143,11 @@ int FindUtf8PosBackward(const uint8_t* ptr, const int len, 
int index) {
   DCHECK_EQ(pos, -1);
   return -1;
 }
+
+void StringStreamPop::move_back() {
+  if (tellp() > 0) {
+    seekp(-1, std::ios_base::cur);
+  }
+}
+
 }
diff --git a/be/src/util/string-util.h b/be/src/util/string-util.h
index e1749ce72..7b16bf498 100644
--- a/be/src/util/string-util.h
+++ b/be/src/util/string-util.h
@@ -93,6 +93,19 @@ int FindUtf8PosBackward(const uint8_t* str_ptr, const int 
str_len, const int ind
 inline int FindUtf8PosBackward(const char* str_ptr, const int str_len, const 
int index) {
   return FindUtf8PosBackward(reinterpret_cast<const uint8_t*>(str_ptr), 
str_len, index);
 }
-}
 
+/// Subclass of std::stringstream that adds functionality to allow overwriting 
the very
+/// last character of the stream. The purpose of this additional functionality 
is to
+/// enable comma delimited string building where the last instance of the 
comma needs to
+/// be removed (for example when building a list of columns in a sql 
statement).
+class StringStreamPop : public std::basic_stringstream<char> {
+public:
+  /// Directly modifies the underlying stream buffer seeking it backwards 1 
position.
+  /// Then, when additional characters are written, the character at the end 
of the stream
+  /// is overwritten. Thus, to truly remove the character at the end of the 
stream
+  /// requires writing at least one character to the stream after this 
function is called.
+  void move_back();
+};
+
+}
 #endif
diff --git a/be/src/util/summary-util-test.cc b/be/src/util/summary-util-test.cc
new file mode 100644
index 000000000..c0e209cba
--- /dev/null
+++ b/be/src/util/summary-util-test.cc
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+
+#include "gen-cpp/ExecStats_types.h"
+#include "util/summary-util.h"
+
+namespace impala {
+
+static const string expected_str = "\n"
+"Operator              #Hosts  #Inst   Avg Time   Max Time  #Rows  Est. #Rows  
 Peak Mem "
+" Est. Peak Mem  Detail                                    \n"
+"----------------------------------------------------------------------------------------"
+"----------------------------------------------------------\n"
+"F01:ROOT                   1      1  857.074us  857.074us                     
  4.01 MB "
+"       4.00 MB                                            \n"
+"01:EXCHANGE                1      1  269.934us  269.934us     99           7  
 88.00 KB "
+"      16.00 KB  UNPARTITIONED                             \n"
+"F00:EXCHANGE SENDER        3      3  332.506us  338.449us                     
  7.95 KB "
+"      96.00 KB                                            \n"
+"00:SCAN HDFS               3      3    1s328ms    1s331ms     99           7  
360.00 KB "
+"      64.00 MB  default.test_query_log_beeswax_1707938440 ";
+
+static TExecStats buildExecStats(int64_t latency, int64_t mem_used,
+    int64_t cardinality) {
+  TExecStats stat;
+
+  if (latency > -1) {
+    stat.__set_latency_ns(latency);
+  }
+  stat.__set_memory_used(mem_used);
+  stat.__set_cardinality(cardinality);
+
+  return stat;
+}
+
+static TPlanNodeExecSummary buildPlanNode(TPlanNodeId node_id, TFragmentIdx 
fragment_idx,
+    string label, string detail, int32_t num_hosts, int32_t num_children,
+    bool is_broadcast, TExecStats estimates) {
+  TPlanNodeExecSummary node;
+
+  node.__set_node_id(node_id);
+  node.__set_fragment_idx(fragment_idx);
+  node.__set_label(label);
+  node.__set_label_detail(detail);
+  node.__set_num_children(num_children);
+  node.__set_estimated_stats(estimates);
+  if (is_broadcast) {
+    node.__set_is_broadcast(is_broadcast);
+  }
+  node.__set_num_hosts(num_hosts);
+
+  return node;
+}
+
+// Constructs a simple exec summary and ensures the text table is generated 
correctly for
+// that exec summary.
+TEST(PrintTableTest, HappyPath) {
+  TExecSummary input;
+
+  TPlanNodeExecSummary node = buildPlanNode(-1, 0, "F01:ROOT", "", 1, 1,
+      false, buildExecStats(-1, 4194304, -1));
+  node.exec_stats.push_back(buildExecStats(857074, 4202496, -1));
+  node.__isset.exec_stats = true;
+  input.nodes.push_back(node);
+
+  node = buildPlanNode(1, 0, "01:EXCHANGE", "UNPARTITIONED", 1, 0,
+      true, buildExecStats(-1, 16384, 7));
+  node.exec_stats.push_back(buildExecStats(269934, 90112, 99));
+  node.__isset.exec_stats = true;
+  input.nodes.push_back(node);
+
+  node = buildPlanNode(-1, 1, "F00:EXCHANGE SENDER", "", 3, 1,
+      false, buildExecStats(-1, 98304, -1));
+  node.exec_stats.push_back(buildExecStats(338449, 6862, -1));
+  node.exec_stats.push_back(buildExecStats(333098, 8139, -1));
+  node.exec_stats.push_back(buildExecStats(325971, 8139, -1));
+  node.__isset.exec_stats = true;
+  input.nodes.push_back(node);
+
+  node = buildPlanNode(0, 1, "00:SCAN HDFS",
+      "default.test_query_log_beeswax_1707938440", 3, 0, false,
+      buildExecStats(-1, 67108864, 7));
+  node.exec_stats.push_back(buildExecStats(1331010710, 368640, 39));
+  node.exec_stats.push_back(buildExecStats(1326558546, 279552, 30));
+  node.exec_stats.push_back(buildExecStats(1327758097, 283648, 30));
+  node.__isset.exec_stats = true;
+  input.nodes.push_back(node);
+
+  input.__isset.nodes = true;
+  input.exch_to_sender_map.emplace(1, 2);
+  input.__isset.exch_to_sender_map = true;
+
+  string actual = PrintExecSummary(input);
+  EXPECT_EQ(expected_str, actual);
+}
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/util/ticker-test.cc b/be/src/util/ticker-test.cc
new file mode 100644
index 000000000..08fe8d6a3
--- /dev/null
+++ b/be/src/util/ticker-test.cc
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "testutil/gtest-util.h"
+
+#include "common/status.h"
+#include "util/stopwatch.h"
+#include "util/ticker.h"
+
+using namespace std;
+
+namespace impala {
+
+static inline float NsToMs(int64_t nanos) {
+  return static_cast<float>(nanos / 1000000);
+}
+
+TEST(TickerTest, TickerSecondsBoolHappyPath) {
+  condition_variable cv;
+  mutex mu;
+  uint8_t cntr = 0;
+
+  TickerSecondsBool fixture(1, cv, mu);
+  MonotonicStopWatch sw;
+
+  sw.Start();
+  ABORT_IF_ERROR(fixture.Start("category", "tickersecondsbool-happy-path"));
+
+  while (cntr < 3) {
+    unique_lock<mutex> l(mu);
+    cv.wait(l, fixture.WakeupGuard());
+    fixture.ResetWakeupGuard();
+    cntr++;
+  }
+
+  sw.Stop();
+
+  fixture.RequestStop();
+  fixture.Join();
+
+  EXPECT_EQ(cntr, 3);
+  // Include a 30 millisecond (1%) margin of error to tolerate differences in 
the
+  // precision of time measurements.
+  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(3000), 30);
+}
+
+TEST(TickerTest, GenericTickerHappyPath) {
+  condition_variable cv;
+  mutex mu;
+  shared_ptr<string> wakeup_guard = make_shared<string>();
+  uint8_t cntr = 0;
+  const string wakeup_val = "wakeup";
+
+  Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
+      wakeup_guard, wakeup_val);
+  MonotonicStopWatch sw;
+
+  sw.Start();
+  ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
+
+  while (cntr < 10) {
+    unique_lock<mutex> l(mu);
+    cv.wait(l, fixture.WakeupGuard());
+    *wakeup_guard = "";
+    cntr++;
+  }
+
+  sw.Stop();
+
+  fixture.RequestStop();
+  fixture.Join();
+
+  EXPECT_EQ(cntr, 10);
+  // Include a 5 millisecond (1%) margin of error to tolerate differences in 
the
+  // precision of time measurements.
+  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(50), 5);
+}
+
+// Tests the case where the wakeup guard is not reset by the consuming code.
+TEST(TickerTest, GenericTickerNoWakeupGuardReset) {
+  condition_variable cv;
+  mutex mu;
+  shared_ptr<string> wakeup_guard = make_shared<string>();
+  uint8_t cntr = 0;
+  const string wakeup_val = "wakeup";
+
+  Ticker<chrono::milliseconds, string> fixture(chrono::milliseconds(5), cv, mu,
+      wakeup_guard, wakeup_val);
+  MonotonicStopWatch sw;
+
+  sw.Start();
+  ABORT_IF_ERROR(fixture.Start("category", "generic-ticker-happy-path"));
+
+  while (cntr < 10) {
+    unique_lock<mutex> l(mu);
+    cv.wait(l, fixture.WakeupGuard());
+    // No wakeup guard reset here.
+    cntr++;
+  }
+
+  sw.Stop();
+
+  fixture.RequestStop();
+  fixture.Join();
+
+  EXPECT_EQ(cntr, 10);
+  // If the wakeup guard was set properly, elapsed time would be 50 
milliseconds. Since
+  // the wakeup guard does not get set, spurious wakeups of the condition 
variable happen
+  // much more frequently than they should.
+  EXPECT_NEAR(NsToMs(sw.ElapsedTime()), static_cast<float>(5), 5);
+}
+
+} // namespace impala
diff --git a/be/src/util/ticker.h b/be/src/util/ticker.h
new file mode 100644
index 000000000..116e9574a
--- /dev/null
+++ b/be/src/util/ticker.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include <boost/bind.hpp>
+
+#include "common/atomic.h"
+#include "common/status.h"
+#include "util/thread.h"
+
+namespace impala {
+
+// Manages a thread that periodically notifies a condition variable. This 
thread never
+// returns. An indicator variable must be specified to guard against spurious 
wakeups.
+//
+// Immediately before this class notfies the condition variable, it sets the 
indicator
+// variable to the `wakeup_value` specified in the constructor. It is the 
responsibility
+// of the thread consuming this class to reset the indicator variable to a 
value other
+// than `wakeup_value` before the consuming thread goes to sleep.
+//
+// If the periodic code takes longer to run than the specified duration, then 
the code
+// will immediately execute the next time around.
+//
+// Internally, this class uses std::this_thread:sleep_for which may sleep for 
longer than
+// the specified duration due to scheduling or resource contention delays.
+// For details, see https://en.cppreference.com/w/cpp/thread/sleep_for.
+//
+// Example usage:
+//
+//   #include <chrono>
+//   #include <condition_variable>
+//   #include <memory>
+//   #include <mutex>
+//
+//   #include "common/status.h"
+//
+//   std::condition_variable cv;
+//   std::mutex mu;
+//   std::shared_ptr<bool> wakeup_guard = make_shared<bool>();
+//   Ticker<std::chrono::seconds, bool> ticker(std::chrono::seconds(30), cv, 
mu,
+//       wakeup_guard, true);
+//
+//   ABORT_IF_ERROR(ticker.Start());
+//
+//   while(true) {
+//     unique_lock<mutex> l(mu);
+//     cv.wait(l, ticker.WakeupGuard());
+//     *wakeup_guard = false;
+//
+//     run_my_code();
+//   }
+
+template <typename DurationType, typename IndicatorType>
+class Ticker {
+  public:
+    Ticker(DurationType interval, std::condition_variable& cv,
+        std::mutex& lock, std::shared_ptr<IndicatorType> indicator,
+        IndicatorType wakeup_value) : interval_(interval), cv_(cv), 
lock_(lock),
+        indicator_(indicator), wakeup_value_(wakeup_value) {}
+
+    Status Start(const std::string& category, const std::string& name) {
+      return Thread::Create(category, name, &Ticker::run, this, &my_thread_);
+    }
+
+    // Specify that the next iteration of this ticker be the last. This 
function does not
+    // block nor does it cause the ticker to wake up earlier than scheduled.
+    void RequestStop() {
+      stop_requested_.Store(true);
+    }
+
+    // Wait for the ticker to exit after it's final iteration.
+    void Join() {
+      my_thread_->Join();
+    }
+
+    // Provides a default implementation for the condition variable predicate 
lambda.
+    std::function<bool()> WakeupGuard() {
+      return [this]{ return *indicator_ == wakeup_value_; };
+    }
+
+  protected:
+    const DurationType interval_;
+    std::condition_variable& cv_;
+    std::mutex& lock_;
+    std::shared_ptr<IndicatorType> indicator_;
+    const IndicatorType wakeup_value_;
+
+  private:
+    std::unique_ptr<Thread> my_thread_;
+    AtomicBool stop_requested_;
+
+    void run() {
+      while (!stop_requested_.Load()) {
+        std::this_thread::sleep_for(interval_);
+
+        {
+          std::lock_guard<std::mutex> l(lock_);
+          *indicator_ = wakeup_value_;
+        }
+
+        cv_.notify_all();
+      }
+    }
+}; // class Ticker
+
+// Specialization of the Ticker class that uses seconds for the duration and 
bool as the
+// wakeup indicator. The boolean shared_ptr indicator is internally managed. 
Use the
+// ResetWakeupGuard() function in your code immediately after the condition 
variable wait
+// to set the internally managed wakeup guard for the next iteration.
+class TickerSecondsBool : public Ticker<std::chrono::seconds, bool> {
+  public:
+    TickerSecondsBool(uint32_t interval, std::condition_variable& cv,
+      std::mutex& lock) :
+      Ticker(std::chrono::seconds(interval), cv, lock, 
std::make_shared<bool>(), true) {}
+
+    void ResetWakeupGuard() {
+      *indicator_ = false;
+    }
+}; // class TickerSecondsBool
+
+} // namespace impala
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index 60b985b64..cf255f052 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -35,5 +35,28 @@ TEST(UidUtil, FragmentInstanceId) {
   }
 }
 
+TEST(UidUtil, UuidNotEmpty) {
+  TUniqueId fixture = GenerateUUID();
+  EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidHalfEmptyHi) {
+  TUniqueId fixture;
+  fixture.hi = 0;
+  fixture.lo = 1;
+  EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidHalfEmptyLo) {
+  TUniqueId fixture;
+  fixture.hi = 1;
+  fixture.lo = 0;
+  EXPECT_FALSE(UUIDEmpty(fixture));
+}
+
+TEST(UidUtil, UuidEmpty) {
+  EXPECT_TRUE(UUIDEmpty(TUniqueId()));
+}
+
 }
 
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index 04c1ad75e..b3fa39f5f 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -129,4 +129,9 @@ inline TUniqueId GenerateUUID() {
   memcpy(&uid.lo, u.data() + sizeof(int64_t), sizeof(int64_t));
   return uid;
 }
+
+/// Determines if a query id is empty.
+inline bool UUIDEmpty(const TUniqueId& id) {
+  return id.hi == 0 && id.lo == 0;
+}
 } // namespace impala
diff --git a/bin/run_clang_tidy.sh b/bin/run_clang_tidy.sh
index 816f201dd..7ca1d58da 100755
--- a/bin/run_clang_tidy.sh
+++ b/bin/run_clang_tidy.sh
@@ -29,8 +29,8 @@
 
 set -euo pipefail
 
-echo "Compiling"
 TMP_BUILDALL_LOG=$(mktemp)
+echo "Compiling, for build logs see ${TMP_BUILDALL_LOG}"
 if ! ./buildall.sh -skiptests -tidy -so -noclean &> "${TMP_BUILDALL_LOG}"
 then
   echo "buildall.sh failed, dumping output" >&2
@@ -59,6 +59,7 @@ export 
PATH="${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/share
 :${IMPALA_TOOLCHAIN_PACKAGES_HOME}/llvm-${IMPALA_LLVM_VERSION}/bin/\
 :$PATH"
 TMP_STDERR=$(mktemp)
+echo; echo "Running clang tidy, for error logs see ${TMP_STDERR}"
 STRCAT_MESSAGE="Impala-specific note: This can also be fixed using the 
StrCat() function \
 from be/src/gutil/strings strcat.h)"
 CLANG_STRING_CONCAT="performance-inefficient-string-concatenation"

Reply via email to