This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bb9df269a0ea1baed5eb49aa6784308d3172565f
Author: Michael Smith <[email protected]>
AuthorDate: Mon Jun 3 16:36:36 2024 -0700

    IMPALA-13130: Prioritize EndDataStream messages
    
    Prioritize EndDataStream messages over other types handled by
    DataStreamService, and avoid rejecting them when memory limit is
    reached. They take very little memory (~75 bytes) and will usually help
    reduce memory use by closing out in-progress operations.
    
    Adds the 'data_stream_sender_eos_timeout_ms' flag to control EOS
    timeouts. Defaults to 1 hour, and can be disabled by setting it to 0 or
    a negative value (e.g. -1).
    
    Adds unit tests ensuring EOS are processed even if mem limit is reached
    and ahead of TransmitData messages in the queue.
    
    Change-Id: I2829e1ab5bcde36107e10bff5fe629c5ee60f3e8
    Reviewed-on: http://gerrit.cloudera.org:8080/21476
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/impala-service-pool.cc         |  27 +++--
 be/src/runtime/data-stream-test.cc        | 167 ++++++++++++++++++++++++++++--
 be/src/runtime/krpc-data-stream-sender.cc |  10 ++
 be/src/service/data-stream-service.cc     |   1 +
 be/src/service/data-stream-service.h      |   2 +
 5 files changed, 191 insertions(+), 16 deletions(-)

diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 009df3a9d..c1a597a76 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -38,6 +38,7 @@
 #include "kudu/util/trace.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "service/data-stream-service.h"
 #include "util/pretty-printer.h"
 #include "util/thread.h"
 
@@ -189,13 +190,25 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
     // usage.
     unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
     if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
-      // Discards the transfer early so the transfer size drops to 0. This is to ensure
-      // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
-      // called MemTracker::Consume() at this point.
-      mem_tracker_lock.unlock();
-      c->DiscardTransfer();
-      RejectTooBusy(c);
-      return kudu::Status::OK();
+      if (c->remote_method().method_name() == DataStreamService::END_DATA_STREAM) {
+        // EndDataStream operations use very little memory and help free up other
+        // resources, so ignore memory hard limit and always add them to the queue. This
+        // can help complete queries earlier when under heavy load that would otherwise
+        // drop the requests and require them to be retried.
+        LOG(WARNING) << "Admitting " << c->remote_method().method_name()
+            << " request on " << service_->service_name()
+            << " from " << c->remote_address().ToString()
+            << " to complete query despite exceeding memory limit; memory consumption is "
+            << PrettyPrinter::Print(service_mem_tracker_->consumption(), TUnit::BYTES);
+      } else {
+        // Discards the transfer early so the transfer size drops to 0. This is to ensure
+        // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
+        // called MemTracker::Consume() at this point.
+        mem_tracker_lock.unlock();
+        c->DiscardTransfer();
+        RejectTooBusy(c);
+        return kudu::Status::OK();
+      }
     }
     service_mem_tracker_->Consume(transfer_size);
   }
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index e78c088b3..47ee9949f 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -57,6 +57,7 @@
 #include "gen-cpp/Descriptors_types.h"
 #include "service/fe-support.h"
 
+#include <atomic>
 #include <iostream>
 #include <string>
 #include <unistd.h>
@@ -70,15 +71,20 @@ using namespace apache::thrift;
 using namespace apache::thrift::protocol;
 
 using kudu::MetricEntity;
+using kudu::MonoDelta;
+using kudu::MonoTime;
 using kudu::rpc::ResultTracker;
 using kudu::rpc::RpcContext;
 using kudu::rpc::ServiceIf;
+using kudu::SleepFor;
 
 DEFINE_int32(port, 20001, "port on which to run Impala krpc based test backend.");
 DECLARE_int32(datastream_sender_timeout_ms);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
 DECLARE_string(datastream_service_queue_mem_limit);
+DECLARE_int32(datastream_service_num_svc_threads);
+DECLARE_string(debug_actions);
 
 static const PlanNodeId DEST_NODE_ID = 1;
 static const int BATCH_CAPACITY = 100;  // rows
