This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 09d2f10f4 IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote
09d2f10f4 is described below

commit 09d2f10f4ddf3499b6255a6d14653e7738c2928b
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Apr 30 20:14:04 2024 -0700

    IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote
    
    It is possible for an UpdateFilterFromRemote RPC to arrive at an impalad
    executor before the QueryState of the destination query is created or
    has completed initialization. This patch adds a wait mechanism to the
    UpdateFilterFromRemote RPC endpoint that waits a few milliseconds until
    the QueryState exists and has completed initialization.
    
    The wait time defaults to 500ms (hidden flag
    --update_filter_min_wait_time_ms), with sleep periods that double (up to
    128ms) in between. If the wait time passes and the QueryState is still
    not found or not initialized, the UpdateFilterFromRemote RPC is deemed
    failed and query execution moves on without the complete filter.
    
    Testing:
    - Add BE tests in network-util-test.cc
    - Add test_runtime_filter_aggregation.py::TestLateQueryStateInit
    - Pass exhaustive runs of test_runtime_filter_aggregation.py,
      test_query_live.py, and test_query_log.py
    
    Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9
    Reviewed-on: http://gerrit.cloudera.org:8080/21383
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/query-exec-mgr.cc                   |  6 ++
 be/src/runtime/query-state.cc                      |  5 ++
 be/src/runtime/query-state.h                       | 18 +++--
 be/src/runtime/runtime-filter-bank.cc              | 17 +++-
 be/src/runtime/runtime-filter-bank.h               |  2 +
 be/src/scheduling/scheduler.cc                     | 17 +++-
 be/src/service/data-stream-service.cc              | 61 +++++++++++---
 be/src/service/query-state-record.cc               |  7 +-
 be/src/util/network-util-test.cc                   | 93 ++++++++++++++++++++++
 be/src/util/network-util.cc                        | 15 +++-
 be/src/util/network-util.h                         | 23 +++++-
 common/protobuf/data_stream_service.proto          |  3 +
 .../test_runtime_filter_aggregation.py             | 69 +++++++++++++++-
 tests/util/workload_management.py                  |  6 +-
 14 files changed, 304 insertions(+), 38 deletions(-)

diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 8c5e97870..262f47a6f 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -53,6 +53,8 @@ DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
     "(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
     "coordinator failure");
 
+DECLARE_int32(krpc_port);
+
 const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;
 
 QueryExecMgr::QueryExecMgr() {
@@ -78,6 +80,10 @@ Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
   bool dummy;
   QueryState* qs =
       GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
+  RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
+      "QUERY_STATE_BEFORE_INIT_GLOBAL"));
+  RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
+      "QUERY_STATE_BEFORE_INIT", {std::to_string(FLAGS_krpc_port)}));
   Status status = qs->Init(request, fragment_info);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index bf61b26e3..60c043ccf 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -841,6 +841,11 @@ bool QueryState::codegen_cache_enabled() const {
       && ExecEnv::GetInstance()->codegen_cache_enabled();
 }
 
+bool QueryState::is_initialized() {
+  std::lock_guard<std::mutex> l(init_lock_);
+  return is_initialized_;
+}
+
 bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
           << " #instances=" << fragment_info_.fragment_instance_ctxs.size();
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 15195d64d..b907fc8c1 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -168,6 +168,7 @@ class QueryState {
     return query_ctx_.client_request.query_options;
   }
   bool codegen_cache_enabled() const;
