This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2f14fd29c0b47fc2c170a7f0eb1cecaf6b9704f4 Author: Csaba Ringhofer <[email protected]> AuthorDate: Mon Sep 11 19:34:01 2023 +0200 IMPALA-12433: Share buffers among channels in KrpcDataStreamSender Before this patch each KrpcDataStreamSender::Channel had 2 OutboundRowBatch with its own serialization and compression buffers. This patch switches to use a single buffer per channel. This is enough to store the in-flight data in KRPC, while other buffers are only used during serialization and compression which is done for just a single channel at a time, so can be shared among channels. Memory estimates in the planner are not changed because the existing calculation has several issues (see IMPALA-12594). Change-Id: I64854a350a9dae8bf3af11c871882ea4750e60b3 Reviewed-on: http://gerrit.cloudera.org:8080/20719 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Kurt Deschler <[email protected]> Reviewed-by: Zihao Ye <[email protected]> Reviewed-by: Michael Smith <[email protected]> --- be/src/benchmarks/row-batch-serialize-benchmark.cc | 31 +++--- be/src/runtime/krpc-data-stream-sender.cc | 109 ++++++++++++--------- be/src/runtime/krpc-data-stream-sender.h | 25 +++-- be/src/runtime/outbound-row-batch.h | 14 +-- be/src/runtime/row-batch-serialize-test.cc | 11 ++- be/src/runtime/row-batch.cc | 32 +++--- be/src/runtime/row-batch.h | 7 +- .../org/apache/impala/planner/DataStreamSink.java | 2 + 8 files changed, 135 insertions(+), 96 deletions(-) diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index d772855ad..26085f035 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -89,7 +89,8 @@ static std::shared_ptr<CharMemTrackerAllocator> char_mem_tracker_allocator; class RowBatchSerializeBaseline { public: // Copy of baseline version without dedup logic - static int Serialize(RowBatch* batch, OutboundRowBatch* output_batch) { + 
static int Serialize(RowBatch* batch, OutboundRowBatch* output_batch, + TrackedString* compression_scratch) { RowBatchHeaderPB* header = &output_batch->header_; output_batch->tuple_offsets_.clear(); header->set_num_rows(batch->num_rows_); @@ -105,7 +106,7 @@ class RowBatchSerializeBaseline { SerializeInternal(batch, size, output_batch); if (size > 0) { - // Try compressing tuple_data to compression_scratch_, swap if compressed data is + // Try compressing tuple_data to compression_scratch, swap if compressed data is // smaller Lz4Compressor compressor(nullptr, false); Status status = compressor.Init(); @@ -115,20 +116,20 @@ class RowBatchSerializeBaseline { int64_t compressed_size = compressor.MaxOutputLen(size); DCHECK_GT(compressed_size, 0); - if (output_batch->compression_scratch_.size() < compressed_size) { - output_batch->compression_scratch_.resize(compressed_size); + if (compression_scratch->size() < compressed_size) { + compression_scratch->resize(compressed_size); } uint8_t* input = const_cast<uint8_t*>( reinterpret_cast<const uint8_t*>(output_batch->tuple_data_.data())); uint8_t* compressed_output = const_cast<uint8_t*>( - reinterpret_cast<const uint8_t*>(output_batch->compression_scratch_.data())); + reinterpret_cast<const uint8_t*>(compression_scratch->data())); status = compressor.ProcessBlock( true, size, input, &compressed_size, &compressed_output); DCHECK(status.ok()) << status.GetDetail(); if (LIKELY(compressed_size < size)) { - output_batch->compression_scratch_.resize(compressed_size); - output_batch->tuple_data_.swap(output_batch->compression_scratch_); + compression_scratch->resize(compressed_size); + output_batch->tuple_data_.swap(*compression_scratch); } VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size; @@ -284,7 +285,9 @@ class RowBatchSerializeBenchmark { SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data); for (int iter = 0; iter < batch_size; ++iter) { OutboundRowBatch 
row_batch(char_mem_tracker_allocator); - ABORT_IF_ERROR(args->batch->Serialize(&row_batch, args->full_dedup)); + TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + ABORT_IF_ERROR(args->batch->Serialize(&row_batch, args->full_dedup, + &compression_scratch)); } } @@ -292,7 +295,8 @@ class RowBatchSerializeBenchmark { RowBatch* batch = reinterpret_cast<RowBatch*>(data); for (int iter = 0; iter < batch_size; ++iter) { OutboundRowBatch row_batch(char_mem_tracker_allocator); - RowBatchSerializeBaseline::Serialize(batch, &row_batch); + TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + RowBatchSerializeBaseline::Serialize(batch, &row_batch, &compression_scratch); } } @@ -337,20 +341,23 @@ class RowBatchSerializeBenchmark { RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); FillBatch(no_dup_batch, 12345, 1, -1); + TrackedString compression_scratch(*char_mem_tracker_allocator.get()); + OutboundRowBatch no_dup_row_batch(char_mem_tracker_allocator); - ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_row_batch)); + ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_row_batch, &compression_scratch)); RowBatch* adjacent_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); FillBatch(adjacent_dup_batch, 12345, 5, -1); OutboundRowBatch adjacent_dup_row_batch(char_mem_tracker_allocator); - ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_row_batch, false)); + ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_row_batch, + false, &compression_scratch)); RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get())); // Non-adjacent duplicates. 
FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5); OutboundRowBatch dup_row_batch(char_mem_tracker_allocator); - ABORT_IF_ERROR(dup_batch->Serialize(&dup_row_batch, true)); + ABORT_IF_ERROR(dup_batch->Serialize(&dup_row_batch, true, &compression_scratch)); int baseline; Benchmark ser_suite("serialize"); diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index ef4aec260..45d3714fc 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -46,6 +46,7 @@ #include "runtime/tuple-row.h" #include "service/data-stream-service.h" #include "util/aligned-new.h" +#include "util/compress.h" #include "util/debug-util.h" #include "util/network-util.h" #include "util/pretty-printer.h" @@ -118,12 +119,18 @@ void KrpcDataStreamSenderConfig::Close() { // calls above will return before the RPC has completed but they may block if there // is already an in-flight RPC. // -// Each channel internally has two OutboundRowBatch to serialize to. They are reused -// across multiple RPC calls. Having two OutboundRowBatch allows client to serialize -// the next row batch while the current row batch is being sent. Upon completion of -// a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC fails due to -// remote service's queue being full, TransmitDataCompleteCb() will schedule the retry -// callback RetryCb() after some delay dervied from 'FLAGS_rpc_retry_internal_ms'. +// Each channel owns a single OutboundRowBatch outbound_batch_. This holds the buffers +// backing the in-flight RPC or kept as reserve if there is no ongoing RPC. The actual +// serialization happens to a shared OutboundRowBatch (see IMPALA-12433 for details) which +// can be swapped with outbound_batch_ once the previous in-flight RPC is finished. +// This means that the client can serialize the next row batch while the current row batch +// is being sent. 
outbound_batch_ is only used in the partitioned case - in the +// unpartitioned case the sender's in_flight_batch_ holds the data for the in-flight RPC. +// +// Upon completion of a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC +// fails due to remote service's queue being full, TransmitDataCompleteCb() will schedule +// the retry callback RetryCb() after some delay derived from +// 'FLAGS_rpc_retry_internal_ms'. // // When a data stream sender is shut down, it will call Teardown() on all channels to // release resources. Teardown() will cancel any in-flight RPC and wait for the @@ -175,7 +182,10 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned { // preceding RPC is still in-flight. This is expected to be called from the fragment // instance execution thread. Return error status if initialization of the RPC request // parameters failed or if the preceding RPC failed. Returns OK otherwise. - Status TransmitData(const OutboundRowBatch* outbound_batch); + // If 'swap_batch' is true, 'outbound_batch' is swapped with the Channel's + // 'outbound_batch_'. This happens after the previous RPC is finished so its buffers + // can be reused. + Status TransmitData(std::unique_ptr<OutboundRowBatch>* outbound_batch, bool swap_batch); // Copies a single row into this channel's row batch and flushes the row batch once // it reaches capacity. This call may block if the row batch's capacity is reached @@ -231,20 +241,14 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned { // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED". scoped_ptr<RowBatch> batch_; - // The outbound row batches are double-buffered so that we can serialize the next - // batch while the other is still referenced by the in-flight RPC. Each entry contains + // Owns an outbound row batch that can be referenced by the in-flight RPC. Contains // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data. 
// - // TODO: replace this with an actual queue. Schedule another RPC callback in the + // TODO: replace this with a queue. Schedule another RPC callback in the // completion callback if the queue is not empty. - // TODO: rethink whether to keep per-channel buffers vs having all buffers in the - // datastream sender and sharing them across all channels. These buffers are not used in + // datastream sender and sharing them across all channels. This buffer is not used in // "UNPARTITIONED" scheme. - std::vector<OutboundRowBatch> outbound_batches_; - - // Index into 'outbound_batches_' for the next available OutboundRowBatch to serialize - // into. This is read and written by the main execution thread. - int next_batch_idx_ = 0; + std::unique_ptr<OutboundRowBatch> outbound_batch_; // Synchronize accesses to the following fields between the main execution thread and // the KRPC reactor thread. Note that there should be only one reactor thread invoking @@ -383,10 +387,9 @@ Status KrpcDataStreamSender::Channel::Init( // Create a DataStreamService proxy to the destination. RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_)); - // Init outbound_batches_. - for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) { - outbound_batches_.emplace_back(allocator, is_local_); - } + // Init outbound_batch_. 
+ outbound_batch_.reset(new OutboundRowBatch(allocator)); + return Status::OK(); } @@ -584,10 +587,10 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() { } Status KrpcDataStreamSender::Channel::TransmitData( - const OutboundRowBatch* outbound_batch) { + unique_ptr<OutboundRowBatch>* outbound_batch, bool swap_batch) { VLOG_ROW << "Channel::TransmitData() fragment_instance_id=" << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_ - << " #rows=" << outbound_batch->header()->num_rows(); + << " #rows=" << outbound_batch->get()->header()->num_rows(); std::unique_lock<SpinLock> l(lock_); RETURN_IF_ERROR(WaitForRpcLocked(&l)); DCHECK(!rpc_in_flight_); @@ -596,20 +599,25 @@ Status KrpcDataStreamSender::Channel::TransmitData( // TODO: Needs better solution for IMPALA-3990 in the long run. if (UNLIKELY(remote_recvr_closed_)) return Status::OK(); rpc_in_flight_ = true; - rpc_in_flight_batch_ = outbound_batch; + rpc_in_flight_batch_ = outbound_batch->get(); RETURN_IF_ERROR(DoTransmitDataRpc()); + // At this point the previous RPC must be already finished and the previous buffer + // in outbound_batch_ can be reused. + DCHECK_NE(rpc_in_flight_batch_, outbound_batch_.get()); + if (swap_batch) outbound_batch_.swap(*outbound_batch); return Status::OK(); } Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) { - OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_]; + unique_ptr<OutboundRowBatch>* serialization_batch = &parent_->serialization_batch_; + DCHECK(serialization_batch->get() != nullptr); // Reads 'rpc_in_flight_batch_' without acquiring 'lock_', so reads can be racey. 
ANNOTATE_IGNORE_READS_BEGIN(); - DCHECK(outbound_batch != rpc_in_flight_batch_); + DCHECK(serialization_batch->get() != rpc_in_flight_batch_); ANNOTATE_IGNORE_READS_END(); - RETURN_IF_ERROR(parent_->SerializeBatch(batch, outbound_batch)); - RETURN_IF_ERROR(TransmitData(outbound_batch)); - next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES; + RETURN_IF_ERROR(parent_->SerializeBatch(batch, serialization_batch->get(), !is_local_)); + // Swap serialization_batch with outbound_batch_ once the old RPC is finished. + RETURN_IF_ERROR(TransmitData(serialization_batch, true /*swap_batch*/)); return Status::OK(); } @@ -722,7 +730,7 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) { while (rpc_in_flight_) rpc_done_cv_.wait(l); } batch_.reset(); - outbound_batches_.clear(); + outbound_batch_.reset(nullptr); } KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, @@ -806,16 +814,17 @@ Status KrpcDataStreamSender::Prepare( new MemTracker(-1, "RowBatchSerialization", mem_tracker_.get())); char_mem_tracker_allocator_.reset( new CharMemTrackerAllocator(outbound_rb_mem_tracker_)); + string process_address = NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address()); - for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) { - // Only skip compression if there is a single channel and destination is in the same - // process. TODO: could be optimized to send the uncompressed buffer to the local - // targets to avoid decompression cost at the receiver. 
- bool is_local = channels_.size() == 1 && channels_[0]->IsLocal(); - outbound_batches_.emplace_back(char_mem_tracker_allocator_, is_local); + + serialization_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_)); + if (partition_type_ == TPartitionType::UNPARTITIONED) { + in_flight_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_)); } + compression_scratch_.reset(new TrackedString(*char_mem_tracker_allocator_.get())); + for (int i = 0; i < channels_.size(); ++i) { RETURN_IF_ERROR(channels_[i]->Init(state, char_mem_tracker_allocator_)); } @@ -1034,14 +1043,22 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { if (batch->num_rows() == 0) return Status::OK(); if (partition_type_ == TPartitionType::UNPARTITIONED) { - OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_]; - RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size())); + // Only skip compression if there is a single channel and destination is in the same + // process. TODO: could be optimized to send the uncompressed buffer to the local + // targets to avoid decompression cost at the receiver. + bool is_local = channels_.size() == 1 && channels_[0]->IsLocal(); + RETURN_IF_ERROR(SerializeBatch( + batch, serialization_batch_.get(), !is_local, channels_.size())); // TransmitData() will block if there are still in-flight rpcs (and those will - // reference the previously written serialized batch). + // reference the previously written in_flight_batch_). for (int i = 0; i < channels_.size(); ++i) { - RETURN_IF_ERROR(channels_[i]->TransmitData(outbound_batch)); + // Do not swap serialization_batch_ with the channel's outbound_batch_ to allow + // multiple channels to use the data backing serialization_batch_ in parallel. 
+ RETURN_IF_ERROR( + channels_[i]->TransmitData(&serialization_batch_, false /*swap_batch*/)); } - next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES; + // At this point no RPCs can still refer to the old in_flight_batch_. + in_flight_batch_.swap(serialization_batch_); } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) { // Round-robin batches among channels. Wait for the current channel to finish its // rpc before overwriting its batch. @@ -1197,7 +1214,10 @@ void KrpcDataStreamSender::Close(RuntimeState* state) { channels_[i]->Teardown(state); } - outbound_batches_.clear(); + serialization_batch_.reset(nullptr); + in_flight_batch_.reset(nullptr); + compression_scratch_.reset(nullptr); + if (outbound_rb_mem_tracker_.get() != nullptr) { outbound_rb_mem_tracker_->Close(); } @@ -1208,11 +1228,12 @@ void KrpcDataStreamSender::Close(RuntimeState* state) { } Status KrpcDataStreamSender::SerializeBatch( - RowBatch* src, OutboundRowBatch* dest, int num_receivers) { + RowBatch* src, OutboundRowBatch* dest, bool compress, int num_receivers) { VLOG_ROW << "serializing " << src->num_rows() << " rows"; { SCOPED_TIMER(serialize_batch_timer_); - RETURN_IF_ERROR(src->Serialize(dest)); + RETURN_IF_ERROR( + src->Serialize(dest, compress ? compression_scratch_.get() : nullptr)); int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest); COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers); } diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h index 1b1b78813..2d26e3adf 100644 --- a/be/src/runtime/krpc-data-stream-sender.h +++ b/be/src/runtime/krpc-data-stream-sender.h @@ -166,9 +166,11 @@ class KrpcDataStreamSender : public DataSink { /// Serializes the src batch into the serialized row batch 'dest' and updates /// various stat counters. + /// 'compress' decides whether compression is attempted after serialization. 
/// 'num_receivers' is the number of receivers this batch will be sent to. Used for /// updating the stat counters. - Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int num_receivers = 1); + Status SerializeBatch( + RowBatch* src, OutboundRowBatch* dest, bool compress, int num_receivers = 1); /// Returns 'partition_expr_evals_[i]'. Used by the codegen'd HashRow() IR function. ScalarExprEvaluator* GetPartitionExprEvaluator(int i); @@ -211,15 +213,20 @@ class KrpcDataStreamSender : public DataSink { /// Index of the current channel to send to if random_ == true. int current_channel_idx_ = 0; - /// Index of the next OutboundRowBatch to use for serialization. - int next_batch_idx_ = 0; + /// Pointer to OutboundRowBatch that will be used for serialization. + /// Swapped with in_flight_batch_ (UNPARTITIONED case) or the channel's outbound_batch_ + /// (PARTITIONED case) after serialization. + std::unique_ptr<OutboundRowBatch> serialization_batch_; - /// The outbound row batches are double-buffered so that we can serialize the next - /// batch while the other is still referenced by the in-flight RPC. Each entry contains - /// a RowBatchHeaderPB and buffers for the serialized tuple offsets and data. Used only - /// when the partitioning strategy is UNPARTITIONED. - static const int NUM_OUTBOUND_BATCHES = 2; - std::vector<OutboundRowBatch> outbound_batches_; + /// Buffer used for compression after serialization. Swapped with the OutboundRowBatch's + /// tuple_data_ if the compressed data is smaller. + std::unique_ptr<TrackedString> compression_scratch_; + + /// Pointer to OutboundRowBatch referenced by the in-flight RPC(s). Used only + /// when the partitioning strategy is UNPARTITIONED. In the broadcasting case + /// multiple channels use it at the same time to hold the buffers backing the RPC + /// sidecars. + std::unique_ptr<OutboundRowBatch> in_flight_batch_; /// If true, this sender has called FlushFinal() successfully. 
/// Not valid to call Send() anymore. diff --git a/be/src/runtime/outbound-row-batch.h b/be/src/runtime/outbound-row-batch.h index 4411c6da4..8adb6e847 100644 --- a/be/src/runtime/outbound-row-batch.h +++ b/be/src/runtime/outbound-row-batch.h @@ -35,10 +35,9 @@ class RuntimeState; /// for holding the tuple offsets and tuple data. class OutboundRowBatch { public: - OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator, - bool skip_compression=false) - : tuple_data_(*allocator.get()), compression_scratch_(*allocator.get()), - skip_compression_(skip_compression) {} + + OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator) + : tuple_data_(*allocator.get()) {} const RowBatchHeaderPB* header() const { return &header_; } @@ -79,13 +78,6 @@ class OutboundRowBatch { /// Contains the actual data of all the tuples. The data could be compressed. TrackedString tuple_data_; - - /// Contains the compression scratch for the compressed data in serialization. - /// The compression_scratch_ will be swapped with tuple_data_ if the compressed data - /// is shorter. 
- TrackedString compression_scratch_; - - bool skip_compression_; }; } diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc index c5a5d9c4b..9fa587e7d 100644 --- a/be/src/runtime/row-batch-serialize-test.cc +++ b/be/src/runtime/row-batch-serialize-test.cc @@ -81,8 +81,9 @@ class RowBatchSerializeTest : public testing::Test { bool print_batches, bool full_dedup = false) { if (print_batches) cout << PrintBatch(batch) << endl; + TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); OutboundRowBatch row_batch(char_mem_tracker_allocator_); - RETURN_IF_ERROR(batch->Serialize(&row_batch, full_dedup)); + RETURN_IF_ERROR(batch->Serialize(&row_batch, full_dedup, &compression_scratch)); RowBatch deserialized_batch(&row_desc, row_batch, tracker_.get()); if (print_batches) cout << PrintBatch(&deserialized_batch) << endl; @@ -613,8 +614,10 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) { vector<Tuple*> tuples; CreateTuples(tuple_desc, batch->tuple_data_pool(), num_distinct_tuples, 0, 10, &tuples); AddTuplesToRowBatch(num_rows, tuples, repeats, batch); + + TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); OutboundRowBatch row_batch(char_mem_tracker_allocator_); - EXPECT_OK(batch->Serialize(&row_batch, full_dedup)); + EXPECT_OK(batch->Serialize(&row_batch, full_dedup, &compression_scratch)); // Serialized data should only have one copy of each tuple. int64_t total_byte_size = 0; // Total size without duplication for (int i = 0; i < tuples.size(); ++i) { @@ -751,8 +754,10 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) { // Full dedup should be automatically enabled because of row batch structure. 
EXPECT_TRUE(UseFullDedup(batch)); LOG(INFO) << "Serializing row batch"; + + TrackedString compression_scratch(*char_mem_tracker_allocator_.get()); OutboundRowBatch row_batch(char_mem_tracker_allocator_); - EXPECT_OK(batch->Serialize(&row_batch)); + EXPECT_OK(batch->Serialize(&row_batch, &compression_scratch)); LOG(INFO) << "Serialized batch size: " << row_batch.TupleDataAsSlice().size(); LOG(INFO) << "Serialized batch uncompressed size: " << row_batch.header()->uncompressed_size(); diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 144734dfe..a8b4a7730 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -220,11 +220,13 @@ RowBatch::~RowBatch() { tuple_ptrs_ = nullptr; } -Status RowBatch::Serialize(OutboundRowBatch* output_batch) { - return Serialize(output_batch, UseFullDedup()); +Status RowBatch::Serialize( + OutboundRowBatch* output_batch, TrackedString* compression_scratch) { + return Serialize(output_batch, UseFullDedup(), compression_scratch); } -Status RowBatch::Serialize(OutboundRowBatch* output_batch, bool full_dedup) { +Status RowBatch::Serialize( + OutboundRowBatch* output_batch, bool full_dedup, TrackedString* compression_scratch) { bool is_compressed; output_batch->tuple_offsets_.clear(); @@ -251,8 +253,8 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch, bool full_dedup) { } output_batch->tuple_data_.resize(size); - RETURN_IF_ERROR(Serialize( - full_dedup ? &distinct_tuples : nullptr, output_batch, &is_compressed, size)); + RETURN_IF_ERROR(Serialize(full_dedup ? 
&distinct_tuples : nullptr, + output_batch, &is_compressed, size, compression_scratch)); // Initialize the RowBatchHeaderPB RowBatchHeaderPB* header = &output_batch->header_; @@ -266,7 +268,7 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch, bool full_dedup) { } Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_batch, - bool* is_compressed, int64_t size) { + bool* is_compressed, int64_t size, TrackedString* compression_scratch) { char* tuple_data = const_cast<char*>(output_batch->tuple_data_.data()); std::vector<int32_t>* tuple_offsets = &output_batch->tuple_offsets_; @@ -274,9 +276,9 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_b *is_compressed = false; - if (size > 0 && !output_batch->skip_compression_) { - // Try compressing tuple_data to compression_scratch_, swap if compressed data is - // smaller + if (size > 0 && compression_scratch != nullptr) { + // Try compressing tuple_data to compression_scratch, swap if compressed data is + // smaller. 
Lz4Compressor compressor(nullptr, false); RETURN_IF_ERROR(compressor.Init()); auto compressor_cleanup = @@ -288,19 +290,21 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_b return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size); } DCHECK_GT(compressed_size, 0); - if (output_batch->compression_scratch_.size() < compressed_size) { - output_batch->compression_scratch_.resize(compressed_size); + if (compression_scratch->size() < compressed_size) { + compression_scratch->resize(compressed_size); } uint8_t* input = reinterpret_cast<uint8_t*>(tuple_data); uint8_t* compressed_output = const_cast<uint8_t*>( - reinterpret_cast<const uint8_t*>(output_batch->compression_scratch_.data())); + reinterpret_cast<const uint8_t*>(compression_scratch->data())); RETURN_IF_ERROR( compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output)); if (LIKELY(compressed_size < size)) { - output_batch->compression_scratch_.resize(compressed_size); - output_batch->tuple_data_.swap(output_batch->compression_scratch_); + compression_scratch->resize(compressed_size); + output_batch->tuple_data_.swap(*compression_scratch); *is_compressed = true; + // TODO: could copy to a smaller buffer if compressed data is much smaller to + // save memory } VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size; } diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 80e9ac054..3626d7dce 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -331,7 +331,7 @@ class RowBatch { /// larger than the uncompressed data. Use output_batch.compression_type to determine /// whether tuple_data is compressed. If an in-flight row is present in this row batch, /// it is ignored. This function does not Reset(). 
- Status Serialize(OutboundRowBatch* output_batch); + Status Serialize(OutboundRowBatch* output_batch, TrackedString* compression_scratch); /// Utility function: returns total byte size of a batch in either serialized or /// deserialized form. If a row batch is compressed, its serialized size can be much @@ -411,7 +411,8 @@ class RowBatch { bool UseFullDedup(); /// Overload for testing that allows the test to force the deduplication level. - Status Serialize(OutboundRowBatch* output_batch, bool full_dedup); + Status Serialize(OutboundRowBatch* output_batch, bool full_dedup, + TrackedString* compression_scratch); typedef FixedSizeHashTable<Tuple*, int> DedupMap; @@ -435,7 +436,7 @@ class RowBatch { /// /// Returns error status if serialization failed. Returns OK otherwise. Status Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_batch, - bool* is_compressed, int64_t size); + bool* is_compressed, int64_t size, TrackedString* compression_scratch); /// Implementation for protobuf to deserialize a row batch. /// diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java index 6052226b6..b379da757 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java +++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java @@ -67,6 +67,8 @@ public class DataStreamSink extends DataSink { * equal to avgOutboundRowBatchSize. If outputPartiton_ is partitioned, all of the * channel's OutboundRowBatches are used. Otherwise, only a pair of OutboundRowBatches * in KrpcDataStreamSender class are used. + * TODO: this function can both over and under estimate the memory need + * see IMPALA-12594 + IMPALA-12433 */ private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) { int numChannels =
