This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit bb9df269a0ea1baed5eb49aa6784308d3172565f Author: Michael Smith <[email protected]> AuthorDate: Mon Jun 3 16:36:36 2024 -0700 IMPALA-13130: Prioritize EndDataStream messages Prioritize EndDataStream messages over other types handled by DataStreamService, and avoid rejecting them when memory limit is reached. They take very little memory (~75 bytes) and will usually help reduce memory use by closing out in-progress operations. Adds the 'data_stream_sender_eos_timeout_ms' flag to control EOS timeouts. Defaults to 1 hour, and can be disabled by setting to -1. Adds unit tests ensuring EOS are processed even if mem limit is reached and ahead of TransmitData messages in the queue. Change-Id: I2829e1ab5bcde36107e10bff5fe629c5ee60f3e8 Reviewed-on: http://gerrit.cloudera.org:8080/21476 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/rpc/impala-service-pool.cc | 27 +++-- be/src/runtime/data-stream-test.cc | 167 ++++++++++++++++++++++++++++-- be/src/runtime/krpc-data-stream-sender.cc | 10 ++ be/src/service/data-stream-service.cc | 1 + be/src/service/data-stream-service.h | 2 + 5 files changed, 191 insertions(+), 16 deletions(-) diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc index 009df3a9d..c1a597a76 100644 --- a/be/src/rpc/impala-service-pool.cc +++ b/be/src/rpc/impala-service-pool.cc @@ -38,6 +38,7 @@ #include "kudu/util/trace.h" #include "runtime/exec-env.h" #include "runtime/mem-tracker.h" +#include "service/data-stream-service.h" #include "util/pretty-printer.h" #include "util/thread.h" @@ -189,13 +190,25 @@ kudu::Status ImpalaServicePool::QueueInboundCall( // usage. unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_); if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) { - // Discards the transfer early so the transfer size drops to 0. This is to ensure - // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't - // called MemTracker::Consume() at this point. - mem_tracker_lock.unlock(); - c->DiscardTransfer(); - RejectTooBusy(c); - return kudu::Status::OK(); + if (c->remote_method().method_name() == DataStreamService::END_DATA_STREAM) { + // EndDataStream operations use very little memory and help free up other + // resources, so ignore memory hard limit and always add them to the queue. This + // can help complete queries earlier when under heavy load that would otherwise + // drop the requests and require them to be retried. + LOG(WARNING) << "Admitting " << c->remote_method().method_name() + << " request on " << service_->service_name() + << "from " << c->remote_address().ToString() + << " to complete query despite exceeding memory limit; memory consumption is " + << PrettyPrinter::Print(service_mem_tracker_->consumption(), TUnit::BYTES); + } else { + // Discards the transfer early so the transfer size drops to 0. This is to ensure + // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't + // called MemTracker::Consume() at this point. + mem_tracker_lock.unlock(); + c->DiscardTransfer(); + RejectTooBusy(c); + return kudu::Status::OK(); + } } service_mem_tracker_->Consume(transfer_size); } diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index e78c088b3..47ee9949f 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -57,6 +57,7 @@ #include "gen-cpp/Descriptors_types.h" #include "service/fe-support.h" +#include <atomic> #include <iostream> #include <string> #include <unistd.h> @@ -70,15 +71,20 @@ using namespace apache::thrift; using namespace apache::thrift::protocol; using kudu::MetricEntity; +using kudu::MonoDelta; +using kudu::MonoTime; using kudu::rpc::ResultTracker; using kudu::rpc::RpcContext; using kudu::rpc::ServiceIf; +using kudu::SleepFor; DEFINE_int32(port, 20001, "port on which to run Impala krpc based test backend."); DECLARE_int32(datastream_sender_timeout_ms); DECLARE_int32(datastream_service_num_deserialization_threads); DECLARE_int32(datastream_service_deserialization_queue_size); DECLARE_string(datastream_service_queue_mem_limit); +DECLARE_int32(datastream_service_num_svc_threads); +DECLARE_string(debug_actions); static const PlanNodeId DEST_NODE_ID = 1; static const int BATCH_CAPACITY = 100; // rows @@ -108,7 +114,10 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf { virtual ~ImpalaKRPCTestBackend() {} Status Init() { - return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker(), + int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ? + FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores(); + LOG(INFO) << "Num svc thread=" << num_svc_threads; + return rpc_mgr_->RegisterService(num_svc_threads, 1024, this, mem_tracker(), ExecEnv::GetInstance()->rpc_metrics()); } @@ -119,6 +128,8 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf { virtual void TransmitData(const TransmitDataRequestPB* request, TransmitDataResponsePB* response, RpcContext* rpc_context) { + transmit_counter_++; + DebugActionNoFail(FLAGS_debug_actions, "TRANSMIT_DATA_DELAY"); stream_mgr_->AddData(request, response, rpc_context); } @@ -138,10 +149,21 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf { MemTracker* mem_tracker() { return mem_tracker_.get(); } + int NumTransmitsReceived() const { + return transmit_counter_.load(); + } + + int QueueSize() { + rapidjson::Document doc(rapidjson::kObjectType); + rpc_mgr_->ToJson(&doc); + return doc["services"][0]["queue_size"].GetInt(); + } + private: RpcMgr* rpc_mgr_; KrpcDataStreamMgr* stream_mgr_; unique_ptr<MemTracker> mem_tracker_; + std::atomic<int> transmit_counter_; }; class DataStreamTest : public testing::Test { @@ -149,6 +171,8 @@ class DataStreamTest : public testing::Test { DataStreamTest() : next_val_(0) { // Stop tests that rely on mismatched sender / receiver pairs timing out from failing. FLAGS_datastream_sender_timeout_ms = 250; + // MemTracker is not given a mem limit, so the default would be 5% * 0 = 0. + FLAGS_datastream_service_queue_mem_limit = std::to_string(TOTAL_DATA_SIZE); } ~DataStreamTest() { runtime_state_->ReleaseResources(); } @@ -252,6 +276,9 @@ class DataStreamTest : public testing::Test { dest_.Clear(); } + // Ensure gflags are reset in each test class. + gflags::FlagSaver saver_; + ObjectPool obj_pool_; MemTracker tracker_; scoped_ptr<MemPool> mem_pool_; @@ -401,7 +428,8 @@ class DataStreamTest : public testing::Test { // Start receiver (expecting given number of senders) in separate thread. void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num, - int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) { + int buffer_size, bool is_merging, TUniqueId* out_id = nullptr, + RuntimeProfile** out_profile = nullptr) { VLOG_QUERY << "start receiver"; RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver"); TUniqueId instance_id; @@ -423,6 +451,7 @@ class DataStreamTest : public testing::Test { profile, less_than_comparator)); } if (out_id != nullptr) *out_id = instance_id; + if (out_profile != nullptr) *out_profile = profile; } void JoinReceivers() { @@ -476,7 +505,8 @@ class DataStreamTest : public testing::Test { } // Verify correctness of receivers' data values. - void CheckReceivers(TPartitionType::type stream_type, int num_senders) { + void CheckReceivers(TPartitionType::type stream_type, int num_senders, + int num_batches = NUM_BATCHES) { int64_t total = 0; multiset<int64_t> all_data_values; for (int i = 0; i < receiver_info_.size(); ++i) { @@ -487,7 +517,7 @@ class DataStreamTest : public testing::Test { ASSERT_EQ(info->num_senders, num_senders); if (stream_type == TPartitionType::UNPARTITIONED) { EXPECT_EQ( - NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size()); + num_batches * BATCH_CAPACITY * num_senders, info->data_values.size()); } all_data_values.insert(info->data_values.begin(), info->data_values.end()); @@ -510,7 +540,7 @@ class DataStreamTest : public testing::Test { } if (stream_type == TPartitionType::HASH_PARTITIONED) { - EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total); + EXPECT_EQ(num_batches * BATCH_CAPACITY * num_senders, total); int k = 0; for (multiset<int64_t>::iterator j = all_data_values.begin(); @@ -584,13 +614,15 @@ class DataStreamTest : public testing::Test { } void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED, - int channel_buffer_size = 1024, bool reset_hash_seed = false) { + int channel_buffer_size = 1024, bool reset_hash_seed = false, + int num_batches = NUM_BATCHES) { VLOG_QUERY << "start sender"; int num_senders = sender_info_.size(); sender_info_.emplace_back(make_unique<SenderInfo>()); sender_info_.back()->thread_handle.reset( new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size, - partition_type, sender_info_[num_senders].get(), reset_hash_seed)); + partition_type, sender_info_[num_senders].get(), reset_hash_seed, + num_batches)); } void JoinSenders() { @@ -601,7 +633,8 @@ class DataStreamTest : public testing::Test { } void Sender(int sender_num, int channel_buffer_size, - TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) { + TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed, + int num_batches) { RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_); VLOG_QUERY << "create sender " << sender_num; const TDataSink sink = GetSink(partition_type); @@ -633,7 +666,7 @@ class DataStreamTest : public testing::Test { EXPECT_OK(sender->Open(&state)); scoped_ptr<RowBatch> batch(CreateRowBatch()); int next_val = 0; - for (int i = 0; i < NUM_BATCHES; ++i) { + for (int i = 0; i < num_batches; ++i) { GetNextBatch(batch.get(), &next_val); VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows(); info->status = sender->Send(&state, batch.get()); @@ -684,6 +717,19 @@ class DataStreamTestShortDeserQueue : public DataStreamTest { } }; +// A separate test class which simulates that the service queue is slow processing RPCs. +class DataStreamTestSlowServiceQueue : public DataStreamTest { + protected: + virtual void SetUp() { + FLAGS_datastream_service_num_svc_threads = 1; + DataStreamTest::SetUp(); + } + + virtual void TearDown() { + DataStreamTest::TearDown(); + } +}; + // A separate test class which simulates that the service queue fills up. class DataStreamTestShortServiceQueue : public DataStreamTest { protected: @@ -821,12 +867,115 @@ TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) { CheckReceivers(TPartitionType::UNPARTITIONED, 4); } +// Test that EndDataStream RPCs are prioritized over other RPCs in the ImpalaServicePool +// queue. It starts a receiver with 1 service thread and a 3s delay on processing +// TransmitData RPCs, and starts 2 senders each trying to send data. The 1st sender's RPC +// starts processing, and the 2nd sender's RPC is queued in the ImpalaServicePool. Then +// starts a 3rd sender that sends no data, just EndDataStream, and asserts it completes +// before the 2nd block that's already queued i.e. before the 2nd 3s delay has passed. +TEST_F(DataStreamTestSlowServiceQueue, TestPrioritizeEos) { + FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000"; + TPartitionType::type stream_type = TPartitionType::UNPARTITIONED; + constexpr int buffer_size = 1024; + constexpr int num_batches = 1; + Reset(); + + TUniqueId instance_id; + RuntimeProfile* profile; + StartReceiver(stream_type, 3, 0, buffer_size, false, &instance_id, &profile); + + MonotonicStopWatch timer; + timer.Start(); + // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue. + StartSender(stream_type, buffer_size, false, num_batches); + StartSender(stream_type, buffer_size, false, num_batches); + // Wait for 1st sender batch to be received and 2nd sender batch queued. + while (test_service_->NumTransmitsReceived() == 0) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + while (test_service_->QueueSize() == 0) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + + // Start last sender with no batches so it just sends EndDataStream. + StartSender(stream_type, buffer_size, false, 0); + + // Wait for last sender to complete. + sender_info_.back()->thread_handle->join(); + sender_info_.pop_back(); + // Verify receiver got EOS. + vector<RuntimeProfileBase::Counter*> counters; + profile->GetCounters("TotalEosReceived", &counters); + ASSERT_EQ(1, counters.size()); + EXPECT_EQ(1, counters.front()->value()); + // Assert 2nd TRANSMIT_DATA_DELAY sleep is not finished. + EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond); + EXPECT_LT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond); + + // Clean up. + JoinSenders(); + JoinReceivers(); + EXPECT_GT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond); + EXPECT_EQ(2 * num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size()); + EXPECT_EQ(3, counters.front()->value()); +} + // Test that payloads larger than the service queue's soft mem limit can be transmitted. TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) { TestStream( TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false); } +// Test that EndDataStream messages are admitted to the ImpalaServicePool queue when the +// pool memory limit has been reached. Starts a receiver with a very low memory limit on +// the pool so only one message is admitted to the queue at a time; others will be +// rejected and must be retried by the sender. Starts a sender that sends a batch, with +// processing of TransmitData delayed for 3s; then starts another sender with no blocks +// that just sends EndDataStream and verifies the EndDataStream message is processed +// before the 3s delay finishes, so it must have been admitted while memory was still +// held for the TransmitData message. +TEST_F(DataStreamTestShortServiceQueue, TestFullQueue) { + FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000"; + TPartitionType::type stream_type = TPartitionType::UNPARTITIONED; + constexpr int buffer_size = SHORT_SERVICE_QUEUE_MEM_LIMIT * 2; + constexpr int num_batches = 1; + Reset(); + + TUniqueId instance_id; + RuntimeProfile* profile; + StartReceiver(stream_type, 2, 0, buffer_size, false, &instance_id, &profile); + + MonotonicStopWatch timer; + timer.Start(); + // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue. + StartSender(stream_type, buffer_size, false, num_batches); + // Wait for 1st batch to be received. + while (test_service_->NumTransmitsReceived() == 0) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + + // Start last sender with no batches so it just sends EndDataStream. + StartSender(stream_type, buffer_size, false, 0); + + // Wait for last sender to complete. + sender_info_.back()->thread_handle->join(); + sender_info_.pop_back(); + // Verify receiver got EOS. + vector<RuntimeProfileBase::Counter*> counters; + profile->GetCounters("TotalEosReceived", &counters); + ASSERT_EQ(1, counters.size()); + EXPECT_EQ(1, counters.front()->value()); + // Assert TRANSMIT_DATA_DELAY sleep is not finished. + EXPECT_LT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond); + + // Clean up. + JoinSenders(); + JoinReceivers(); + EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond); + EXPECT_EQ(num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size()); + EXPECT_EQ(2, counters.front()->value()); +} + // TODO: more tests: // - test case for transmission error in last batch // - receivers getting created concurrently diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index 6bacf7843..8dbf3dc3a 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -60,6 +60,10 @@ DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024, "(Advanced) Max size in bytes which a row batch in a data stream sender's channel " "can accumulate before the row batch is sent over the wire."); +DEFINE_int64_hidden(data_stream_sender_eos_timeout_ms, 60*60*1000, + "Timeout for EndDataStream (EOS) RPCs. Setting a timeout prioritizes them over other " + "DataStreamService RPCs. Defaults to 1 hour. Set to 0 or negative value to disable " + "the timeout."); using std::condition_variable_any; using namespace apache::thrift; @@ -676,6 +680,12 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() { DCHECK(rpc_in_flight_); EndDataStreamRequestPB eos_req; rpc_controller_.Reset(); + if (FLAGS_data_stream_sender_eos_timeout_ms > 0) { + // Provide a timeout so EOS RPCs are prioritized over others, as completing a stream + // can help free up resources. + rpc_controller_.set_timeout( + MonoDelta::FromMilliseconds(FLAGS_data_stream_sender_eos_timeout_ms)); + } *eos_req.mutable_dest_fragment_instance_id() = fragment_instance_id_; eos_req.set_sender_id(parent_->sender_id_); eos_req.set_dest_node_id(dest_node_id_); diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc index f26c67cbf..b05052d76 100644 --- a/be/src/service/data-stream-service.cc +++ b/be/src/service/data-stream-service.cc @@ -69,6 +69,7 @@ DataStreamService::DataStreamService(MetricGroup* metric_group) mem_tracker_.reset(new MemTracker( bytes_limit, "Data Stream Service Queue", process_mem_tracker)); MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService"); + DCHECK_EQ(methods_by_name_.count(string(END_DATA_STREAM)), 1); } Status DataStreamService::Init() { diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h index a934bc686..11649b905 100644 --- a/be/src/service/data-stream-service.h +++ b/be/src/service/data-stream-service.h @@ -42,6 +42,8 @@ class MetricGroup; /// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'. class DataStreamService : public DataStreamServiceIf { public: + static constexpr std::string_view END_DATA_STREAM = "EndDataStream"; + DataStreamService(MetricGroup* metric_group); /// Initializes the service by registering it with the singleton RPC manager.