+  bool is_initialized();
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
   RuntimeProfile* host_profile() const { return host_profile_; }
   const NodeToFileSchedulings* node_to_file_schedulings() const {
@@ -320,6 +321,16 @@ class QueryState {
     return fragment_state_map_;
   }
 
+  /// Returns true if the query has reached a terminal state.
+  bool IsTerminalState() const {
+    // Read backend_exec_state_ once so that all three comparisons below see the same
+    // value even if the state is updated concurrently.
+    BackendExecState exec_state = backend_exec_state_;
+    return exec_state == BackendExecState::FINISHED
+        || exec_state == BackendExecState::CANCELLED
+        || exec_state == BackendExecState::ERROR;
+  }
+
  private:
   friend class QueryExecMgr;
 
@@ -548,13 +559,6 @@ class QueryState {
     return !overall_status_.ok() && !overall_status_.IsCancelled();
   }
 
-  /// Returns true if the query has reached a terminal state.
-  bool IsTerminalState() const {
-    return backend_exec_state_ == BackendExecState::FINISHED
-        || backend_exec_state_ == BackendExecState::CANCELLED
-        || backend_exec_state_ == BackendExecState::ERROR;
-  }
-
   /// Updates the BackendExecState based on 'overall_status_'. Should only be called when
   /// the current state is a non-terminal state. The transition can either be to the next
   /// legal state or ERROR if 'overall_status_' is an error. Called by the query state
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index e7b149050..cf41fbf48 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -557,6 +557,10 @@ void RuntimeFilterBank::DistributeCompleteFilter(
       ++num_inflight_rpcs_;
     }
 
+    int32_t remaining_wait_time_ms =
+        max(0, GetRuntimeFilterWaitTime() - complete_filter->TimeSinceRegistrationMs());
+    params.set_remaining_filter_wait_time_ms(remaining_wait_time_ms);
+
     if (to_coordinator) {
       proxy->UpdateFilterAsync(params, res, controller,
           boost::bind(
@@ -743,10 +747,7 @@ vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
 }
 
 void RuntimeFilterBank::SendIncompleteFilters() {
-  int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
-  if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
-    wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
-  }
+  int32_t wait_time_ms = GetRuntimeFilterWaitTime();
 
   bool try_wait_aggregation = !cancelled_;
   for (auto& entry : filters_) {
@@ -845,6 +846,14 @@ void RuntimeFilterBank::Close() {
   filter_mem_tracker_->Close();
 }
 
+int32_t RuntimeFilterBank::GetRuntimeFilterWaitTime() const {
+  int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
+  if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
+    wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
+  }
+  return wait_time_ms;
+}
+
 RuntimeFilterBank::ProducedFilter::ProducedFilter(
     int pending_producers, int pending_remotes, RuntimeFilter* result_filter)
   : result_filter(result_filter),
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index b656deedd..d013a15ab 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -373,6 +373,8 @@ class RuntimeFilterBank {
   /// Disable a bloom filter by replacing it with an ALWAYS_TRUE_FILTER.
   /// Return a pointer to the new runtime filter.
   RuntimeFilter* DisableBloomFilter(std::unique_ptr<RuntimeFilter>& bloom_filter);
+
+  int32_t GetRuntimeFilterWaitTime() const;
 };
 
 }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index b3465b36d..6cc002233 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -56,6 +56,10 @@ using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
 
+DEFINE_bool_hidden(sort_runtime_filter_aggregator_candidates, false,
+    "Control whether to sort intermediate runtime filter aggregator candidates based on "
+    "their KRPC address. Only used for testing.");
+
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -334,7 +338,18 @@ void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_c
   int num_agg = (int)ceil((double)num_non_coordinator_host / num_filters_per_host);
   DCHECK_GT(num_agg, 0);
 
-  std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
+  if (UNLIKELY(FLAGS_sort_runtime_filter_aggregator_candidates)) {
+    sort(instance_groups.begin(), instance_groups.end(),
+        [src_state](const InstanceToAggPairs& a, const InstanceToAggPairs& b) {
+          int idx_a = a[0].first;
+          int idx_b = b[0].first;
+          return CompareNetworkAddressPB(src_state->instance_states[idx_a].krpc_host,
+                     src_state->instance_states[idx_b].krpc_host)
+              < 0;
+        });
+  } else {
+    std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
+  }
   if (coordinator_instances.size() > 0) {
     // Put coordinator group behind so that coordinator won't be selected as intermediate
     // aggregator.
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index f26c67cbf..8a497f7b2 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -51,6 +51,9 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str())
 DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
     "datastream services' RPCs. If left at default value 0, it will be set to number of "
     "CPU cores.  Set it to a positive value to change from the default.");
+DEFINE_int32_hidden(update_filter_min_wait_time_ms, 500,
+    "Minimum time for UpdateFilterFromRemote RPC to wait until destination QueryState is "
+    "ready.");
 DECLARE_string(debug_actions);
 
 namespace impala {
@@ -128,20 +131,52 @@ void DataStreamService::UpdateFilterFromRemote(
   DCHECK(req->has_query_id());
   DCHECK(
       req->has_bloom_filter() || req->has_min_max_filter() || req->has_in_list_filter());
-  QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
-
-  if (qs.get() != nullptr) {
-    qs->UpdateFilterFromRemote(*req, context);
-    RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
-  } else {
-    // Query state for requested query_id might have been cancelled or closed.
-    // i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
-    // query_id has complete their execution.
-    string err_msg = Substitute("Query State not found for query_id=$0",
-        PrintId(ProtoToQueryId(req->query_id())));
-    LOG(INFO) << err_msg;
-    RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
+  int64_t arrival_time = MonotonicMillis();
+
+  // Loop until destination QueryState is ready to accept filter update from remote.
+  // Sleep for a few milliseconds in-between and break after 500ms grace period passed.
+  // The grace period is short so that RPC thread is not blocked for too long.
+  // This is a much simpler mechanism than KrpcDataStreamMgr::AddData.
+  // TODO: Revisit this with more sophisticated deferral mechanism if needed.
+  bool query_found = false;
+  int64_t total_wait_time = 0;
+  int32_t sleep_duration_ms = 2;
+  if (req->remaining_filter_wait_time_ms() < FLAGS_update_filter_min_wait_time_ms) {
+    LOG(INFO) << "UpdateFilterFromRemote RPC called with remaining wait time "
+              << req->remaining_filter_wait_time_ms() << " ms, less than "
+              << FLAGS_update_filter_min_wait_time_ms << " ms minimum wait time.";
   }
+
+  do {
+    {
+      QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
+      query_found |= (qs.get() != nullptr);
+      if (query_found) {
+        if (qs.get() == nullptr || qs->IsTerminalState()) {
+          // Query was found, but now is either missing or in terminal state.
+          // Break the loop and respond with an error.
+          break;
+        } else if (qs->is_initialized()) {
+          qs->UpdateFilterFromRemote(*req, context);
+          RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+          return;
+        }
+      }
+    }
+    usleep(sleep_duration_ms * 1000);
+    // double sleep time for next iteration up to 128ms.
+    if (2 * sleep_duration_ms <= 128) sleep_duration_ms *= 2;
+    total_wait_time = MonotonicMillis() - arrival_time;
+  } while (total_wait_time < FLAGS_update_filter_min_wait_time_ms);
+
+  // Query state for requested query_id might have been cancelled, closed, or not ready.
+  // i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
+  // query_id have completed their execution.
+  string err_msg = Substitute("QueryState for query_id=$0 $1 after $2 ms",
+      PrintId(ProtoToQueryId(req->query_id())),
+      query_found ? "no longer running" : "not found", total_wait_time);
+  LOG(INFO) << err_msg;
+  RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
 }
 
 void DataStreamService::PublishFilter(
 void DataStreamService::PublishFilter(
diff --git a/be/src/service/query-state-record.cc b/be/src/service/query-state-record.cc
index d054cf644..db1b6c928 100644
--- a/be/src/service/query-state-record.cc
+++ b/be/src/service/query-state-record.cc
@@ -221,10 +221,7 @@ QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state,
     // Per-Host Metrics
     for (int i =0; i < exec_state.schedule()->backend_exec_params_size(); i++) {
       const BackendExecParamsPB& b = exec_state.schedule()->backend_exec_params(i);
-      TNetworkAddress host;
-      host.hostname = b.address().hostname();
-      host.uds_address = b.address().uds_address();
-      host.port = b.address().port();
+      TNetworkAddress host = FromNetworkAddressPB(b.address());
 
       PerHostState state;
       state.fragment_instance_count = b.instance_params_size();
@@ -445,4 +442,4 @@ EventsTimelineIterator EventsTimelineIterator::end() {
   return EventsTimelineIterator(labels_, timestamps_, labels_->size());
 }
 
-} // namespace impala
\ No newline at end of file
+} // namespace impala
diff --git a/be/src/util/network-util-test.cc b/be/src/util/network-util-test.cc
index c255466cc..035b9b7eb 100644
--- a/be/src/util/network-util-test.cc
+++ b/be/src/util/network-util-test.cc
@@ -41,6 +41,12 @@ TEST(NetworkUtil, NetAddrCompHostnameDiff) {
 
   ASSERT_TRUE(fixture(first, second));
   ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      < 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      > 0);
 }
 
 // Assert where host fields are equal but port is different.
@@ -59,6 +65,12 @@ TEST(NetworkUtil, NetAddrCompPortDiff) {
 
   ASSERT_TRUE(fixture(first, second));
   ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      < 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      > 0);
 }
 
 // Assert where host and port fields are equal but uds address is different.
@@ -77,6 +89,12 @@ TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {
 
   ASSERT_TRUE(fixture(first, second));
   ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      < 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      > 0);
 }
 
 // Assert where all three comparison fields are equal.
@@ -95,6 +113,81 @@ TEST(NetworkUtil, NetAddrUDSAddrSame) {
 
   ASSERT_FALSE(fixture(first, second));
   ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      == 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      == 0);
+}
+
+// Assert where host and port fields are equal but the first address does not have
+// uds address set.
+TEST(NetworkUtil, NetAddrOneMissUDSAddr) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("host");
+  first.__set_port(0);
+
+  second.__set_hostname("host");
+  second.__set_port(0);
+  second.__set_uds_address("");
+
+  ASSERT_TRUE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      < 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      > 0);
+}
+
+// Assert where host and port fields are equal and neither address has
+// uds address set.
+TEST(NetworkUtil, NetAddrAllMissUDSAddr) {
+  TNetworkAddressComparator fixture;
+  TNetworkAddress first;
+  TNetworkAddress second;
+
+  first.__set_hostname("host");
+  first.__set_port(0);
+
+  second.__set_hostname("host");
+  second.__set_port(0);
+
+  ASSERT_FALSE(fixture(first, second));
+  ASSERT_FALSE(fixture(second, first));
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
+      == 0);
+  ASSERT_TRUE(
+      CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
+      == 0);
+}
+
+void CheckTranslation(TNetworkAddress thrift_address) {
+  NetworkAddressPB proto_address = FromTNetworkAddress(thrift_address);
+  TNetworkAddress thrift_address2 = FromNetworkAddressPB(proto_address);
+  NetworkAddressPB proto_address2 = FromTNetworkAddress(thrift_address2);
+
+  TNetworkAddressComparator fixture;
+  ASSERT_FALSE(fixture(thrift_address, thrift_address2));
+  ASSERT_FALSE(fixture(thrift_address2, thrift_address));
+  ASSERT_TRUE(CompareNetworkAddressPB(proto_address, proto_address2) == 0);
+  ASSERT_TRUE(CompareNetworkAddressPB(proto_address2, proto_address) == 0);
+}
+
+// Assert consistent translation between TNetworkAddress and NetworkAddressPB.
+TEST(NetworkUtil, NetAddrTranslation) {
+  TNetworkAddress addr;
+  addr.__set_hostname("host");
+  addr.__set_port(0);
+  CheckTranslation(addr);
+  addr.__set_uds_address("uds");
+  CheckTranslation(addr);
 }
 
 } // namespace impala
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index 80b2cde0a..b59cd408b 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -240,7 +240,7 @@ TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
   TNetworkAddress t_address;
   t_address.__set_hostname(address.hostname());
   t_address.__set_port(address.port());
