This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 3f827bfc2 IMPALA-13130: Prioritize EndDataStream messages
3f827bfc2 is described below
commit 3f827bfc2447d8c11a4f09bcb96e86c53b92d753
Author: Michael Smith <[email protected]>
AuthorDate: Mon Jun 3 16:36:36 2024 -0700
IMPALA-13130: Prioritize EndDataStream messages
Prioritize EndDataStream messages over other message types handled by
DataStreamService, and avoid rejecting them when the memory limit is
reached. They take very little memory (~75 bytes) and usually help
reduce memory use by closing out in-progress operations.
Adds the 'data_stream_sender_eos_timeout_ms' flag to control EOS
timeouts. It defaults to 1 hour and can be disabled by setting it to -1.
Adds unit tests ensuring EOS messages are processed even if the memory
limit is reached, and ahead of TransmitData messages in the queue.
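As an illustrative sketch (hypothetical values; standard impalad gflags
handling assumed), the new flag could be set at startup like any other
advanced backend flag:

    impalad --data_stream_sender_eos_timeout_ms=1800000  # 30-minute EOS timeout
    impalad --data_stream_sender_eos_timeout_ms=-1       # disable the EOS timeout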
Change-Id: I2829e1ab5bcde36107e10bff5fe629c5ee60f3e8
Reviewed-on: http://gerrit.cloudera.org:8080/21476
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/rpc/impala-service-pool.cc | 27 +++--
be/src/runtime/data-stream-test.cc | 167 ++++++++++++++++++++++++++++--
be/src/runtime/krpc-data-stream-sender.cc | 10 ++
be/src/service/data-stream-service.cc | 1 +
be/src/service/data-stream-service.h | 2 +
5 files changed, 191 insertions(+), 16 deletions(-)
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 009df3a9d..c1a597a76 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -38,6 +38,7 @@
#include "kudu/util/trace.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
+#include "service/data-stream-service.h"
#include "util/pretty-printer.h"
#include "util/thread.h"
@@ -189,13 +190,25 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
// usage.
unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
-    // Discards the transfer early so the transfer size drops to 0. This is to ensure
-    // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
-    // called MemTracker::Consume() at this point.
-    mem_tracker_lock.unlock();
-    c->DiscardTransfer();
-    RejectTooBusy(c);
-    return kudu::Status::OK();
+    if (c->remote_method().method_name() == DataStreamService::END_DATA_STREAM) {
+      // EndDataStream operations use very little memory and help free up other
+      // resources, so ignore memory hard limit and always add them to the queue. This
+      // can help complete queries earlier when under heavy load that would otherwise
+      // drop the requests and require them to be retried.
+      LOG(WARNING) << "Admitting " << c->remote_method().method_name()
+          << " request on " << service_->service_name()
+          << " from " << c->remote_address().ToString()
+          << " to complete query despite exceeding memory limit; memory consumption is "
+          << PrettyPrinter::Print(service_mem_tracker_->consumption(), TUnit::BYTES);
+    } else {
+      // Discards the transfer early so the transfer size drops to 0. This is to ensure
+      // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
+      // called MemTracker::Consume() at this point.
+      mem_tracker_lock.unlock();
+      c->DiscardTransfer();
+      RejectTooBusy(c);
+      return kudu::Status::OK();
+    }
}
service_mem_tracker_->Consume(transfer_size);
}
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index e78c088b3..47ee9949f 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -57,6 +57,7 @@
#include "gen-cpp/Descriptors_types.h"
#include "service/fe-support.h"
+#include <atomic>
#include <iostream>
#include <string>
#include <unistd.h>
@@ -70,15 +71,20 @@ using namespace apache::thrift;
using namespace apache::thrift::protocol;
using kudu::MetricEntity;
+using kudu::MonoDelta;
+using kudu::MonoTime;
using kudu::rpc::ResultTracker;
using kudu::rpc::RpcContext;
using kudu::rpc::ServiceIf;
+using kudu::SleepFor;
DEFINE_int32(port, 20001, "port on which to run Impala krpc based test backend.");
DECLARE_int32(datastream_sender_timeout_ms);
DECLARE_int32(datastream_service_num_deserialization_threads);
DECLARE_int32(datastream_service_deserialization_queue_size);
DECLARE_string(datastream_service_queue_mem_limit);
+DECLARE_int32(datastream_service_num_svc_threads);
+DECLARE_string(debug_actions);
static const PlanNodeId DEST_NODE_ID = 1;
static const int BATCH_CAPACITY = 100; // rows
@@ -108,7 +114,10 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
virtual ~ImpalaKRPCTestBackend() {}
Status Init() {
-    return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker(),
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    LOG(INFO) << "Num svc thread=" << num_svc_threads;
+    return rpc_mgr_->RegisterService(num_svc_threads, 1024, this, mem_tracker(),
         ExecEnv::GetInstance()->rpc_metrics());
}
@@ -119,6 +128,8 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
virtual void TransmitData(const TransmitDataRequestPB* request,
TransmitDataResponsePB* response, RpcContext* rpc_context) {
+ transmit_counter_++;
+ DebugActionNoFail(FLAGS_debug_actions, "TRANSMIT_DATA_DELAY");
stream_mgr_->AddData(request, response, rpc_context);
}
@@ -138,10 +149,21 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
MemTracker* mem_tracker() { return mem_tracker_.get(); }
+ int NumTransmitsReceived() const {
+ return transmit_counter_.load();
+ }
+
+ int QueueSize() {
+ rapidjson::Document doc(rapidjson::kObjectType);
+ rpc_mgr_->ToJson(&doc);
+ return doc["services"][0]["queue_size"].GetInt();
+ }
+
private:
RpcMgr* rpc_mgr_;
KrpcDataStreamMgr* stream_mgr_;
unique_ptr<MemTracker> mem_tracker_;
+ std::atomic<int> transmit_counter_;
};
class DataStreamTest : public testing::Test {
@@ -149,6 +171,8 @@ class DataStreamTest : public testing::Test {
DataStreamTest() : next_val_(0) {
    // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
FLAGS_datastream_sender_timeout_ms = 250;
+ // MemTracker is not given a mem limit, so the default would be 5% * 0 = 0.
+ FLAGS_datastream_service_queue_mem_limit = std::to_string(TOTAL_DATA_SIZE);
}
~DataStreamTest() { runtime_state_->ReleaseResources(); }
@@ -252,6 +276,9 @@ class DataStreamTest : public testing::Test {
dest_.Clear();
}
+ // Ensure gflags are reset in each test class.
+ gflags::FlagSaver saver_;
+
ObjectPool obj_pool_;
MemTracker tracker_;
scoped_ptr<MemPool> mem_pool_;
@@ -401,7 +428,8 @@ class DataStreamTest : public testing::Test {
// Start receiver (expecting given number of senders) in separate thread.
  void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num,
-      int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) {
+      int buffer_size, bool is_merging, TUniqueId* out_id = nullptr,
+      RuntimeProfile** out_profile = nullptr) {
    VLOG_QUERY << "start receiver";
    RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
TUniqueId instance_id;
@@ -423,6 +451,7 @@ class DataStreamTest : public testing::Test {
profile, less_than_comparator));
}
if (out_id != nullptr) *out_id = instance_id;
+ if (out_profile != nullptr) *out_profile = profile;
}
void JoinReceivers() {
@@ -476,7 +505,8 @@ class DataStreamTest : public testing::Test {
}
// Verify correctness of receivers' data values.
- void CheckReceivers(TPartitionType::type stream_type, int num_senders) {
+ void CheckReceivers(TPartitionType::type stream_type, int num_senders,
+ int num_batches = NUM_BATCHES) {
int64_t total = 0;
multiset<int64_t> all_data_values;
for (int i = 0; i < receiver_info_.size(); ++i) {
@@ -487,7 +517,7 @@ class DataStreamTest : public testing::Test {
ASSERT_EQ(info->num_senders, num_senders);
if (stream_type == TPartitionType::UNPARTITIONED) {
EXPECT_EQ(
-          NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size());
+          num_batches * BATCH_CAPACITY * num_senders, info->data_values.size());
      }
      all_data_values.insert(info->data_values.begin(), info->data_values.end());
@@ -510,7 +540,7 @@ class DataStreamTest : public testing::Test {
}
if (stream_type == TPartitionType::HASH_PARTITIONED) {
- EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
+ EXPECT_EQ(num_batches * BATCH_CAPACITY * num_senders, total);
int k = 0;
for (multiset<int64_t>::iterator j = all_data_values.begin();
@@ -584,13 +614,15 @@ class DataStreamTest : public testing::Test {
}
  void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
- int channel_buffer_size = 1024, bool reset_hash_seed = false) {
+ int channel_buffer_size = 1024, bool reset_hash_seed = false,
+ int num_batches = NUM_BATCHES) {
VLOG_QUERY << "start sender";
int num_senders = sender_info_.size();
sender_info_.emplace_back(make_unique<SenderInfo>());
sender_info_.back()->thread_handle.reset(
        new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
- partition_type, sender_info_[num_senders].get(), reset_hash_seed));
+ partition_type, sender_info_[num_senders].get(), reset_hash_seed,
+ num_batches));
}
void JoinSenders() {
@@ -601,7 +633,8 @@ class DataStreamTest : public testing::Test {
}
void Sender(int sender_num, int channel_buffer_size,
-      TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed) {
+      TPartitionType::type partition_type, SenderInfo* info, bool reset_hash_seed,
+      int num_batches) {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
const TDataSink sink = GetSink(partition_type);
@@ -633,7 +666,7 @@ class DataStreamTest : public testing::Test {
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
int next_val = 0;
- for (int i = 0; i < NUM_BATCHES; ++i) {
+ for (int i = 0; i < num_batches; ++i) {
GetNextBatch(batch.get(), &next_val);
VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
info->status = sender->Send(&state, batch.get());
@@ -684,6 +717,19 @@ class DataStreamTestShortDeserQueue : public DataStreamTest {
}
};
+// A separate test class which simulates that the service queue is slow processing RPCs.
+class DataStreamTestSlowServiceQueue : public DataStreamTest {
+ protected:
+ virtual void SetUp() {
+ FLAGS_datastream_service_num_svc_threads = 1;
+ DataStreamTest::SetUp();
+ }
+
+ virtual void TearDown() {
+ DataStreamTest::TearDown();
+ }
+};
+
// A separate test class which simulates that the service queue fills up.
class DataStreamTestShortServiceQueue : public DataStreamTest {
protected:
@@ -821,12 +867,115 @@ TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) {
CheckReceivers(TPartitionType::UNPARTITIONED, 4);
}
+// Test that EndDataStream RPCs are prioritized over other RPCs in the ImpalaServicePool
+// queue. It starts a receiver with 1 service thread and a 3s delay on processing
+// TransmitData RPCs, and starts 2 senders each trying to send data. The 1st sender's RPC
+// starts processing, and the 2nd sender's RPC is queued in the ImpalaServicePool. Then
+// starts a 3rd sender that sends no data, just EndDataStream, and asserts it completes
+// before the 2nd block that's already queued, i.e. before the 2nd 3s delay has passed.
+TEST_F(DataStreamTestSlowServiceQueue, TestPrioritizeEos) {
+ FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000";
+ TPartitionType::type stream_type = TPartitionType::UNPARTITIONED;
+ constexpr int buffer_size = 1024;
+ constexpr int num_batches = 1;
+ Reset();
+
+ TUniqueId instance_id;
+ RuntimeProfile* profile;
+ StartReceiver(stream_type, 3, 0, buffer_size, false, &instance_id, &profile);
+
+ MonotonicStopWatch timer;
+ timer.Start();
+ // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue.
+ StartSender(stream_type, buffer_size, false, num_batches);
+ StartSender(stream_type, buffer_size, false, num_batches);
+ // Wait for 1st sender batch to be received and 2nd sender batch queued.
+ while (test_service_->NumTransmitsReceived() == 0) {
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
+ while (test_service_->QueueSize() == 0) {
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
+
+ // Start last sender with no batches so it just sends EndDataStream.
+ StartSender(stream_type, buffer_size, false, 0);
+
+ // Wait for last sender to complete.
+ sender_info_.back()->thread_handle->join();
+ sender_info_.pop_back();
+ // Verify receiver got EOS.
+ vector<RuntimeProfileBase::Counter*> counters;
+ profile->GetCounters("TotalEosReceived", &counters);
+ ASSERT_EQ(1, counters.size());
+ EXPECT_EQ(1, counters.front()->value());
+ // Assert 2nd TRANSMIT_DATA_DELAY sleep is not finished.
+ EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+ EXPECT_LT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond);
+
+ // Clean up.
+ JoinSenders();
+ JoinReceivers();
+ EXPECT_GT(timer.ElapsedTime(), 6 * MonoTime::kNanosecondsPerSecond);
+  EXPECT_EQ(2 * num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size());
+ EXPECT_EQ(3, counters.front()->value());
+}
+
// Test that payloads larger than the service queue's soft mem limit can be transmitted.
TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) {
TestStream(
      TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
}
+// Test that EndDataStream messages are admitted to the ImpalaServicePool queue when the
+// pool memory limit has been reached. Starts a receiver with a very low memory limit on
+// the pool so only one message is admitted to the queue at a time; others will be
+// rejected and must be retried by the sender. Starts a sender that sends a batch, with
+// processing of TransmitData delayed for 3s; then starts another sender with no blocks
+// that just sends EndDataStream and verifies the EndDataStream message is processed
+// before the 3s delay finishes, so it must have been admitted while memory was still
+// held for the TransmitData message.
+TEST_F(DataStreamTestShortServiceQueue, TestFullQueue) {
+ FLAGS_debug_actions="TRANSMIT_DATA_DELAY:SLEEP@3000";
+ TPartitionType::type stream_type = TPartitionType::UNPARTITIONED;
+ constexpr int buffer_size = SHORT_SERVICE_QUEUE_MEM_LIMIT * 2;
+ constexpr int num_batches = 1;
+ Reset();
+
+ TUniqueId instance_id;
+ RuntimeProfile* profile;
+ StartReceiver(stream_type, 2, 0, buffer_size, false, &instance_id, &profile);
+
+ MonotonicStopWatch timer;
+ timer.Start();
+ // Start sender; TRANSMIT_DATA_DELAY:SLEEP will block up the queue.
+ StartSender(stream_type, buffer_size, false, num_batches);
+ // Wait for 1st batch to be received.
+ while (test_service_->NumTransmitsReceived() == 0) {
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
+
+ // Start last sender with no batches so it just sends EndDataStream.
+ StartSender(stream_type, buffer_size, false, 0);
+
+ // Wait for last sender to complete.
+ sender_info_.back()->thread_handle->join();
+ sender_info_.pop_back();
+ // Verify receiver got EOS.
+ vector<RuntimeProfileBase::Counter*> counters;
+ profile->GetCounters("TotalEosReceived", &counters);
+ ASSERT_EQ(1, counters.size());
+ EXPECT_EQ(1, counters.front()->value());
+ // Assert TRANSMIT_DATA_DELAY sleep is not finished.
+ EXPECT_LT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+
+ // Clean up.
+ JoinSenders();
+ JoinReceivers();
+ EXPECT_GT(timer.ElapsedTime(), 3 * MonoTime::kNanosecondsPerSecond);
+  EXPECT_EQ(num_batches * BATCH_CAPACITY, receiver_info_[0]->data_values.size());
+ EXPECT_EQ(2, counters.front()->value());
+}
+
// TODO: more tests:
// - test case for transmission error in last batch
// - receivers getting created concurrently
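The new tests can presumably be run in isolation with a gtest filter; the
binary path below assumes Impala's usual be/build layout (backend tests are
normally driven by bin/run-backend-tests.sh) and is illustrative only:

    be/build/latest/runtime/data-stream-test \
        --gtest_filter='*TestPrioritizeEos*:*TestFullQueue*'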
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6bacf7843..8dbf3dc3a 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -60,6 +60,10 @@
DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
    "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
    "can accumulate before the row batch is sent over the wire.");
+DEFINE_int64_hidden(data_stream_sender_eos_timeout_ms, 60*60*1000,
+    "Timeout for EndDataStream (EOS) RPCs. Setting a timeout prioritizes them over other "
+    "DataStreamService RPCs. Defaults to 1 hour. Set to 0 or negative value to disable "
+    "the timeout.");
using std::condition_variable_any;
using namespace apache::thrift;
@@ -676,6 +680,12 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
DCHECK(rpc_in_flight_);
EndDataStreamRequestPB eos_req;
rpc_controller_.Reset();
+ if (FLAGS_data_stream_sender_eos_timeout_ms > 0) {
+    // Provide a timeout so EOS RPCs are prioritized over others, as completing a stream
+ // can help free up resources.
+ rpc_controller_.set_timeout(
+ MonoDelta::FromMilliseconds(FLAGS_data_stream_sender_eos_timeout_ms));
+ }
*eos_req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
eos_req.set_sender_id(parent_->sender_id_);
eos_req.set_dest_node_id(dest_node_id_);
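Background for the timeout above: per the new flag's help text, the service
queue prefers inbound calls that carry a client-set deadline. A minimal C++
sketch of that earliest-deadline-first rule follows; QueuedCall and
DequeuedBefore are illustrative assumptions, not Kudu's or Impala's actual
queue code.

    #include "kudu/util/monotime.h"

    // Sketch: an inbound RPC waiting in a deadline-ordered service queue.
    // A call whose client set no timeout is modeled as having no deadline
    // (kudu::MonoTime::Max()), so it sorts after any call with a real one.
    struct QueuedCall {
      kudu::MonoTime deadline;
    };

    // Earliest-deadline-first: an EOS RPC carrying the new 1-hour default
    // timeout compares less than a TransmitData RPC with no timeout, so it
    // is dequeued ahead of it.
    bool DequeuedBefore(const QueuedCall& a, const QueuedCall& b) {
      return a.deadline < b.deadline;
    }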
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 8a497f7b2..cc405259f 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -72,6 +72,7 @@ DataStreamService::DataStreamService(MetricGroup* metric_group)
mem_tracker_.reset(new MemTracker(
bytes_limit, "Data Stream Service Queue", process_mem_tracker));
  MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService");
+ DCHECK_EQ(methods_by_name_.count(string(END_DATA_STREAM)), 1);
}
Status DataStreamService::Init() {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index a934bc686..11649b905 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -42,6 +42,8 @@ class MetricGroup;
/// appropriate receivers. Metrics exposed by the service will be added to 'metric_group'.
class DataStreamService : public DataStreamServiceIf {
public:
+ static constexpr std::string_view END_DATA_STREAM = "EndDataStream";
+
DataStreamService(MetricGroup* metric_group);
/// Initializes the service by registering it with the singleton RPC manager.