@@ -108,7 +114,10 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
   virtual ~ImpalaKRPCTestBackend() {}
 
   Status Init() {
-    return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker(),
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    LOG(INFO) << "Num svc thread=" << num_svc_threads;
+    return rpc_mgr_->RegisterService(num_svc_threads, 1024, this, mem_tracker(),
         ExecEnv::GetInstance()->rpc_metrics());
   }
 
@@ -119,6 +128,8 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
 
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, RpcContext* rpc_context) {
+    transmit_counter_++;
+    DebugActionNoFail(FLAGS_debug_actions, "TRANSMIT_DATA_DELAY");
     stream_mgr_->AddData(request, response, rpc_context);
   }
 
@@ -138,10 +149,21 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
 
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
+  int NumTransmitsReceived() const {
+    return transmit_counter_.load();
+  }
+
+  int QueueSize() {
+    rapidjson::Document doc(rapidjson::kObjectType);
+    rpc_mgr_->ToJson(&doc);
+    return doc["services"][0]["queue_size"].GetInt();
+  }
+
  private:
   RpcMgr* rpc_mgr_;
   KrpcDataStreamMgr* stream_mgr_;
   unique_ptr<MemTracker> mem_tracker_;
+  std::atomic<int> transmit_counter_;
 };
 
 class DataStreamTest : public testing::Test {
@@ -149,6 +171,8 @@ class DataStreamTest : public testing::Test {
   DataStreamTest() : next_val_(0) {
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
+    // MemTracker is not given a mem limit, so the default would be 5% * 0 = 0.
+    FLAGS_datastream_service_queue_mem_limit = std::to_string(TOTAL_DATA_SIZE);
   }
   ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
@@ -252,6 +276,9 @@ class DataStreamTest : public testing::Test {
     dest_.Clear();
   }
 
+  // Ensure gflags are reset in each test class.
+  gflags::FlagSaver saver_;
+
   ObjectPool obj_pool_;
   MemTracker tracker_;
   scoped_ptr<MemPool> mem_pool_;
@@ -401,7 +428,8 @@ class DataStreamTest : public testing::Test {
 
   // Start receiver (expecting given number of senders) in separate thread.
   void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
-      int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
+      int buffer_size, bool is_merging, TUniqueId* out_id = nullptr,
+      RuntimeProfile** out_profile = nullptr) {
     VLOG_QUERY << "start receiver";
     RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
     TUniqueId instance_id;
@@ -423,6 +451,7 @@ class DataStreamTest : public testing::Test {
           profile, less_than_comparator));
     }
     if (out_id != nullptr) *out_id = instance_id;
+    if (out_profile != nullptr) *out_profile = profile;
   }
 
   void JoinReceivers() {
@@ -476,7 +505,8 @@ class DataStreamTest : public testing::Test {
   }
 
   // Verify correctness of receivers' data values.
-  void CheckReceivers(TPartitionType::type stream_type, int num_senders) {
+  void CheckReceivers(TPartitionType::type stream_type, int num_senders,
+      int num_batches = NUM_BATCHES) {
     int64_t total = 0;
     multiset<int64_t> all_data_values;
     for (int i = 0; i < receiver_info_.size(); ++i) {
@@ -487,7 +517,7 @@ class DataStreamTest : public testing::Test {
       ASSERT_EQ(info->num_senders, num_senders);
       if (stream_type == TPartitionType::UNPARTITIONED) {
         EXPECT_EQ(
-            NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size());
+            num_batches * BATCH_CAPACITY * num_senders, info->data_values.size());
       }
       all_data_values.insert(info->data_values.begin(), info->data_values.end());
 
@@ -510,7 +540,7 @@ class DataStreamTest : public testing::Test {
     }
 
     if (stream_type == TPartitionType::HASH_PARTITIONED) {
-      EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
+      EXPECT_EQ(num_batches * BATCH_CAPACITY * num_senders, total);
 
       int k = 0;
       for (multiset<int64_t>::iterator j = all_data_values.begin();
@@ -584,13 +614,15 @@ class DataStreamTest : public testing::Test {
   }
 
   void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
-      int channel_buffer_size = 1024, bool reset_hash_seed = false) {
+      int channel_buffer_size = 1024, bool reset_hash_seed = false,
+      int num_batches = NUM_BATCHES) {
     VLOG_QUERY << "start sender";
     int num_senders = sender_info_.size();
     sender_info_.emplace_back(make_unique<SenderInfo>());
     sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-            partition_type, sender_info_[num_senders].get(), reset_hash_seed));
+            partition_type, sender_info_[num_senders].get(), reset_hash_seed,
+            num_batches));
   }
 
   void JoinSenders() {
@@ -601,7 +633,8 @@ class DataStreamTest : public testing::Test {
   }
 
   void Sender(int sender_num, int channel_buffer_size,
-      TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) {
+      TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed,
+      int num_batches) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink sink = GetSink(partition_type);
@@ -633,7 +666,7 @@ class DataStreamTest : public testing::Test {
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
     int next_val = 0;
-    for (int i = 0; i < NUM_BATCHES; ++i) {
+    for (int i = 0; i < num_batches; ++i) {
       GetNextBatch(batch.get(), &next_val);
       VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
       info->status = sender->Send(&state, batch.get());
@@ -684,6 +717,19 @@ class DataStreamTestShortDeserQueue : public DataStreamTest {
   }
 };
 
+// A separate test class which simulates that the service queue is slow processing RPCs.
+class DataStreamTestSlowServiceQueue : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    FLAGS_datastream_service_num_svc_threads = 1;
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
 // A separate test class which simulates that the service queue fills up.
 class DataStreamTestShortServiceQueue : public DataStreamTest {
  protected:
@@ -821,12 +867,115 @@ TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   CheckReceivers(TPartitionType::UNPARTITIONED, 4);
 }
 
+// Test that EndDataStream RPCs are prioritized over other RPCs in the ImpalaServicePool
+// queue. It starts a receiver with 1 service thread and a 3s delay on processing
+// TransmitData RPCs, and starts 2 senders each trying to send data. The 1st sender's RPC
+// starts processing, and the 2nd sender's RPC is queued in the ImpalaServicePool. Then
+// starts a 3rd sender that sends no data, just EndDataStream, and asserts it completes
+// before the 2nd block that's already queued i.e. before the 2nd 3s delay has passed.
+TEST_F(DataStreamTestSlowServiceQueue, TestPrioritizeEos) {
+  FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000";
+  TPartitionType::type stream_type = TPartitionType::UNPARTITIONED;
+  constexpr int buffer_size = 1024;
+  constexpr int num_batches = 1;
+  Reset();
+
+  TUniqueId instance_id;
+  RuntimeProfile* profile;
+  StartReceiver(stream_type, 3, 0, buffer_size, false, &instance_id, &profile);
+
+  MonotonicStopWatch timer;
+  timer.Start();
+  // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue.
+  StartSender(stream_type, buffer_size, false, num_batches);
+  StartSender(stream_type, buffer_size, false, num_batches);
+  // Wait for 1st sender batch to be received and 2nd sender batch queued.
+  while (test_service_->NumTransmitsReceived() == 0) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+  while (test_service_->QueueSize() == 0) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+
+  // Start last sender with no batches so it just sends EndDataStream.
+  StartSender(stream_type, buffer_size, false, 0);
+
+  // Wait for last sender to complete.
+  sender_info_.back()->thread_handle->join();
+  sender_info_.pop_back();
+  // Verify receiver got EOS.
+  vector<RuntimeProfileBase::Counter*> counters;
+  profile->GetCounters("TotalEosReceived", &counters);
+  ASSERT_EQ(1, counters.size());
+  EXPECT_EQ(1, counters.front()->value());
+  // Assert 2nd TRANSMIT_DATA_DELAY sleep is not finished.
+  EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+  EXPECT_LT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond);
+
+  // Clean up.
+  JoinSenders();
+  JoinReceivers();
+  EXPECT_GT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond);
+  EXPECT_EQ(2 * num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size());
+  EXPECT_EQ(3, counters.front()->value());
+}
+
 // Test that payloads larger than the service queue's soft mem limit can be transmitted.
 TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) {
   TestStream(
       TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
 }
 
+// Test that EndDataStream messages are admitted to the ImpalaServicePool queue when the
+// pool memory limit has been reached. Starts a receiver with a very low memory limit on
+// the pool so only one message is admitted to the queue at a time; others will be
+// rejected and must be retried by the sender. Starts a sender that sends a batch, with
+// processing of TransmitData delayed for 3s; then starts another sender with no blocks
+// that just sends EndDataStream and verifies the EndDataStream message is processed
+// before the 3s delay finishes, so it must have been admitted while memory was still
+// held for the TransmitData message.
+TEST_F(DataStreamTestShortServiceQueue, TestFullQueue) {
+  FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000";
+  TPartitionType::type stream_type = TPartitionType::UNPARTITIONED;
+  constexpr int buffer_size = SHORT_SERVICE_QUEUE_MEM_LIMIT * 2;
+  constexpr int num_batches = 1;
+  Reset();
+
+  TUniqueId instance_id;
+  RuntimeProfile* profile;
+  StartReceiver(stream_type, 2, 0, buffer_size, false, &instance_id, &profile);
+
+  MonotonicStopWatch timer;
+  timer.Start();
+  // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue.
+  StartSender(stream_type, buffer_size, false, num_batches);
+  // Wait for 1st batch to be received.
+  while (test_service_->NumTransmitsReceived() == 0) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+
+  // Start last sender with no batches so it just sends EndDataStream.
+  StartSender(stream_type, buffer_size, false, 0);
+
+  // Wait for last sender to complete.
+  sender_info_.back()->thread_handle->join();
+  sender_info_.pop_back();
+  // Verify receiver got EOS.
+  vector<RuntimeProfileBase::Counter*> counters;
+  profile->GetCounters("TotalEosReceived", &counters);
+  ASSERT_EQ(1, counters.size());
+  EXPECT_EQ(1, counters.front()->value());
+  // Assert TRANSMIT_DATA_DELAY sleep is not finished.
+  EXPECT_LT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+
+  // Clean up.
+  JoinSenders();
+  JoinReceivers();
+  EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+  EXPECT_EQ(num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size());
+  EXPECT_EQ(2, counters.front()->value());
+}
+
 // TODO: more tests:
 // - test case for transmission error in last batch
 // - receivers getting created concurrently
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6bacf7843..8dbf3dc3a 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -60,6 +60,10 @@
 DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
     "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
     "can accumulate before the row batch is sent over the wire.");
+DEFINE_int64_hidden(data_stream_sender_eos_timeout_ms, 60*60*1000,
+    "Timeout for EndDataStream (EOS) RPCs. Setting a timeout prioritizes them over other "
+    "DataStreamService RPCs. Defaults to 1 hour. Set to 0 or negative value to disable "
+    "the timeout.");
 
 using std::condition_variable_any;
 using namespace apache::thrift;
@@ -676,6 +680,12 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
   DCHECK(rpc_in_flight_);
   EndDataStreamRequestPB eos_req;
   rpc_controller_.Reset();
+  if (FLAGS_data_stream_sender_eos_timeout_ms > 0) {
+    // Provide a timeout so EOS RPCs are prioritized over others, as completing a stream
+    // can help free up resources.
+    rpc_controller_.set_timeout(
+        MonoDelta::FromMilliseconds(FLAGS_data_stream_sender_eos_timeout_ms));
+  }
   *eos_req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
   eos_req.set_sender_id(parent_->sender_id_);
   eos_req.set_dest_node_id(dest_node_id_);
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index f26c67cbf..b05052d76 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -69,6 +69,7 @@ DataStreamService::DataStreamService(MetricGroup* metric_group)
   mem_tracker_.reset(new MemTracker(
       bytes_limit, "Data Stream Service Queue", process_mem_tracker));
   MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService");
+  DCHECK_EQ(methods_by_name_.count(string(END_DATA_STREAM)), 1);
 }
 
 Status DataStreamService::Init() {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index a934bc686..11649b905 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -42,6 +42,8 @@ class MetricGroup;
 /// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'.
 class DataStreamService : public DataStreamServiceIf {
  public:
+  static constexpr std::string_view END_DATA_STREAM = "EndDataStream";
+
   DataStreamService(MetricGroup* metric_group);
 
   /// Initializes the service by registering it with the singleton RPC manager.

Reply via email to