-  t_address.__set_uds_address(address.uds_address());
+  if (address.has_uds_address()) t_address.__set_uds_address(address.uds_address());
   return t_address;
 }
 
@@ -248,7 +248,7 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address) {
   NetworkAddressPB address_pb;
   address_pb.set_hostname(address.hostname);
   address_pb.set_port(address.port);
-  address_pb.set_uds_address(address.uds_address);
+  if (address.__isset.uds_address) address_pb.set_uds_address(address.uds_address);
   return address_pb;
 }
 
@@ -270,7 +270,16 @@ bool TNetworkAddressComparator::operator()(const TNetworkAddress& a,
     }
 
     // Hostnames and ports were the same, compare on uds address.
-    return a.uds_address.compare(b.uds_address) < 0;
+    if (a.__isset.uds_address) {
+      if (b.__isset.uds_address) {
+        return a.uds_address.compare(b.uds_address) < 0;
+      } else {
+        return false;
+      }
+    } else if (b.__isset.uds_address) {
+      return true;
+    }
+    return false;
 }
 
 /// Pick a random port in the range of ephemeral ports
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 359cc11f7..0facbb191 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -109,10 +109,29 @@ struct TNetworkAddressComparator {
 /// a free ephemeral port can't be found after 100 tries.
 int FindUnusedEphemeralPort();
 
+/// Three-way compare of two NetworkAddressPB; returns <0, 0 or >0. The order is decided
+/// by hostname, then port, then uds address. A missing uds address sorts first.
+inline int CompareNetworkAddressPB(
+    const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
+  int comp = lhs.hostname().compare(rhs.hostname());
+  if (comp == 0) comp = (lhs.port() > rhs.port()) - (lhs.port() < rhs.port());
+  if (comp == 0) {
+    if (lhs.has_uds_address()) {
+      if (rhs.has_uds_address()) {
+        comp = lhs.uds_address().compare(rhs.uds_address());
+      } else {
+        comp = 1; // only lhs has uds address, so rhs precedes lhs
+      }
+    } else if (rhs.has_uds_address()) {
+      comp = -1; // only rhs has uds address, so lhs precedes rhs
+    }
+  }
+  return comp;
+}
+
 /// Return true if two NetworkAddressPB are match.
 inline bool KrpcAddressEqual(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
-  return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port()
-      && lhs.uds_address() == rhs.uds_address();
+  return CompareNetworkAddressPB(lhs, rhs) == 0;
 }
 
 extern const std::string LOCALHOST_IP_STR;
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 6bca8e4d7..46c17beb0 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -124,6 +124,9 @@ message UpdateFilterParamsPB {
   optional MinMaxFilterPB min_max_filter = 4;
 
   optional InListFilterPB in_list_filter = 5;
+
+  // Remaining runtime filter wait time in milliseconds, as computed by the sender.
+  optional int32 remaining_filter_wait_time_ms = 6;
 }
 
 message UpdateFilterResultPB {
diff --git a/tests/custom_cluster/test_runtime_filter_aggregation.py b/tests/custom_cluster/test_runtime_filter_aggregation.py
index da00e5367..30675d64a 100644
--- a/tests/custom_cluster/test_runtime_filter_aggregation.py
+++ b/tests/custom_cluster/test_runtime_filter_aggregation.py
@@ -21,7 +21,10 @@ import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
-from tests.common.test_dimensions import add_mandatory_exec_option
+from tests.common.test_dimensions import (
+  add_mandatory_exec_option,
+  add_exec_option_dimension
+)
 
 # slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
 # runtime filters don't arrive in time.
@@ -43,6 +46,9 @@ class TestRuntimeFilterAggregation(CustomClusterTestSuite):
   def add_test_dimensions(cls):
     super(TestRuntimeFilterAggregation, cls).add_test_dimensions()
     add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
+    # Exercise small, non-fatal jitters.
+    add_exec_option_dimension(
+      cls, 'debug_action', ['', 'QUERY_STATE_BEFORE_INIT_GLOBAL:JITTER@200'])
     # Enable query option ASYNC_CODEGEN for slow build
     if build_runs_slowly:
       add_mandatory_exec_option(cls, "async_codegen", 1)
@@ -60,3 +66,64 @@ class TestRuntimeFilterAggregation(CustomClusterTestSuite):
     }
     self.run_test_case('QueryTest/runtime_filters', vector, test_file_vars=vars)
     self.run_test_case('QueryTest/bloom_filters', vector)
+
+
+class TestLateQueryStateInit(CustomClusterTestSuite):
+  """Test that distributed runtime filter aggregation still works
+  when remote query state of intermediate aggregator node is late to initialize."""
+  _wait_time = WAIT_TIME_MS // 20
+  _init_delay = [100, 3000]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestLateQueryStateInit, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
+    add_mandatory_exec_option(cls, 'runtime_filter_wait_time_ms', cls._wait_time)
+    # Inject sleep in second impalad since sort_runtime_filter_aggregator_candidates=true
+    # and the first one (coordinator) will never be selected as intermediate aggregator.
+    actions = ["QUERY_STATE_BEFORE_INIT:27001:SLEEP@{0}".format(d) for d in
+        cls._init_delay]
+    add_exec_option_dimension(cls, 'debug_action', actions)
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_mandatory_exec_option(cls, "async_codegen", 1)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--sort_runtime_filter_aggregator_candidates=true --logbuflevel=-1")
+  def test_late_query_state_init(self, vector):
+    """Test that distributed runtime filter aggregation still works
+    when remote query state of intermediate aggregator node is late to initialize."""
+    query = ('select count(*) from functional.alltypes p '
+             'join [SHUFFLE] functional.alltypestiny b '
+             'on p.month = b.int_col and b.month = 1 and b.string_col = "1"')
+    exec_options = vector.get_value('exec_option')
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert result.data[0] == '620'
+
+    # Expect no log printed in short init delay scenario.
+    # In long init delay scenario, two possible situations can happen:
+    # 1. The build scanner assigned in first impalad exchange all build rows (1) to
+    #    the second impalad that is blocked at QueryExecMgr::StartQuery. This indirectly
+    #    delay all impalad because all JOIN BUILD fragment need to wait for EOS signal
+    #    from exchange sender. The probability for this case is 1/3.
+    # 2. The build scanner exchange all build rows to other impalads than the second one.
+    #    The JOIN BUILD fragment in first and third impalads immediately receive EOS
+    #    signal from exchange sender, complete build, and send their filter update to
+    #    the second impalad. The second impalad stay blocked at QueryExecMgr::StartQuery
+    #    and filter update need to wait until it gives up. The probability for this case
+    #    is 2/3.
+    expected = -1 if str(self._init_delay[-1]) in exec_options['debug_action'] else 0
+    all_blocked = 'UpdateFilterFromRemote RPC called with remaining wait time'
+    preagg_blocked = 'QueryState for query_id={0} no'.format(result.query_id)
+    log_pattern = '({0}|{1})'.format(all_blocked, preagg_blocked)
+    if expected == -1:
+      if 'Filter 0 inflight for final aggregation' in result.runtime_profile:
+        log_pattern = all_blocked  # case 1.
+      else:
+        log_pattern = preagg_blocked  # case 2.
+    self.assert_log_contains('impalad_node1', 'INFO', log_pattern, expected)
diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py
index c092f85c5..772041aa7 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -359,8 +359,10 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal
     perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n',
         profile_text)
     assert perhost_frags is not None
-    assert data[index] == ",".join(sorted(perhost_frags.group(1).replace("(", "=")
-        .replace(")", "").split(" "))), "per-host fragment instances incorrect"
+    expected = ",".join(sorted(perhost_frags.group(1).replace("(", "=")
+        .replace(")", "").split(" ")))
+    assert data[index] == expected, ('per-host fragment instances incorrect.'
+        ' expected="{0}" actual="{1}"').format(expected, data[index])
 
   # Backends Count
   index += 1

Reply via email to