This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit abd2c199629d6a0d2ec547a90d40c0ad341a4483 Author: Michael Smith <[email protected]> AuthorDate: Thu Oct 31 11:36:07 2024 -0700 IMPALA-13502: Clean up around constructors Makes 'name' a required parameter around CreateBinaryPhiNode, CodegenNullPhiNode, and CodegenIsNullPhiNode to skip rare case where we generate a name. Uses raw pointers or const& rather than const shared_ptr& in some cases to simplify API. Removes unused TSaslServerTransport constructors. Reuses CharMemTrackerAllocator and OutboundRowBatch in TupleFileWriter. Change-Id: I4321888808a6b26f42664163e1d1fa137dbc7d14 Reviewed-on: http://gerrit.cloudera.org:8080/22010 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/benchmarks/row-batch-serialize-benchmark.cc | 16 ++++++++-------- be/src/codegen/codegen-anyval-read-write-info.h | 4 ++-- be/src/codegen/codegen-anyval.cc | 18 +++++++++--------- be/src/codegen/llvm-codegen.cc | 5 +---- be/src/codegen/llvm-codegen.h | 2 +- be/src/exec/hash-table.cc | 5 +++-- be/src/exec/tuple-file-writer.cc | 22 +++++++++++++--------- be/src/exec/tuple-file-writer.h | 7 ++++++- be/src/rpc/TAcceptQueueServer.cpp | 4 ++-- be/src/rpc/TAcceptQueueServer.h | 2 +- be/src/runtime/krpc-data-stream-sender.cc | 11 ++++++----- be/src/runtime/outbound-row-batch.h | 3 +-- be/src/runtime/row-batch-serialize-test.cc | 12 ++++++------ be/src/service/impala-server.cc | 17 +++++++++-------- be/src/service/impala-server.h | 3 +-- be/src/transport/TSaslServerTransport.cpp | 20 ++------------------ be/src/transport/TSaslServerTransport.h | 19 ------------------- 17 files changed, 71 insertions(+), 99 deletions(-) diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index 31af72e5b..64f1307d4 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -285,8 +285,8 @@ class RowBatchSerializeBenchmark { static void TestSerialize(int batch_size, void* data) { SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data); for (int iter = 0; iter < batch_size; ++iter) { - OutboundRowBatch row_batch(char_mem_tracker_allocator); - TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + OutboundRowBatch row_batch(*char_mem_tracker_allocator); + TrackedString compression_scratch(*char_mem_tracker_allocator); ABORT_IF_ERROR(args->batch->Serialize(&row_batch, args->full_dedup, &compression_scratch)); } @@ -295,8 +295,8 @@ class RowBatchSerializeBenchmark { static void TestSerializeBaseline(int batch_size, void* data) { RowBatch* batch = reinterpret_cast<RowBatch*>(data); for (int iter = 0; iter < batch_size; ++iter) { - OutboundRowBatch row_batch(char_mem_tracker_allocator); - TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + OutboundRowBatch row_batch(*char_mem_tracker_allocator); + TrackedString compression_scratch(*char_mem_tracker_allocator); RowBatchSerializeBaseline::Serialize(batch, &row_batch, &compression_scratch); } } @@ -342,22 +342,22 @@ class RowBatchSerializeBenchmark { RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); FillBatch(no_dup_batch, 12345, 1, -1); - TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + TrackedString compression_scratch(*char_mem_tracker_allocator); - OutboundRowBatch no_dup_row_batch(char_mem_tracker_allocator); + OutboundRowBatch no_dup_row_batch(*char_mem_tracker_allocator); ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_row_batch, &compression_scratch)); RowBatch* adjacent_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); FillBatch(adjacent_dup_batch, 12345, 5, -1); - OutboundRowBatch adjacent_dup_row_batch(char_mem_tracker_allocator); + OutboundRowBatch adjacent_dup_row_batch(*char_mem_tracker_allocator); ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_row_batch, false, &compression_scratch)); RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); // Non-adjacent duplicates. FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5); - OutboundRowBatch dup_row_batch(char_mem_tracker_allocator); + OutboundRowBatch dup_row_batch(*char_mem_tracker_allocator); ABORT_IF_ERROR(dup_batch->Serialize(&dup_row_batch, true, &compression_scratch)); int baseline; diff --git a/be/src/codegen/codegen-anyval-read-write-info.h b/be/src/codegen/codegen-anyval-read-write-info.h index 3635187cd..58e27747c 100644 --- a/be/src/codegen/codegen-anyval-read-write-info.h +++ b/be/src/codegen/codegen-anyval-read-write-info.h @@ -173,11 +173,11 @@ class CodegenAnyValReadWriteInfo { // Creates a PHI node that will have the value 'non_null_value' if the incoming block is // 'non_null_block_' and the value 'null_value' if it is 'null_block_'. llvm::PHINode* CodegenNullPhiNode( - llvm::Value* non_null_value, llvm::Value* null_value, const std::string& name = ""); + llvm::Value* non_null_value, llvm::Value* null_value, const std::string& name); // Creates a PHI node the value of which tells whether it was reached from the non-null // or the null path, i.e. whether this CodegenAnyValReadWriteInfo is null. - llvm::PHINode* CodegenIsNullPhiNode(const std::string& name = ""); + llvm::PHINode* CodegenIsNullPhiNode(const std::string& name); private: LlvmCodeGen* const codegen_; diff --git a/be/src/codegen/codegen-anyval.cc b/be/src/codegen/codegen-anyval.cc index de46375f0..9e772ba11 100644 --- a/be/src/codegen/codegen-anyval.cc +++ b/be/src/codegen/codegen-anyval.cc @@ -971,11 +971,11 @@ CodegenAnyVal CodegenAnyVal::CreateFromReadWriteInfo( llvm::Value* ptr_null = llvm::Constant::getNullValue(ptr->getType()); llvm::PHINode* ptr_phi = LlvmCodeGen::CreateBinaryPhiNode(builder, ptr, ptr_null, - non_null_incoming_block, read_write_info.null_block()); + non_null_incoming_block, read_write_info.null_block(), "ptr_phi"); llvm::Value* len_null = llvm::ConstantInt::get(len->getType(), 0); llvm::PHINode* len_phi = LlvmCodeGen::CreateBinaryPhiNode(builder, len, len_null, - non_null_incoming_block, read_write_info.null_block()); + non_null_incoming_block, read_write_info.null_block(), "len_phi"); result.SetPtr(ptr_phi); result.SetLen(len_phi); @@ -984,22 +984,22 @@ CodegenAnyVal CodegenAnyVal::CreateFromReadWriteInfo( read_write_info.GetTimeAndDate().time_of_day->getType(), 0); llvm::PHINode* time_of_day_phi = LlvmCodeGen::CreateBinaryPhiNode(builder, read_write_info.GetTimeAndDate().time_of_day, time_of_day_null, - non_null_incoming_block, read_write_info.null_block()); + non_null_incoming_block, read_write_info.null_block(), "time_of_day_phi"); llvm::Value* date_null = llvm::ConstantInt::get( read_write_info.GetTimeAndDate().date->getType(), 0); - llvm::PHINode* date_phi = LlvmCodeGen::CreateBinaryPhiNode(builder, - read_write_info.GetTimeAndDate().date, date_null, non_null_incoming_block, - read_write_info.null_block()); + llvm::PHINode* date_phi = + LlvmCodeGen::CreateBinaryPhiNode(builder, read_write_info.GetTimeAndDate().date, + date_null, non_null_incoming_block, read_write_info.null_block(), "date_phi"); result.SetTimeOfDay(time_of_day_phi); result.SetDate(date_phi); } else { llvm::Value* null = llvm::Constant::getNullValue( read_write_info.GetSimpleVal()->getType()); - llvm::PHINode* val_phi = LlvmCodeGen::CreateBinaryPhiNode(builder, - read_write_info.GetSimpleVal(), null, non_null_incoming_block, - read_write_info.null_block()); + llvm::PHINode* val_phi = + LlvmCodeGen::CreateBinaryPhiNode(builder, read_write_info.GetSimpleVal(), null, + non_null_incoming_block, read_write_info.null_block(), "val_phi"); result.SetVal(val_phi); } diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index c843e8779..0e3cd0ccd 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -697,12 +697,9 @@ void LlvmCodeGen::CreateIfElseBlocks(llvm::Function* fn, const string& if_name, llvm::PHINode* LlvmCodeGen::CreateBinaryPhiNode(LlvmBuilder* builder, llvm::Value* value1, llvm::Value* value2, llvm::BasicBlock* incoming_block1, llvm::BasicBlock* incoming_block2, const string& name) { - llvm::PHINode* res = builder->CreatePHI( - value1->getType(), 2, name.empty() ? (value1->getName().str() + "_phi") : name); - + llvm::PHINode* res = builder->CreatePHI(value1->getType(), 2, name); res->addIncoming(value1, incoming_block1); res->addIncoming(value2, incoming_block2); - return res; } diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h index c07eaac03..0450d06ba 100644 --- a/be/src/codegen/llvm-codegen.h +++ b/be/src/codegen/llvm-codegen.h @@ -539,7 +539,7 @@ class LlvmCodeGen { // 'incoming_block2'. static llvm::PHINode* CreateBinaryPhiNode(LlvmBuilder* builder, llvm::Value* value1, llvm::Value* value2, llvm::BasicBlock* incoming_block1, - llvm::BasicBlock* incoming_block2, const std::string& name = ""); + llvm::BasicBlock* incoming_block2, const std::string& name); /// Returns a constant int of 'byte_size' bytes based on 'low_bits' and 'high_bits' /// which stand for the lower and upper 64-bits of the constant respectively. For diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index 1de667a52..cc1eb5e35 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -1149,8 +1149,9 @@ Status HashTableCtx::CodegenHashRow(LlvmCodeGen* codegen, bool use_murmur, builder.SetInsertPoint(continue_block); // Use phi node to reconcile that we could have come from the string-null // path and string not null paths. - llvm::PHINode* phi_node = LlvmCodeGen::CreateBinaryPhiNode(&builder, - string_hash_result, str_null_result, not_null_block, null_block); + llvm::PHINode* phi_node = + LlvmCodeGen::CreateBinaryPhiNode(&builder, string_hash_result, + str_null_result, not_null_block, null_block, "str_null_phi"); hash_result = phi_node; } else { diff --git a/be/src/exec/tuple-file-writer.cc b/be/src/exec/tuple-file-writer.cc index 9965258ed..1eef9fbb6 100644 --- a/be/src/exec/tuple-file-writer.cc +++ b/be/src/exec/tuple-file-writer.cc @@ -43,6 +43,8 @@ TupleFileWriter::TupleFileWriter( : path_(move(path)), temp_suffix_(filesystem::unique_path(UNIQUE_PATH_SUFFIX).string()), tracker_(new MemTracker(-1, "TupleFileWriter", parent)), + allocator_(new CharMemTrackerAllocator(tracker_)), + out_batch_(new OutboundRowBatch(*allocator_)), write_timer_(profile ? ADD_TIMER(profile, "TupleCacheWriteTime") : nullptr), serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") : nullptr), bytes_written_(profile ? @@ -61,7 +63,9 @@ TupleFileWriter::~TupleFileWriter() { DCHECK(state_ == State::Committed || state_ == State::Aborted); DCHECK(!kudu::Env::Default()->FileExists(TempPath())); } - // MemTracker expects an explicit close. + // MemTracker expects an explicit close. Consumers need to be released first. + out_batch_.reset(); + allocator_.reset(); if (tracker_) tracker_->Close(); } @@ -79,29 +83,29 @@ Status TupleFileWriter::Write(RuntimeState* state, RowBatch* row_batch) { DCHECK_EQ(state_, State::InProgress); SCOPED_TIMER(write_timer_); // serialize and write row batch - OutboundRowBatch out(make_shared<CharMemTrackerAllocator>(tracker_)); { SCOPED_TIMER(serialize_timer_); - // Passing in nullptr for 'compression_scrtach' disables compression. - RETURN_IF_ERROR(row_batch->Serialize(&out, /* compression_scratch */ nullptr)); + // Passing in nullptr for 'compression_scratch' disables compression. + RETURN_IF_ERROR( + row_batch->Serialize(out_batch_.get(), /* compression_scratch */ nullptr)); } - if (out.header()->num_rows() == 0) { - DCHECK_EQ(out.header()->uncompressed_size(), 0); + if (out_batch_->header()->num_rows() == 0) { + DCHECK_EQ(out_batch_->header()->uncompressed_size(), 0); return Status::OK(); } // Collect all of the pieces that we would want to write, then determine if writing // them would exceed the max file size. std::string header_buf; - if (!out.header()->SerializeToString(&header_buf)) { + if (!out_batch_->header()->SerializeToString(&header_buf)) { return Status(TErrorCode::INTERNAL_ERROR, "Could not serialize RowBatchHeaderPB to string"); } size_t header_len = header_buf.size(); DCHECK_GT(header_len, 0); - kudu::Slice tuple_data = out.TupleDataAsSlice(); - kudu::Slice tuple_offsets = out.TupleOffsetsAsSlice(); + kudu::Slice tuple_data = out_batch_->TupleDataAsSlice(); + kudu::Slice tuple_offsets = out_batch_->TupleOffsetsAsSlice(); // tuple_data_len is possible to be 0, see IMPALA-13411. size_t tuple_data_len = tuple_data.size(); size_t tuple_offsets_len = tuple_offsets.size(); diff --git a/be/src/exec/tuple-file-writer.h b/be/src/exec/tuple-file-writer.h index bdaf04706..209229472 100644 --- a/be/src/exec/tuple-file-writer.h +++ b/be/src/exec/tuple-file-writer.h @@ -21,6 +21,8 @@ #include <limits> #include <memory> +#include "runtime/mem-tracker.h" +#include "runtime/outbound-row-batch.h" #include "util/runtime-profile.h" #include "common/status.h" @@ -31,7 +33,6 @@ namespace kudu { namespace impala { -class MemTracker; class RowBatch; class RuntimeState; class TupleReadWriteTest; @@ -91,6 +92,10 @@ private: std::string temp_suffix_; // MemTracker for OutboundRowBatches. std::shared_ptr<MemTracker> tracker_; + // Allocator (using tracker_) for OutboundRowBatches. + std::unique_ptr<CharMemTrackerAllocator> allocator_; + // Buffer used for serializing row batches. + std::unique_ptr<OutboundRowBatch> out_batch_; // Total write time by the writer. RuntimeProfile::Counter* write_timer_; // Total time spent on serialization. diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp index 0ce562e11..84c841da7 100644 --- a/be/src/rpc/TAcceptQueueServer.cpp +++ b/be/src/rpc/TAcceptQueueServer.cpp @@ -227,7 +227,7 @@ void TAcceptQueueServer::CleanupAndClose(const string& error, } // New. -void TAcceptQueueServer::SetupConnection(const shared_ptr<TAcceptQueueEntry>& entry) { +void TAcceptQueueServer::SetupConnection(TAcceptQueueEntry* entry) { DCHECK(entry != nullptr); if (metrics_enabled_) queue_size_metric_->Increment(-1); shared_ptr<TTransport> io_transport; @@ -348,7 +348,7 @@ void TAcceptQueueServer::serve() { "setup-worker", FLAGS_accepted_cnxn_setup_thread_pool_size, FLAGS_accepted_cnxn_queue_depth, [this](int tid, const shared_ptr<TAcceptQueueEntry>& item) { - this->SetupConnection(item); + this->SetupConnection(item.get()); }); // Initialize the thread pool Status status = connection_setup_pool.Init(); diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h index 663c56723..703383d42 100644 --- a/be/src/rpc/TAcceptQueueServer.h +++ b/be/src/rpc/TAcceptQueueServer.h @@ -85,7 +85,7 @@ class TAcceptQueueServer : public TServer { // This is the work function for the thread pool, which does the work of setting up the // connection and starting a thread to handle it. Will block if there are currently // maxTasks_ connections and maxTasks_ is non-zero. - void SetupConnection(const std::shared_ptr<TAcceptQueueEntry>& entry); + void SetupConnection(TAcceptQueueEntry* entry); // Helper function to close a client connection in case of server side errors. void CleanupAndClose(const std::string& error, diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index a3e06b668..468ee0c9c 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -397,7 +397,7 @@ Status KrpcDataStreamSender::Channel::Init( RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_)); // Init outbound_batch_. - outbound_batch_.reset(new OutboundRowBatch(allocator)); + outbound_batch_.reset(new OutboundRowBatch(*allocator)); return Status::OK(); } @@ -910,12 +910,12 @@ Status KrpcDataStreamSender::Prepare( string process_address = NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address()); - serialization_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_)); + serialization_batch_.reset(new OutboundRowBatch(*char_mem_tracker_allocator_)); if (partition_type_ == TPartitionType::UNPARTITIONED) { - in_flight_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_)); + in_flight_batch_.reset(new OutboundRowBatch(*char_mem_tracker_allocator_)); } - compression_scratch_.reset(new TrackedString(*char_mem_tracker_allocator_.get())); + compression_scratch_.reset(new TrackedString(*char_mem_tracker_allocator_)); for (int i = 0; i < channels_.size(); ++i) { RETURN_IF_ERROR(channels_[i]->Init(state, char_mem_tracker_allocator_)); @@ -1031,7 +1031,8 @@ Status KrpcDataStreamSenderConfig::CodegenHashRow( // Picks the input value to hash function builder.SetInsertPoint(hash_val_block); - llvm::PHINode* val_ptr_phi = rwi.CodegenNullPhiNode(native_ptr, null_ptr); + llvm::PHINode* val_ptr_phi = + rwi.CodegenNullPhiNode(native_ptr, null_ptr, "val_ptr_phi"); // Creates a global constant of the partition expression's ColumnType. It has to be a // constant for constant propagation and dead code elimination in 'get_hash_value_fn' diff --git a/be/src/runtime/outbound-row-batch.h b/be/src/runtime/outbound-row-batch.h index ce71c80dd..f9cb72eb2 100644 --- a/be/src/runtime/outbound-row-batch.h +++ b/be/src/runtime/outbound-row-batch.h @@ -35,8 +35,7 @@ class RuntimeState; /// for holding the tuple offsets and tuple data. class OutboundRowBatch { public: - OutboundRowBatch(const std::shared_ptr<CharMemTrackerAllocator>& allocator) - : tuple_data_(*allocator.get()) {} + OutboundRowBatch(const CharMemTrackerAllocator& allocator) : tuple_data_(allocator) {} const RowBatchHeaderPB* header() const { return &header_; } diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc index 596f9655c..cdda5497b 100644 --- a/be/src/runtime/row-batch-serialize-test.cc +++ b/be/src/runtime/row-batch-serialize-test.cc @@ -81,8 +81,8 @@ class RowBatchSerializeTest : public testing::Test { bool print_batches, bool full_dedup = false) { if (print_batches) cout << PrintBatch(batch) << endl; - TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); - OutboundRowBatch row_batch(char_mem_tracker_allocator_); + TrackedString compression_scratch(*char_mem_tracker_allocator_); + OutboundRowBatch row_batch(*char_mem_tracker_allocator_); RETURN_IF_ERROR(batch->Serialize(&row_batch, full_dedup, &compression_scratch)); RowBatch deserialized_batch(&row_desc, row_batch, tracker_.get()); @@ -617,8 +617,8 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) { CreateTuples(tuple_desc, batch->tuple_data_pool(), num_distinct_tuples, 0, 10, &tuples); AddTuplesToRowBatch(num_rows, tuples, repeats, batch); - TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); - OutboundRowBatch row_batch(char_mem_tracker_allocator_); + TrackedString compression_scratch(*char_mem_tracker_allocator_); + OutboundRowBatch row_batch(*char_mem_tracker_allocator_); EXPECT_OK(batch->Serialize(&row_batch, full_dedup, &compression_scratch)); // Serialized data should only have one copy of each tuple. int64_t total_byte_size = 0; // Total size without duplication @@ -757,8 +757,8 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) { EXPECT_TRUE(UseFullDedup(batch)); LOG(INFO) << "Serializing row batch"; - TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); - OutboundRowBatch row_batch(char_mem_tracker_allocator_); + TrackedString compression_scratch(*char_mem_tracker_allocator_); + OutboundRowBatch row_batch(*char_mem_tracker_allocator_); EXPECT_OK(batch->Serialize(&row_batch, &compression_scratch)); LOG(INFO) << "Serialized batch size: " << row_batch.TupleDataAsSlice().size(); LOG(INFO) << "Serialized batch uncompressed size: " diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 0b102857a..754d66563 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -842,7 +842,7 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id, RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record, &retried_query_record)); RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user, query_record->user_has_profile_access)); - RETURN_IF_ERROR(DecompressToProfile(format, query_record, original_profile)); + RETURN_IF_ERROR(DecompressToProfile(format, *query_record, original_profile)); // Set the profile for the retried query. if (query_record->was_retried) { @@ -856,7 +856,8 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id, DCHECK(status.ok()); RETURN_IF_ERROR(status); - RETURN_IF_ERROR(DecompressToProfile(format, retried_query_record, retried_profile)); + RETURN_IF_ERROR( + DecompressToProfile(format, *retried_query_record, retried_profile)); } } return Status::OK(); @@ -885,30 +886,30 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id, RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record)); RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user, query_record->user_has_profile_access)); - RETURN_IF_ERROR(DecompressToProfile(format, query_record, profile)); + RETURN_IF_ERROR(DecompressToProfile(format, *query_record, profile)); } return Status::OK(); } Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format, - const shared_ptr<QueryStateRecord>& query_record, RuntimeProfileOutput* profile) { + const QueryStateRecord& query_record, RuntimeProfileOutput* profile) { if (format == TRuntimeProfileFormat::BASE64) { - Base64Encode(query_record->compressed_profile, profile->string_output); + Base64Encode(query_record.compressed_profile, profile->string_output); } else if (format == TRuntimeProfileFormat::THRIFT) { RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift( - query_record->compressed_profile, profile->thrift_output)); + query_record.compressed_profile, profile->thrift_output)); } else if (format == TRuntimeProfileFormat::JSON) { ObjectPool tmp_pool; RuntimeProfile* tmp_profile; RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile( - query_record->compressed_profile, &tmp_pool, &tmp_profile)); + query_record.compressed_profile, &tmp_pool, &tmp_profile)); tmp_profile->ToJson(profile->json_output); } else { DCHECK_EQ(format, TRuntimeProfileFormat::STRING); ObjectPool tmp_pool; RuntimeProfile* tmp_profile; RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile( - query_record->compressed_profile, &tmp_pool, &tmp_profile)); + query_record.compressed_profile, &tmp_pool, &tmp_profile)); tmp_profile->PrettyPrint(profile->string_output); } return Status::OK(); diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 7fa19cf6b..4a67dd3a9 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -1232,8 +1232,7 @@ class ImpalaServer : public ImpalaServiceIf, /// Decompresses the profile in the given QueryStateRecord into the specified format. /// The decompressed profile is added to the given RuntimeProfileOutput. Status DecompressToProfile(TRuntimeProfileFormat::type format, - const std::shared_ptr<QueryStateRecord>& query_record, - RuntimeProfileOutput* profile); + const QueryStateRecord& query_record, RuntimeProfileOutput* profile); void WaitForNewCatalogServiceId( const TUniqueId& cur_service_id, std::unique_lock<std::mutex>* ver_lock); diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp index fbfd179fc..3018ab85b 100644 --- a/be/src/transport/TSaslServerTransport.cpp +++ b/be/src/transport/TSaslServerTransport.cpp @@ -42,27 +42,11 @@ DEFINE_int32(sasl_connect_tcp_timeout_ms, 300000, "(Advanced) The underlying TSo using namespace sasl; namespace apache { namespace thrift { namespace transport { -TSaslServerTransport::TSaslServerTransport(std::shared_ptr<TTransport> transport) - : TSaslTransport(move(transport)) { -} - -TSaslServerTransport::TSaslServerTransport(string mechanism, - string protocol, - string serverName, - string realm, - unsigned flags, - map<string, string> props, - vector<struct sasl_callback> callbacks, - std::shared_ptr<TTransport> transport) - : TSaslTransport(move(transport)) { - addServerDefinition(move(mechanism), move(protocol), move(serverName), move(realm), - flags, move(props), move(callbacks)); -} -TSaslServerTransport:: TSaslServerTransport( +TSaslServerTransport::TSaslServerTransport( std::map<std::string, TSaslServerDefinition*> serverMap, std::shared_ptr<TTransport> transport) - : TSaslTransport(move(transport)), serverDefinitionMap_(move(serverMap)) { } + : TSaslTransport(move(transport)), serverDefinitionMap_(move(serverMap)) {} /** * Set the server for this transport diff --git a/be/src/transport/TSaslServerTransport.h b/be/src/transport/TSaslServerTransport.h index d59ad6aeb..47966d9af 100644 --- a/be/src/transport/TSaslServerTransport.h +++ b/be/src/transport/TSaslServerTransport.h @@ -103,25 +103,6 @@ class TSaslServerTransport : public TSaslTransport { public: - /** - * Constructs a new TSaslTransport to act as a server. - * transport: the underlying transport used to read and write data. - * - */ - TSaslServerTransport(std::shared_ptr<TTransport> transport); - - /** - * Construct a new TSaslTrasnport, passing in the components of the definition. - */ - TSaslServerTransport(std::string mechanism, - std::string protocol, - std::string serverName, - std::string realm, - unsigned flags, - std::map<std::string, std::string> props, - std::vector<struct sasl_callback> callbacks, - std::shared_ptr<TTransport> transport); - /* Add a definition to a server transport */ void addServerDefinition(std::string mechanism, std::string protocol,
