This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit daa4d6e916f80d8b929dcf4873668accceb33b0b Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Tue Jun 25 17:20:20 2024 +0200 IMPALA-13194: Fast-serialize position delete records Currently the serialization of position delete records is very wasteful. The records contain slots 'file_path' and 'pos'. And what we do during serialization is the following: 1. Write fixed-size tuples that have a StringValue and a BigInt slot 2. Copy the StringValue's contents after the tuple. 3. Convert the StringValue ptr to be an offset to the string data So we end up having something like this: +-------------+--------+----------------+-------------+--------+----------------+-----+ | StringValue | BigInt | File path | StringValue | BigInt | File path | ... | +-------------+--------+----------------+-------------+--------+----------------+-----+ | ptr, len | 42 | /.../a.parquet | ptr, len | 43 | /.../a.parquet | ... | +-------------+--------+----------------+-------------+--------+----------------+-----+ Storing the file paths that way is very redundant, and in the end we will have a huge buffer that we need to compress and send over the network. Moreover, we copy the file paths in memory twice: 1. From input row batch to the KrpcDataStreamSender::Channel's temporary row batch 2. From the temporary row batch to the outbound row batch (during serialization) The position delete files store the delete records in ascending order. This means adjacent records mostly have the same file path. So we could just buffer the position delete records up to the Channel's capacity, then serialize the data in a more efficient way. With this patch, serialized data will look like this: +----------------+-------------+--------+-------------+--------+-----+ | File path | StringValue | BigInt | StringValue | BigInt | ... | +----------------+-------------+--------+-------------+--------+-----+ | /.../a.parquet | ptr, len | 42 | ptr, len | 43 | ...
| +----------------+-------------+--------+-------------+--------+-----+ File path, then tuples with the same file path, after that comes the next file path and tuples associated with that one, and so on. Measurements: 07:EXCHANGE : 1m ==> 52s F02:EXCHANGE SENDER: 1m2s ==> 16s Change-Id: I6095f318e3d06dedb4197681156b40dd2a326c6f Reviewed-on: http://gerrit.cloudera.org:8080/21563 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/iceberg-delete-builder.h | 9 +- be/src/runtime/CMakeLists.txt | 1 + be/src/runtime/iceberg-position-delete-collector.h | 162 +++++++++++++++++++++ be/src/runtime/krpc-data-stream-sender.cc | 125 ++++++++++++++-- be/src/runtime/krpc-data-stream-sender.h | 6 + be/src/runtime/outbound-row-batch.cc | 85 +++++++++++ be/src/runtime/outbound-row-batch.h | 15 ++ be/src/runtime/row-batch.cc | 49 +------ be/src/runtime/string-value.h | 11 ++ 9 files changed, 395 insertions(+), 68 deletions(-) diff --git a/be/src/exec/iceberg-delete-builder.h b/be/src/exec/iceberg-delete-builder.h index 6ae8716fd..ae25d3040 100644 --- a/be/src/exec/iceberg-delete-builder.h +++ b/be/src/exec/iceberg-delete-builder.h @@ -109,15 +109,8 @@ class IcebergDeleteBuilder : public JoinBuilder { std::string DebugString() const; - struct StringValueHashWrapper { - size_t operator()(const impala::StringValue& str) const { - return impala::hash_value(str); - } - }; - using DeleteRowHashTable = - std::unordered_map<impala::StringValue, RoaringBitmap64, - StringValueHashWrapper>; + std::unordered_map<impala::StringValue, RoaringBitmap64>; DeleteRowHashTable& deleted_rows() { return deleted_rows_; } diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index fb9f8c7c6..a1e41b35d 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -68,6 +68,7 @@ add_library(Runtime lib-cache.cc mem-tracker.cc mem-pool.cc + outbound-row-batch.cc query-driver.cc query-exec-mgr.cc 
query-exec-params.cc diff --git a/be/src/runtime/iceberg-position-delete-collector.h b/be/src/runtime/iceberg-position-delete-collector.h new file mode 100644 index 000000000..0edc1bab5 --- /dev/null +++ b/be/src/runtime/iceberg-position-delete-collector.h @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> +#include <unordered_map> + +#include "runtime/descriptors.h" +#include "runtime/tuple.h" +#include "runtime/tuple-row.h" + +namespace impala { + +/// Helper class for collecting position delete records and efficiently +/// serializing them to outbound row batches. The serialization algorithm +/// is the following: +/// for each entry in 'file_to_positions_': +/// write out the file path +/// for each position for the current file path: +/// write out a tuple: (StringValue object pointing to the file path, position) +/// E.g.: +/// +----------------+-------------+--------+-------------+--------+-----+ +/// | File path | StringValue | BigInt | StringValue | BigInt | ... | +/// +----------------+-------------+--------+-------------+--------+-----+ +/// | /.../a.parquet | ptr, len | 42 | ptr, len | 43 | ... 
| +/// +----------------+-------------+--------+-------------+--------+-----+ +/// +/// IcebergPositionDeleteCollector tracks the memory of the file paths, but the +/// positions are stored in a std::vector<int64_t> untracked (since it only store records +/// up to a KrpcDataStreamSender::Channel's capacity, the memory consumption should +/// not be too significant). +class IcebergPositionDeleteCollector { +public: + IcebergPositionDeleteCollector(TupleDescriptor* desc) { + desc_ = desc; + DCHECK_EQ(desc_->slots().size(), 2); + SlotDescriptor* file_path_desc = desc_->slots()[0]; + SlotDescriptor* position_desc = desc_->slots()[1]; + DCHECK(file_path_desc->type().IsVarLenStringType()); + DCHECK(position_desc->type().type == TYPE_BIGINT); + file_path_offset_ = file_path_desc->tuple_offset(); + pos_offset_ = position_desc->tuple_offset(); + row_count_ = 0; + insert_it_ = file_to_positions_.end(); + } + + void Init(MemTracker* parent_mem_tracker) { + pool_ = std::make_unique<MemPool>(parent_mem_tracker); + } + + Status AddRow(TupleRow* row) { + Tuple* tuple = row->GetTuple(0); + StringValue* filename_value = tuple->GetStringSlot(file_path_offset_); + int64_t pos = *tuple->GetBigIntSlot(pos_offset_); + + RETURN_IF_ERROR(SetInsertIterator(filename_value)); + insert_it_->second.push_back(pos); + ++row_count_; + return Status::OK(); + } + + int RowCount() const { return row_count_; } + + void Close() { + Reset(); + pool_->FreeAll(); + } + + /// Serializes the collected position delete records into 'dest', + /// then resets the internal structures of 'this'. 
+ Status Serialize(OutboundRowBatch* dest) { + dest->tuple_offsets_.clear(); + int64_t tuple_data_size = TupleDataSize(); + dest->tuple_data_.resize(tuple_data_size); + char* tuple_data = const_cast<char*>(dest->tuple_data_.data()); + Ubsan::MemSet(tuple_data, 0, tuple_data_size); + int offset = 0; + for (const auto& [path, positions] : file_to_positions_) { + int path_start = offset; + int path_len = path.Len(); + Ubsan::MemCpy(tuple_data + offset, path.Ptr(), path_len); + offset += path_len; + for (int64_t pos : positions) { + dest->tuple_offsets_.push_back(offset); + Tuple* t = reinterpret_cast<Tuple*>(tuple_data + offset); + StringValue* sv = t->GetStringSlot(file_path_offset_); + sv->Assign(reinterpret_cast<char*>(path_start), path_len); + int64_t* pos_slot = t->GetBigIntSlot(pos_offset_); + DCHECK_GE(pos, 0); + *pos_slot = pos; + offset += desc_->byte_size(); + } + } + DCHECK_EQ(offset, tuple_data_size); + Reset(); + return Status::OK(); + } + +private: + void Reset() { + file_to_positions_.clear(); + insert_it_ = file_to_positions_.end(); + pool_->Clear(); + row_count_ = 0; + } + + int64_t TupleDataSize() const { + int64_t total_size = 0; + for (const auto& [filename, positions] : file_to_positions_) { + total_size += filename.Len(); + total_size += positions.size() * desc_->byte_size(); + } + return total_size; + } + + Status SetInsertIterator(StringValue* filename_value) { + if (insert_it_ != file_to_positions_.end() && + insert_it_->first == *filename_value) { + return Status::OK(); + } + insert_it_ = file_to_positions_.find(*filename_value); + if (insert_it_ != file_to_positions_.end()) { + return Status::OK(); + } + StringValue::SimpleString ss = filename_value->ToSimpleString(); + char* ptr = reinterpret_cast<char*>(pool_->TryAllocate(ss.len)); + if (UNLIKELY(ptr == nullptr)) { + return Status(strings::Substitute( + "Could not allocate $0 bytes in IcebergPositionDeleteChannel.", ss.len)); + } + Ubsan::MemCpy(ptr, ss.ptr, ss.len); + StringValue sv(ptr, 
ss.len); + insert_it_ = file_to_positions_.insert(insert_it_, {sv, {}}); + return Status::OK(); + } + + std::unique_ptr<MemPool> pool_; + TupleDescriptor* desc_; + int file_path_offset_; + int pos_offset_; + int row_count_; + using FileToPositionsMap = std::unordered_map<StringValue, vector<int64_t>>; + FileToPositionsMap file_to_positions_; + FileToPositionsMap::iterator insert_it_; +}; + +} \ No newline at end of file diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index 8dbf3dc3a..e777c63bb 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -39,6 +39,7 @@ #include "runtime/descriptors.h" #include "runtime/exec-env.h" #include "runtime/fragment-state.h" +#include "runtime/iceberg-position-delete-collector.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" #include "runtime/row-batch.h" @@ -50,6 +51,7 @@ #include "util/debug-util.h" #include "util/network-util.h" #include "util/pretty-printer.h" +#include "util/ubsan.h" #include "gen-cpp/data_stream_service.pb.h" #include "gen-cpp/data_stream_service.proxy.h" @@ -219,12 +221,15 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned { // Returns OK otherwise. This should be only called from a fragment executor thread. Status WaitForRpc(); + int RowBatchCapacity() const; + // The type for a RPC worker function. typedef boost::function<Status()> DoRpcFn; bool IsLocal() const { return is_local_; } private: + friend KrpcDataStreamSender::IcebergPositionDeleteChannel; // The parent data stream sender owning this channel. Not owned. KrpcDataStreamSender* parent_; @@ -297,6 +302,9 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned { // TODO: Fix IMPALA-3990 bool remote_recvr_closed_ = false; + // Returns the serialization batch from 'parent'. 
+ unique_ptr<OutboundRowBatch>* GetSerializationBatch(); + // Returns true if the channel should terminate because the parent sender // has been closed or cancelled. bool ShouldTerminate() const { return shutdown_ || parent_->state_->is_cancelled(); } @@ -383,10 +391,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned { Status KrpcDataStreamSender::Channel::Init( RuntimeState* state, std::shared_ptr<CharMemTrackerAllocator> allocator) { - // TODO: take into account of var-len data at runtime. - int capacity = - max(1, parent_->per_channel_buffer_size_ / max(row_desc_->GetRowSize(), 1)); - batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker())); + batch_.reset(new RowBatch(row_desc_, RowBatchCapacity(), parent_->mem_tracker())); // Create a DataStreamService proxy to the destination. RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_)); @@ -397,6 +402,12 @@ Status KrpcDataStreamSender::Channel::Init( return Status::OK(); } +int KrpcDataStreamSender::Channel::RowBatchCapacity() const { + // TODO: take into account of var-len data at runtime. + return + max(1, parent_->per_channel_buffer_size_ / max(row_desc_->GetRowSize(), 1)); +} + void KrpcDataStreamSender::Channel::MarkDone(const Status& status) { if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1); rpc_status_ = status; @@ -613,16 +624,21 @@ Status KrpcDataStreamSender::Channel::TransmitData( } Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) { + unique_ptr<OutboundRowBatch>* serialization_batch = GetSerializationBatch(); + RETURN_IF_ERROR(parent_->SerializeBatch(batch, serialization_batch->get(), !is_local_)); + // Swap serialization_batch with outbound_batch_ once the old RPC is finished. 
+ RETURN_IF_ERROR(TransmitData(serialization_batch, true /*swap_batch*/)); + return Status::OK(); +} + +unique_ptr<OutboundRowBatch>* KrpcDataStreamSender::Channel::GetSerializationBatch() { unique_ptr<OutboundRowBatch>* serialization_batch = &parent_->serialization_batch_; DCHECK(serialization_batch->get() != nullptr); // Reads 'rpc_in_flight_batch_' without acquiring 'lock_', so reads can be racey. ANNOTATE_IGNORE_READS_BEGIN(); DCHECK(serialization_batch->get() != rpc_in_flight_batch_); ANNOTATE_IGNORE_READS_END(); - RETURN_IF_ERROR(parent_->SerializeBatch(batch, serialization_batch->get(), !is_local_)); - // Swap serialization_batch with outbound_batch_ once the old RPC is finished. - RETURN_IF_ERROR(TransmitData(serialization_batch, true /*swap_batch*/)); - return Status::OK(); + return serialization_batch; } Status KrpcDataStreamSender::Channel::SendCurrentBatch() { @@ -743,6 +759,64 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) { outbound_batch_.reset(nullptr); } +/// Channel's AddRow() and the generic serialization methods are inefficient for +/// Iceberg position delete records. This class stores and effciently serializes +/// such data, then uses an internal Channel object's TransmitData() to send out +/// the already serialized outbound row batches. 
+class KrpcDataStreamSender::IcebergPositionDeleteChannel { + public: + IcebergPositionDeleteChannel(KrpcDataStreamSender* parent, Channel* channel, + TupleDescriptor* desc) : delete_collector_(desc) { + parent_ = parent; + channel_ = channel; + capacity_ = channel_->RowBatchCapacity(); + } + + void Prepare(MemTracker* parent_mem_tracker) { + delete_collector_.Init(parent_mem_tracker); + } + + void Teardown() { + delete_collector_.Close(); + } + + Status AddRow(TupleRow* row) { + RETURN_IF_ERROR(delete_collector_.AddRow(row)); + if (delete_collector_.RowCount() == capacity_) { + RETURN_IF_ERROR(Flush()); + } + return Status::OK(); + } + + Status Flush() { + if (delete_collector_.RowCount() == 0) return Status::OK(); + unique_ptr<OutboundRowBatch>* serialization_batch = channel_->GetSerializationBatch(); + RETURN_IF_ERROR(ToOutboundRowBatch(serialization_batch->get())); + RETURN_IF_ERROR(channel_->TransmitData(serialization_batch, /*swap_batch=*/true)); + return Status::OK(); + } + + private: + Status ToOutboundRowBatch(OutboundRowBatch* dest) { + { + SCOPED_TIMER(parent_->serialize_batch_timer_); + RETURN_IF_ERROR(delete_collector_.Serialize(dest)); + constexpr int NUM_TUPLES_PER_ROW = 1; + bool compress = !channel_->IsLocal(); + RETURN_IF_ERROR(dest->PrepareForSend(NUM_TUPLES_PER_ROW, + compress ? 
parent_->compression_scratch_.get(): nullptr)); + int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest); + COUNTER_ADD(parent_->uncompressed_bytes_counter_, uncompressed_bytes); + } + return Status::OK(); + } + + KrpcDataStreamSender* parent_; + KrpcDataStreamSender::Channel* channel_; + int capacity_; + IcebergPositionDeleteCollector delete_collector_; +}; + KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink, const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& destinations, @@ -779,6 +853,14 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id, host_to_channel_[destination.address()] = channels_.back().get(); } } + if (IsDirectedMode()) { + DCHECK_EQ(row_desc_->tuple_descriptors().size(), 1); + TupleDescriptor* tuple_desc = row_desc_->tuple_descriptors()[0]; + for (unique_ptr<Channel>& ch : channels_) { + channel_to_ice_channel_[ch.get()] = make_unique<IcebergPositionDeleteChannel>( + this, ch.get(), tuple_desc); + } + } if (partition_type_ == TPartitionType::UNPARTITIONED || partition_type_ == TPartitionType::RANDOM) { @@ -838,6 +920,9 @@ Status KrpcDataStreamSender::Prepare( for (int i = 0; i < channels_.size(); ++i) { RETURN_IF_ERROR(channels_[i]->Init(state, char_mem_tracker_allocator_)); } + for (auto& [ch, ice_ch] : channel_to_ice_channel_) { + ice_ch->Prepare(mem_tracker_.get()); + } return Status::OK(); } @@ -1105,13 +1190,13 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { } else if (partition_type_ == TPartitionType::DIRECTED) { const int num_rows = batch->num_rows(); char* prev_filename_ptr = nullptr; - vector<Channel*> prev_channels; + vector<IcebergPositionDeleteChannel*> prev_channels; bool skipped_prev_row = false; for (int row_idx = 0; row_idx < num_rows; ++row_idx) { DCHECK_EQ(batch->num_tuples_per_row(), 1); TupleRow* tuple_row = batch->GetRow(row_idx); - Tuple* 
row = batch->GetRow(row_idx)->GetTuple(0); - StringValue* filename_value = row->GetStringSlot(0); + Tuple* tuple = batch->GetRow(row_idx)->GetTuple(0); + StringValue* filename_value = tuple->GetStringSlot(0); DCHECK(filename_value != nullptr); StringValue::SimpleString filename_value_ss = filename_value->ToSimpleString(); if (filename_value_ss.ptr == prev_filename_ptr) { @@ -1119,7 +1204,9 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { // send the row to the same channels as the previous row. DCHECK(skipped_prev_row || !prev_channels.empty() || (filename_value_ss.len == 0 && prev_channels.empty())); - for (Channel* ch : prev_channels) RETURN_IF_ERROR(ch->AddRow(tuple_row)); + for (IcebergPositionDeleteChannel* ch : prev_channels) { + RETURN_IF_ERROR(ch->AddRow(tuple_row)); + } continue; } prev_filename_ptr = filename_value_ss.ptr; @@ -1159,8 +1246,10 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) { return Status(ss.str()); } - prev_channels.push_back(channel_map_it->second); - RETURN_IF_ERROR(channel_map_it->second->AddRow(tuple_row)); + IcebergPositionDeleteChannel* ice_channel = + channel_to_ice_channel_[channel_map_it->second].get(); + prev_channels.push_back(ice_channel); + RETURN_IF_ERROR(ice_channel->AddRow(tuple_row)); } } } else { @@ -1209,6 +1298,9 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) { // If we hit an error here, we can return without closing the remaining channels as // the error is propagated back to the coordinator, which in turn cancels the query, // which will cause the remaining open channels to be closed. 
+ for (auto& [ch, ice_ch] : channel_to_ice_channel_) { + RETURN_IF_ERROR(ice_ch->Flush()); + } for (unique_ptr<Channel>& channel : channels_) { RETURN_IF_ERROR(channel->FlushBatches()); } @@ -1227,6 +1319,11 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) { void KrpcDataStreamSender::Close(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); if (closed_) return; + + for (auto& [ch, ice_ch] : channel_to_ice_channel_) { + ice_ch->Teardown(); + } + for (int i = 0; i < channels_.size(); ++i) { channels_[i]->Teardown(state); } diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h index 2d26e3adf..d201f878c 100644 --- a/be/src/runtime/krpc-data-stream-sender.h +++ b/be/src/runtime/krpc-data-stream-sender.h @@ -163,6 +163,7 @@ class KrpcDataStreamSender : public DataSink { private: class Channel; + class IcebergPositionDeleteChannel; /// Serializes the src batch into the serialized row batch 'dest' and updates /// various stat counters. @@ -304,6 +305,11 @@ class KrpcDataStreamSender : public DataSink { /// A mapping between host addresses to channels. Used for DIRECTED distribution mode /// where only one channel is associated with each host address. std::unordered_map<NetworkAddressPB, Channel*> host_to_channel_; + /// A mapping from Channel to IcebergPositionDeleteChannel. Only used in DIRECTED mode + /// where IcebergPositionDeleteChannel applies a specific serialization algorithm on + /// position delete records. + std::unordered_map<Channel*, std::unique_ptr<IcebergPositionDeleteChannel>> + channel_to_ice_channel_; }; } // namespace impala diff --git a/be/src/runtime/outbound-row-batch.cc b/be/src/runtime/outbound-row-batch.cc new file mode 100644 index 000000000..5926a7801 --- /dev/null +++ b/be/src/runtime/outbound-row-batch.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/outbound-row-batch.h" +#include "util/compress.h" +#include "util/scope-exit-trigger.h" + +namespace impala { + +Status OutboundRowBatch::PrepareForSend(int num_tuples_per_row, + TrackedString* compression_scratch) { + bool is_compressed = false; + int64_t uncompressed_size = tuple_data_.size(); + if (uncompressed_size > 0 && compression_scratch != nullptr) { + RETURN_IF_ERROR(TryCompress(compression_scratch, &is_compressed)); + } + int num_tuples = tuple_offsets_.size(); + DCHECK_EQ(num_tuples % num_tuples_per_row, 0); + int num_rows = num_tuples / num_tuples_per_row; + SetHeader(num_rows, num_tuples_per_row, uncompressed_size, is_compressed); + return Status::OK(); +} + +Status OutboundRowBatch::TryCompress(TrackedString* compression_scratch, + bool* is_compressed) { + DCHECK(compression_scratch != nullptr); + Lz4Compressor compressor(nullptr, false); + RETURN_IF_ERROR(compressor.Init()); + auto compressor_cleanup = + MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); + + *is_compressed = false; + int64_t uncompressed_size = tuple_data_.size(); + // If the input size is too large for LZ4 to compress, MaxOutputLen() will return 0. 
+ int64_t compressed_size = compressor.MaxOutputLen(uncompressed_size); + if (compressed_size == 0) { + return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, uncompressed_size); + } + DCHECK_GT(compressed_size, 0); + if (compression_scratch->size() < compressed_size) { + compression_scratch->resize(compressed_size); + } + + uint8_t* input = reinterpret_cast<uint8_t*>(tuple_data_.data()); + uint8_t* compressed_output = const_cast<uint8_t*>( + reinterpret_cast<const uint8_t*>(compression_scratch->data())); + RETURN_IF_ERROR(compressor.ProcessBlock( + true, uncompressed_size, input, &compressed_size, &compressed_output)); + if (LIKELY(compressed_size < uncompressed_size)) { + compression_scratch->resize(compressed_size); + tuple_data_.swap(*compression_scratch); + *is_compressed = true; + // TODO: could copy to a smaller buffer if compressed data is much smaller to + // save memory + } + VLOG_ROW << "uncompressed size: " << uncompressed_size << ", compressed size: " + << compressed_size; + return Status::OK(); +} + +void OutboundRowBatch::SetHeader(int num_rows, int num_tuples_per_row, + int64_t uncompressed_size, bool is_compressed) { + header_.Clear(); + header_.set_num_rows(num_rows); + header_.set_num_tuples_per_row(num_tuples_per_row); + header_.set_uncompressed_size(uncompressed_size); + header_.set_compression_type( + is_compressed ? CompressionTypePB::LZ4 : CompressionTypePB::NONE); +} + +} diff --git a/be/src/runtime/outbound-row-batch.h b/be/src/runtime/outbound-row-batch.h index 8adb6e847..4014b2e5b 100644 --- a/be/src/runtime/outbound-row-batch.h +++ b/be/src/runtime/outbound-row-batch.h @@ -64,10 +64,25 @@ class OutboundRowBatch { header_.has_compression_type(); } + // Prepares the outbound row batch for sending over the network. If + // 'compression_scratch' is not null, then it also tries to compress the tuple_data, + // and swaps tuple_data and compression_scratch if the compressed data is smaller. + // Also sets the header. 
+ Status PrepareForSend(int num_tuples_per_row, TrackedString* compression_scratch); + private: + friend class IcebergPositionDeleteCollector; friend class RowBatch; friend class RowBatchSerializeBaseline; + // Try compressing tuple_data to compression_scratch, swap if compressed data is + // smaller. + Status TryCompress(TrackedString* compression_scratch, bool* is_compressed); + + // Sets header of this outbound row batch. + void SetHeader(int num_rows, int num_tuples_per_row, int64_t uncompressed_size, + bool is_compressed); + /// The serialized header which contains the meta-data of the row batch such as the /// number of rows and compression scheme used etc. RowBatchHeaderPB header_; diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 848fcdca7..e66800ca4 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -227,7 +227,7 @@ Status RowBatch::Serialize( Status RowBatch::Serialize( OutboundRowBatch* output_batch, bool full_dedup, TrackedString* compression_scratch) { - bool is_compressed; + bool is_compressed = false; output_batch->tuple_offsets_.clear(); DedupMap distinct_tuples; @@ -252,18 +252,8 @@ Status RowBatch::Serialize( return Status(TErrorCode::ROW_BATCH_TOO_LARGE, size, numeric_limits<int32_t>::max()); } output_batch->tuple_data_.resize(size); - RETURN_IF_ERROR(Serialize(full_dedup ? &distinct_tuples : nullptr, output_batch, &is_compressed, size, compression_scratch)); - - // Initialize the RowBatchHeaderPB - RowBatchHeaderPB* header = &output_batch->header_; - header->Clear(); - header->set_num_rows(num_rows_); - header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size()); - header->set_uncompressed_size(size); - header->set_compression_type( - is_compressed ? 
CompressionTypePB::LZ4 : CompressionTypePB::NONE); return Status::OK(); } @@ -273,41 +263,8 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_b std::vector<int32_t>* tuple_offsets = &output_batch->tuple_offsets_; RETURN_IF_ERROR(SerializeInternal(size, distinct_tuples, tuple_offsets, tuple_data)); - - *is_compressed = false; - - if (size > 0 && compression_scratch != nullptr) { - // Try compressing tuple_data to compression_scratch, swap if compressed data is - // smaller. - Lz4Compressor compressor(nullptr, false); - RETURN_IF_ERROR(compressor.Init()); - auto compressor_cleanup = - MakeScopeExitTrigger([&compressor]() { compressor.Close(); }); - - // If the input size is too large for LZ4 to compress, MaxOutputLen() will return 0. - int64_t compressed_size = compressor.MaxOutputLen(size); - if (compressed_size == 0) { - return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size); - } - DCHECK_GT(compressed_size, 0); - if (compression_scratch->size() < compressed_size) { - compression_scratch->resize(compressed_size); - } - - uint8_t* input = reinterpret_cast<uint8_t*>(tuple_data); - uint8_t* compressed_output = const_cast<uint8_t*>( - reinterpret_cast<const uint8_t*>(compression_scratch->data())); - RETURN_IF_ERROR( - compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output)); - if (LIKELY(compressed_size < size)) { - compression_scratch->resize(compressed_size); - output_batch->tuple_data_.swap(*compression_scratch); - *is_compressed = true; - // TODO: could copy to a smaller buffer if compressed data is much smaller to - // save memory - } - VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size; - } + RETURN_IF_ERROR(output_batch->PrepareForSend(row_desc_->tuple_descriptors().size(), + compression_scratch)); return Status::OK(); } diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h index f3d1730c9..5bd1f6452 100644 --- 
a/be/src/runtime/string-value.h +++ b/be/src/runtime/string-value.h @@ -205,4 +205,15 @@ std::ostream& operator<<(std::ostream& os, const StringValue& string_value); } +/// With this specialization it is possbile to use StringValues in hash-based std +/// containers (unordered_set, unordered_map) without the need of explicitly +/// specifying the Hash template parameter. +namespace std { + template <> struct hash<impala::StringValue> { + size_t operator()(const impala::StringValue& str) const { + return hash_value(str); + } + }; +} + #endif
