This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d58eca93 IMPALA-13509: Copy rows directly to OutboundRowBatch during hash partitioning
3d58eca93 is described below

commit 3d58eca9378116836fc47df10021a7747f74dc25
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Thu Jul 18 18:37:30 2024 +0200

    IMPALA-13509: Copy rows directly to OutboundRowBatch during hash partitioning
    
    Before this patch, all rows were copied twice during hash/Kudu
    partitioning in KrpcDataStreamSender:
    1. to the collector RowBatch of the given Channel
       (KrpcDataStreamSender::Channel::AddRow())
    2. to an OutboundRowBatch when the collector RowBatch is at capacity
       (KrpcDataStreamSender::Channel::SendCurrentBatch())
    
    The change removes the RowBatch from KrpcDataStreamSender::Channel
    and simplifies its interface to only allow sending already serialized
    OutboundRowBatches. The responsibility of collecting per-partition rows
    is moved to a new struct KrpcDataStreamSender::PartitionRowCollector.
    During partitioning each row is deep-copied to the partition's
    PartitionRowCollector's OutboundRowBatch (collector_batch_), which
    has a dynamically growing internal buffer. Once collector_batch_
    reaches its row count/size limit, it is finalized/compressed
    and sent to its Channel.
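    The collector flow above can be sketched in simplified, hypothetical
    standalone code (not Impala's actual classes; a std::string stands in
    for the growable serialization buffer and "sending" is just counted):

```cpp
#include <cassert>
#include <string>

// Simplified sketch of the PartitionRowCollector flow: rows are deep-copied
// straight into a growable outbound buffer, and the batch is flushed once the
// row-count limit is reached. The real code copies tuples via
// Row/TupleDescriptor and transmits over KRPC; all names here are illustrative.
struct SketchCollector {
  std::string buffer;   // stands in for OutboundRowBatch::tuple_data_
  int num_rows = 0;
  int capacity;         // row-count limit per outbound batch
  int batches_sent = 0;

  explicit SketchCollector(int cap) : capacity(cap) {}

  void AppendRow(const std::string& row_bytes) {
    buffer += row_bytes;  // deep copy directly into the outbound buffer
    if (++num_rows == capacity) SendCurrentBatch();
  }

  void SendCurrentBatch() {
    if (num_rows == 0) return;
    ++batches_sent;       // real code: finalize/compress, then TransmitData()
    buffer.clear();
    num_rows = 0;
  }
};
```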
    
    The function responsible for deep copy (OutboundRowBatch::AppendRow())
    is designed similarly to BufferedTupleStream::AddRow():
    1. try the deep copy assuming that there is enough space in the buffer
    2. in the rarer case of hitting the end of the buffer, reallocate the
       buffer and try again
    The main reason for the similar design is to potentially merge the two
    functions (and their codegen) in the future.
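    A minimal sketch of this two-step append, under illustrative names (not
    the real OutboundRowBatch::AppendRow() signature):

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <vector>

// Hypothetical sketch of the two-step append described above: first try the
// copy assuming the buffer has room; only on the rare overflow grow the
// buffer and retry. Not Impala's actual code.
struct GrowableBuf {
  std::vector<char> data;
  size_t offset = 0;

  bool TryAppend(const char* src, size_t len) {
    if (offset + len > data.size()) return false;  // rare in the common case
    std::memcpy(data.data() + offset, src, len);
    offset += len;
    return true;
  }

  void Append(const char* src, size_t len) {
    if (!TryAppend(src, len)) {
      // Slow path: at least double the buffer so the retry is guaranteed to fit.
      data.resize(std::max(data.size() * 2, offset + len));
      bool ok = TryAppend(src, len);
      assert(ok);
    }
  }
};
```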
    
    The new logic would make it easier to use a strict memory limit for
    OutboundRowBatch. Currently the main limit is a row count
    calculated from the fixed-length row size and
    FLAGS_data_stream_sender_buffer_size (16KB), which ignores var-len size.
    An enforced memory limit (RowBatch::AT_CAPACITY_MEM_USAGE) is 8MB,
    but it should be hit only with very large rows.
    A strict 16KB limit is not enforced in this patch, as it would likely
    cause a perf regression by making OutboundRowBatches smaller. Another
    problem is that a single row can be >16KB (default max_row_size=512KB),
    and it must be ensured that at least a single row at a time can always
    be transmitted.
    A future commit could enforce the limit and increase
    data_stream_sender_buffer_size at the same time. This would allow
    KrpcDataStreamSender to efficiently work within memory limits.
    See IMPALA-12594 for more details about memory usage and estimates.
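    For reference, the row-count limit mirrors
    Channel::CalculateRowBatchCapacity() in the diff below: the per-channel
    buffer size divided by the fixed-length row size, clamped so that even a
    row wider than the buffer yields a capacity of at least one row (a
    standalone restatement, not the exact member function):

```cpp
#include <algorithm>
#include <cassert>

// Buffer size divided by fixed-length row size; max(...) guards both against
// a zero row size and against rows larger than the whole buffer.
int CalculateRowBatchCapacity(int per_channel_buffer_size, int fixed_len_row_size) {
  return std::max(1, per_channel_buffer_size / std::max(fixed_len_row_size, 1));
}
```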
    
    Another potential future improvement is adding tuple deduplication
    for the partitioned case (IMPALA-13225). The dedup logic used
    in the non-partitioned case (RowBatch::Serialize()) did not work here,
    as the first deep copy to the RowBatch left all rows with unique
    addresses, so the pointer-based deduplication always failed.
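    Why the pointer-keyed dedup cannot fire after an intermediate deep copy
    can be shown with a toy example (simplified; not Impala's actual dedup
    structure, which the message says lives in RowBatch::Serialize()):

```cpp
#include <cassert>
#include <unordered_map>

// Counts tuples considered distinct by a pointer-keyed map. If the same tuple
// is referenced repeatedly, lookups hit; once each reference is a deep copy,
// every tuple has a fresh address and every lookup misses.
int CountDistinctByPointer(const void* const* tuples, int n) {
  std::unordered_map<const void*, int> seen;
  int distinct = 0;
  for (int i = 0; i < n; ++i) {
    if (seen.emplace(tuples[i], i).second) ++distinct;
  }
  return distinct;
}
```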
    
    Codegen of KrpcDataStreamSender::HashAndAddRows is slightly improved
    by replacing some functions with constants and moving some code
    to inline.h/ir.cc, but no "proper" codegen is done for the deep copy - it
    is still done in an interpreted way using Row/TupleDescriptor.
    
    Other changes:
    - New profile counters added to help analyze KrpcDataStreamSender
      performance: RpcSuccess, TransmitDataTime
    
    Performance:
    
    TPCH benchmarks improved significantly:
    
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCH(42) | parquet / none / none | 2.60    | -5.19%     | 1.98       | -3.13%         |
    +----------+-----------------------+---------+------------+------------+----------------+
    In the summaries of improved queries, some >1s EXCHANGE SENDERs became
    15-40% faster.
    
    Based on local benchmarks, KrpcDataStreamSender's CPU usage seems to be
    dominated by LZ4 compression after the change.
    
    Change-Id: I81a16c2f0fcfc1f3adef7077b3932a29a0f15a8f
    Reviewed-on: http://gerrit.cloudera.org:8080/21932
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Csaba Ringhofer <[email protected]>
---
 be/src/runtime/descriptors.h                 |   4 +
 be/src/runtime/krpc-data-stream-sender-ir.cc |  22 ++++-
 be/src/runtime/krpc-data-stream-sender.cc    | 138 +++++++++++++--------------
 be/src/runtime/krpc-data-stream-sender.h     |  44 ++++++++-
 be/src/runtime/outbound-row-batch.cc         |  16 +++-
 be/src/runtime/outbound-row-batch.h          |  28 +++++-
 be/src/runtime/outbound-row-batch.inline.h   |  85 +++++++++++++++++
 be/src/runtime/tuple-ir.cc                   |  85 +++++++++++++++++
 be/src/runtime/tuple-row.h                   |   2 +-
 be/src/runtime/tuple.h                       |  12 +++
 10 files changed, 355 insertions(+), 81 deletions(-)

diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index dd728525e..10c42c664 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -842,6 +842,10 @@ class RowDescriptor {
     return tuple_desc_map_;
   }
 
+  /// Number of tuples per row. Has IR_NO_INLINE to make it replacable with constant
+  /// during codegen.
+  int IR_NO_INLINE num_tuples_no_inline() const { return tuple_desc_map_.size(); }
+
   /// Populate row_tuple_ids with our ids.
   void ToThrift(std::vector<TTupleId>* row_tuple_ids) const;
 
diff --git a/be/src/runtime/krpc-data-stream-sender-ir.cc b/be/src/runtime/krpc-data-stream-sender-ir.cc
index d0da802d9..df6f5f5e5 100644
--- a/be/src/runtime/krpc-data-stream-sender-ir.cc
+++ b/be/src/runtime/krpc-data-stream-sender-ir.cc
@@ -17,6 +17,7 @@
 
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/krpc-data-stream-sender.h"
+#include "runtime/outbound-row-batch.inline.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 
@@ -29,6 +30,9 @@ ScalarExprEvaluator* KrpcDataStreamSender::GetPartitionExprEvaluator(int i) {
 Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) {
   const int num_rows = batch->num_rows();
   const int num_channels = GetNumChannels();
+  // Hash / deepcopy in batches of HASH_BATCH_SIZE.
+  // This optimization (IMPALA-6461) can be probably removed if deepcopy gets
+  // codegend.
   int channel_ids[RowBatch::HASH_BATCH_SIZE];
   int row_idx = 0;
   while (row_idx < num_rows) {
@@ -39,11 +43,27 @@ Status KrpcDataStreamSender::HashAndAddRows(RowBatch* batch) {
     }
     row_count = 0;
     FOREACH_ROW_LIMIT(batch, row_idx, RowBatch::HASH_BATCH_SIZE, row_batch_iter) {
-      RETURN_IF_ERROR(AddRowToChannel(channel_ids[row_count++], row_batch_iter.Get()));
+      int channel_id = channel_ids[row_count++];
+      PartitionRowCollector& collector = partition_row_collectors_[channel_id];
+      RETURN_IF_ERROR(collector.AppendRow(row_batch_iter.Get(), row_desc_));
     }
     row_idx += row_count;
   }
   return Status::OK();
 }
 
+Status KrpcDataStreamSender::PartitionRowCollector::AppendRow(
+    const TupleRow* row, const RowDescriptor* row_desc) {
+  DCHECK_LT(num_rows_, row_batch_capacity_);
+  num_rows_++;
+  RETURN_IF_ERROR(collector_batch_->AppendRow(row, row_desc));
+  DCHECK_GT(row_batch_capacity_, 0);
+  if (UNLIKELY(
+      num_rows_ == row_batch_capacity_ || collector_batch_->ReachedSizeLimit())) {
+    // This swaps collector_batch_ with an empty batch.
+    RETURN_IF_ERROR(SendCurrentBatch());
+  }
+  return Status::OK();
+}
+
 }
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 468ee0c9c..c422de9c0 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -118,9 +118,8 @@ void KrpcDataStreamSenderConfig::Close() {
 // A datastream sender may send row batches to multiple destinations. There is one
 // channel for each destination.
 //
-// Clients can call TransmitData() to directly send a serialized row batch to the
-// destination or it can call AddRow() to accumulate rows in an internal row batch
-// to certain capacity before sending it. The underlying RPC layer is implemented
+// Clients can call TransmitData() to send a serialized (and optionally
+// compressed) row batch to the destination. The underlying RPC layer is implemented
 // with KRPC, which provides interfaces for asynchronous RPC calls. Normally, the
 // calls above will return before the RPC has completed but they may block if there
 // is already an in-flight RPC.
@@ -131,7 +130,7 @@ void KrpcDataStreamSenderConfig::Close() {
 // can be swapped with outbound_batch_ once the previous in-flight RPC is finished.
 // This means that the client can serialize the next row batch while the current row batch
 // is being sent. outbound_batch_ is only used in the partitioned case - in the
-// unpartitioned case the shared outbound_batch_ holds the data for the the in-flight RPC.
+// unpartitioned case the shared outbound_batch_ holds the data for the in-flight RPC.
 //
 // Upon completion of a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC
 // fails due to remote service's queue being full, TransmitDataCompleteCb() will schedule
@@ -142,8 +141,7 @@ void KrpcDataStreamSenderConfig::Close() {
 // release resources. Teardown() will cancel any in-flight RPC and wait for the
 // completion callback to be called before returning. It's expected that the execution
 // thread to flush all buffered row batches and send the end-of-stream message (by
-// calling FlushBatches(), SendEosAsync() and WaitForRpc()) before closing the data
-// stream sender.
+// SendEosAsync() and WaitForRpc()) before closing the data stream sender.
 //
 // Note that the RPC payloads are owned solely by the channel and the KRPC layer will
 // relinquish references of them before the completion callback is invoked so it's
@@ -193,24 +191,13 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // can be reused.
   Status TransmitData(std::unique_ptr<OutboundRowBatch>* outbound_batch, bool swap_batch);
 
-  // Copies a single row into this channel's row batch and flushes the row batch once
-  // it reaches capacity. This call may block if the row batch's capacity is reached
-  // and the preceding RPC is still in progress. Returns error status if serialization
-  // failed or if the preceding RPC failed. Return OK otherwise.
-  Status ALWAYS_INLINE AddRow(TupleRow* row);
-
   // Shutdowns the channel and frees the row batch allocation. Any in-flight RPC will
-  // be cancelled. It's expected that clients normally call FlushBatches(), SendEosAsync()
+  // be cancelled. It's expected that clients normally call SendEosAsync()
   // and WaitForRpc() before calling Teardown() to flush all buffered row batches to
   // destinations. Teardown() may be called without flushing the channel in cases such
   // as cancellation or error.
   void Teardown(RuntimeState* state);
 
-  // Flushes any buffered row batches. Return error status if the TransmitData() RPC
-  // fails. The RPC is sent asynchrononously. WaitForRpc() must be called to wait
-  // for the RPC. This should be only called from a fragment executor thread.
-  Status FlushBatches();
-
   // Sends the EOS RPC to close the channel. Return error status if sending the EOS RPC
   // failed. The RPC is sent asynchrononously. WaitForRpc() must be called to wait for
   // the RPC. This should be only called from a fragment executor thread.
@@ -221,12 +208,14 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // Returns OK otherwise. This should be only called from a fragment executor thread.
   Status WaitForRpc();
 
-  int RowBatchCapacity() const;
+  int RowBatchCapacity() const { return row_batch_capacity_; }
+  int CalculateRowBatchCapacity() const;
 
   // The type for a RPC worker function.
   typedef boost::function<Status()> DoRpcFn;
 
   bool IsLocal() const { return is_local_; }
+  KrpcDataStreamSender* GetParent() { return parent_; }
 
  private:
   friend KrpcDataStreamSender::IcebergPositionDeleteChannel;
@@ -246,9 +235,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // True if the target fragment instance runs within the same process.
   const bool is_local_;
 
-  // The row batch for accumulating rows copied from AddRow().
-  // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
-  scoped_ptr<RowBatch> batch_;
+  int row_batch_capacity_ = -1;
 
   // Owns an outbound row batch that can be referenced by the in-flight RPC. Contains
   // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data.
@@ -391,7 +378,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
 
 Status KrpcDataStreamSender::Channel::Init(
     RuntimeState* state, const shared_ptr<CharMemTrackerAllocator>& allocator) {
-  batch_.reset(new RowBatch(row_desc_, RowBatchCapacity(), parent_->mem_tracker()));
+  row_batch_capacity_ = CalculateRowBatchCapacity();
 
   // Create a DataStreamService proxy to the destination.
   RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
@@ -402,14 +389,18 @@ Status KrpcDataStreamSender::Channel::Init(
   return Status::OK();
 }
 
-int KrpcDataStreamSender::Channel::RowBatchCapacity() const {
+int KrpcDataStreamSender::Channel::CalculateRowBatchCapacity() const {
   // TODO: take into account of var-len data at runtime.
   return
       max(1, parent_->per_channel_buffer_size_ / max(row_desc_->GetRowSize(), 1));
 }
 
 void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
-  if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1);
+  if (UNLIKELY(!status.ok())) {
+    COUNTER_ADD(parent_->rpc_failure_counter_, 1);
+  } else {
+    COUNTER_ADD(parent_->rpc_success_counter_, 1);
+  }
   rpc_status_ = status;
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
@@ -608,6 +599,8 @@ Status KrpcDataStreamSender::Channel::TransmitData(
            << " #rows=" << outbound_batch->get()->header()->num_rows();
   std::unique_lock<SpinLock> l(lock_);
   RETURN_IF_ERROR(WaitForRpcLocked(&l));
+  // Measure the time needed after getting the lock.
+  SCOPED_TIMER(parent_->transmit_data_timer_);
   DCHECK(!rpc_in_flight_);
   DCHECK(rpc_in_flight_batch_ == nullptr);
   // If the remote receiver is closed already, there is no point in sending anything.
@@ -641,27 +634,16 @@ unique_ptr<OutboundRowBatch>* KrpcDataStreamSender::Channel::GetSerializationBat
   return serialization_batch;
 }
 
-Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
-  RETURN_IF_ERROR(SerializeAndSendBatch(batch_.get()));
-  batch_->Reset();
-  return Status::OK();
-}
-
-inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
-  if (batch_->AtCapacity()) {
-    // batch_ is full, let's send it.
-    RETURN_IF_ERROR(SendCurrentBatch());
-  }
-  TupleRow* dest = batch_->GetRow(batch_->AddRow());
-  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
-  for (int i = 0; i < descs.size(); ++i) {
-    if (UNLIKELY(row->GetTuple(i) == nullptr)) {
-      dest->SetTuple(i, nullptr);
-    } else {
-      dest->SetTuple(i, row->GetTuple(i)->DeepCopy(*descs[i], batch_->tuple_data_pool()));
-    }
+Status KrpcDataStreamSender::PartitionRowCollector::SendCurrentBatch() {
+  if (collector_batch_->IsEmpty()) {
+    DCHECK_EQ(num_rows_, 0);
+    return Status::OK();
   }
-  batch_->CommitLastRow();
+  num_rows_ = 0;
+  RETURN_IF_ERROR(channel_->GetParent()->PrepareBatchForSend(
+      collector_batch_.get(), !channel_->IsLocal()));
+  RETURN_IF_ERROR(channel_->TransmitData(&collector_batch_, true /*swap_batch*/));
+  collector_batch_->Reset();
   return Status::OK();
 }
 
@@ -712,23 +694,7 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
   return Status::OK();
 }
 
-Status KrpcDataStreamSender::Channel::FlushBatches() {
-  VLOG_RPC << "Channel::FlushBatches() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows= " << batch_->num_rows();
-
-  // We can return an error here and not go on to send the EOS RPC because the error that
-  // we returned will be sent to the coordinator who will then cancel all the remote
-  // fragments including the one that this sender is sending to.
-  if (batch_->num_rows() > 0) RETURN_IF_ERROR(SendCurrentBatch());
-  return Status::OK();
-}
-
 Status KrpcDataStreamSender::Channel::SendEosAsync() {
-  VLOG_RPC << "Channel::SendEosAsync() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows= " << batch_->num_rows();
-  DCHECK_EQ(0, batch_->num_rows()) << "Batches must be flushed";
   {
     std::unique_lock<SpinLock> l(lock_);
     DCHECK(!rpc_in_flight_);
@@ -755,12 +721,11 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
     rpc_controller_.Cancel();
     while (rpc_in_flight_) rpc_done_cv_.wait(l);
   }
-  batch_.reset();
   outbound_batch_.reset(nullptr);
 }
 
-/// Channel's AddRow() and the generic serialization methods are inefficient for
-/// Iceberg position delete records. This class stores and effciently serializes
+/// KrpcDataStreamSender's generic partitioning and serialization methods are inefficient
+/// for Iceberg position delete records. This class stores and efficiently serializes
 /// such data, then uses an internal Channel object's TransmitData() to send out
 /// the already serialized outbound row batches.
 class KrpcDataStreamSender::IcebergPositionDeleteChannel {
@@ -848,6 +813,12 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
         destination.krpc_backend(), destination.fragment_instance_id(), sink.dest_node_id,
         per_channel_buffer_size, is_local));
 
+    if (partition_type_  == TPartitionType::HASH_PARTITIONED
+        || sink.output_partition.type == TPartitionType::KUDU) {
+      partition_row_collectors_.emplace_back();
+      partition_row_collectors_.back().channel_ = channels_.back().get();
+    }
+
     if (IsDirectedMode()) {
       DCHECK(host_to_channel_.find(destination.address()) == host_to_channel_.end());
       host_to_channel_[destination.address()] = channels_.back().get();
@@ -885,8 +856,10 @@ Status KrpcDataStreamSender::Prepare(
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state, state->obj_pool(),
       expr_perm_pool_.get(), expr_results_pool_.get(), &partition_expr_evals_));
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
+  transmit_data_timer_ = ADD_TIMER(profile(), "TransmitDataTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
   rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
+  rpc_success_counter_ = ADD_COUNTER(profile(), "RpcSuccess", TUnit::UNIT);
   bytes_sent_counter_ = ADD_COUNTER(profile(), "TotalBytesSent", TUnit::BYTES);
   state->AddBytesSentCounter(bytes_sent_counter_);
   bytes_sent_time_series_counter_ =
@@ -920,6 +893,10 @@ Status KrpcDataStreamSender::Prepare(
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state, char_mem_tracker_allocator_));
   }
+  for (PartitionRowCollector& collector: partition_row_collectors_) {
+    collector.collector_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_));
+    collector.row_batch_capacity_ = collector.channel_->RowBatchCapacity();
+  }
   for (auto& [ch, ice_ch] : channel_to_ice_channel_) {
     ice_ch->Prepare(mem_tracker_.get());
   }
@@ -1099,6 +1076,11 @@ void KrpcDataStreamSenderConfig::Codegen(FragmentState* state) {
         codegen->GetI32Constant(num_channels_), "GetNumChannels");
     DCHECK_EQ(num_replaced, 1);
 
+    num_replaced = codegen->ReplaceCallSitesWithValue(hash_and_add_rows_fn,
+        codegen->GetI32Constant(input_row_desc_->num_tuples_no_inline()),
+        "num_tuples_no_inline");
+    DCHECK_EQ(num_replaced, 1);
+
     // Replace HashRow() with the handcrafted IR function.
     num_replaced = codegen->ReplaceCallSites(hash_and_add_rows_fn,
         hash_row_fn, KrpcDataStreamSender::HASH_ROW_SYMBOL);
@@ -1115,10 +1097,6 @@ void KrpcDataStreamSenderConfig::Codegen(FragmentState* state) {
   AddCodegenStatus(codegen_status, sender_name);
 }
 
-Status KrpcDataStreamSender::AddRowToChannel(const int channel_id, TupleRow* row) {
-  return channels_[channel_id]->AddRow(row);
-}
-
 uint64_t KrpcDataStreamSender::HashRow(TupleRow* row, uint64_t seed) {
   uint64_t hash_val = seed;
   for (ScalarExprEvaluator* eval : partition_expr_evals_) {
@@ -1167,7 +1145,6 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     const int num_rows = batch->num_rows();
     const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
     int channel_ids[hash_batch_size];
-
     for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
       int batch_window_size = min(num_rows - batch_start, hash_batch_size);
       for (int i = 0; i < batch_window_size; ++i) {
@@ -1185,7 +1162,9 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
 
       for (int i = 0; i < batch_window_size; ++i) {
         TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
+        int channel_id = channel_ids[i];
+        PartitionRowCollector& collector = partition_row_collectors_[channel_id];
+        RETURN_IF_ERROR(collector.AppendRow(row, row_desc_));
       }
     }
   } else if (partition_type_ == TPartitionType::DIRECTED) {
@@ -1302,8 +1281,8 @@ Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
   for (auto& [ch, ice_ch] : channel_to_ice_channel_) {
     RETURN_IF_ERROR(ice_ch->Flush());
   }
-  for (unique_ptr<Channel>& channel : channels_) {
-    RETURN_IF_ERROR(channel->FlushBatches());
+  for (PartitionRowCollector& collector: partition_row_collectors_) {
+    RETURN_IF_ERROR(collector.SendCurrentBatch());
   }
   for (unique_ptr<Channel>& channel : channels_) {
     RETURN_IF_ERROR(channel->WaitForRpc());
@@ -1325,6 +1304,8 @@ void KrpcDataStreamSender::Close(RuntimeState* state) {
     ice_ch->Teardown();
   }
 
+  partition_row_collectors_.clear();
+
   for (int i = 0; i < channels_.size(); ++i) {
     channels_[i]->Teardown(state);
   }
@@ -1355,6 +1336,19 @@ Status KrpcDataStreamSender::SerializeBatch(
   return Status::OK();
 }
 
+Status KrpcDataStreamSender::PrepareBatchForSend(
+    OutboundRowBatch* batch, bool compress) {
+  DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED
+      || partition_type_ == TPartitionType::KUDU);
+  SCOPED_TIMER(serialize_batch_timer_);
+  RETURN_IF_ERROR(batch->PrepareForSend(row_desc_->tuple_descriptors().size(),
+      compress ? compression_scratch_.get() : nullptr, true));
+  int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*batch);
+  COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes);
+  return Status::OK();
+}
+
+
 int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
   return bytes_sent_counter_->value();
 }
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index d201f878c..71db3fd9a 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -110,7 +110,7 @@ class KrpcDataStreamSender : public DataSink {
   /// 'per_channel_buffer_size' is the soft limit in bytes of the buffering into the
   /// per-channel's accumulating row batch before it will be sent.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
-  /// RANDOM, and DIRECTED (used for sending rows from Iceberg delete files).
+  /// KUDU, RANDOM, and DIRECTED (used for sending rows from Iceberg delete files).
   KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
       const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
       const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& destinations,
@@ -165,6 +165,30 @@ class KrpcDataStreamSender : public DataSink {
   class Channel;
   class IcebergPositionDeleteChannel;
 
+  // Per partition structure to collect rows before sending the OutboundRowBatch to
+  // Channel. Only used in HASH/KUDU partitioning.
+  struct PartitionRowCollector {
+    std::unique_ptr<OutboundRowBatch> collector_batch_;
+    Channel* channel_ = nullptr;
+    int num_rows_ = 0;
+    int row_batch_capacity_ = 0;
+
+    // Copies a single row into collector_batch_ and flushes it (SendCurrentBatch())
+    // once row count or memory capacity is reached. This call may block if capacity is
+    // reached and channel_'s preceding RPC is still in progress. Returns error status
+    // if serialization failed or if the preceding RPC failed. Returns OK otherwise.
+    Status IR_ALWAYS_INLINE AppendRow(
+        const TupleRow* row, const RowDescriptor* row_desc);
+
+    // Finalizes and compresses collector_batch_ and sends it with channel_'s
+    // TransmitData(). This call may block if channel_'s preceding RPC is still
+    // in progress. Swaps collector_batch_ with the channel's outbound_batch_.
+    // Returns error status if compression failed or if the preceding RPC failed.
+    // Returns OK otherwise.
+    Status SendCurrentBatch();
+  };
+  std::vector<PartitionRowCollector> partition_row_collectors_;
+
   /// Serializes the src batch into the serialized row batch 'dest' and updates
   /// various stat counters.
   /// 'compress' decides whether compression is attempted after serialization.
@@ -173,6 +197,10 @@ class KrpcDataStreamSender : public DataSink {
   Status SerializeBatch(
       RowBatch* src, OutboundRowBatch* dest, bool compress, int num_receivers = 1);
 
+  // Like SerializeBatch, but the batch is already serialized and only compression is
+  // needed.
+  Status PrepareBatchForSend(OutboundRowBatch* batch, bool compress);
+
   /// Returns 'partition_expr_evals_[i]'. Used by the codegen'd HashRow() IR function.
   ScalarExprEvaluator* GetPartitionExprEvaluator(int i);
 
@@ -190,9 +218,6 @@ class KrpcDataStreamSender : public DataSink {
   /// insertion into the channel fails. Returns OK status otherwise.
   Status HashAndAddRows(RowBatch* batch);
 
-  /// Adds the given row to 'channels_[channel_id]'.
-  Status AddRowToChannel(const int channel_id, TupleRow* row);
-
   /// Functions to dump the content of the "filename to hosts" related mappings into logs.
   void DumpFilenameToHostsMapping() const;
   void DumpDestinationHosts() const;
@@ -241,15 +266,24 @@ class KrpcDataStreamSender : public DataSink {
   const std::vector<ScalarExpr*>& partition_exprs_;
   std::vector<ScalarExprEvaluator*> partition_expr_evals_;
 
-  /// Time for serializing row batches.
+  /// Time for serializing row batches. In case of Kudu/Hash partitioning
+  /// this mainly included compression time, while in other cases also
+  /// contains deep copying tuples to OutboundRowBatch.
   RuntimeProfile::Counter* serialize_batch_timer_ = nullptr;
 
+  /// "Active" time spent in TransmitData(). Waiting for the previous RPC to
+  /// finish is not included.
+  RuntimeProfile::Counter* transmit_data_timer_ = nullptr;
+
   /// Number of TransmitData() RPC retries due to remote service being busy.
   RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
 
   /// Total number of times RPC fails or the remote responds with a non-retryable error.
   RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
 
+  /// Total number of successful TransmitData() and EndDataStream() RPCs.
+  RuntimeProfile::Counter* rpc_success_counter_ = nullptr;
+
   /// Total number of bytes sent. Updated on RPC completion.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
diff --git a/be/src/runtime/outbound-row-batch.cc b/be/src/runtime/outbound-row-batch.cc
index 5926a7801..719e25665 100644
--- a/be/src/runtime/outbound-row-batch.cc
+++ b/be/src/runtime/outbound-row-batch.cc
@@ -16,13 +16,20 @@
 // under the License.
 
 #include "runtime/outbound-row-batch.h"
+#include "runtime/outbound-row-batch.inline.h"
 #include "util/compress.h"
 #include "util/scope-exit-trigger.h"
 
 namespace impala {
 
 Status OutboundRowBatch::PrepareForSend(int num_tuples_per_row,
-    TrackedString* compression_scratch) {
+    TrackedString* compression_scratch, bool used_append_row) {
+  if (used_append_row) {
+    DCHECK_GE(tuple_data_.size(), tuple_data_offset_);
+    tuple_data_.resize(tuple_data_offset_);
+  } else {
+    DCHECK_EQ(tuple_data_offset_, 0);
+  }
   bool is_compressed = false;
   int64_t uncompressed_size = tuple_data_.size();
   if (uncompressed_size > 0 && compression_scratch != nullptr) {
@@ -82,4 +89,11 @@ void OutboundRowBatch::SetHeader(int num_rows, int num_tuples_per_row,
       is_compressed ? CompressionTypePB::LZ4 : CompressionTypePB::NONE);
 }
 
+void OutboundRowBatch::Reset() {
+  header_.Clear();
+  tuple_offsets_.clear();
+  tuple_data_offset_ = 0;
+  // Do not clear tuple_data_ to avoid unnecessary delete + allocate.
+}
+
 }
diff --git a/be/src/runtime/outbound-row-batch.h b/be/src/runtime/outbound-row-batch.h
index f9cb72eb2..716047242 100644
--- a/be/src/runtime/outbound-row-batch.h
+++ b/be/src/runtime/outbound-row-batch.h
@@ -20,6 +20,7 @@
 #include <cstring>
 #include <vector>
 
+#include "codegen/impala-ir.h"
 #include "gen-cpp/row_batch.pb.h"
 #include "kudu/util/slice.h"
 #include "runtime/mem-tracker.h"
@@ -29,7 +30,11 @@ namespace impala {
 template <typename K, typename V> class FixedSizeHashTable;
 class MemTracker;
 class RowBatchSerializeTest;
+class RowDescriptor;
 class RuntimeState;
+class Tuple;
+class TupleDescriptor;
+class TupleRow;
 
 /// A KRPC outbound row batch which contains the serialized row batch header and buffers
 /// for holding the tuple offsets and tuple data.
@@ -65,14 +70,31 @@ class OutboundRowBatch {
   // Prepares the outbound row batch for sending over the network. If
   // 'compression_scratch' is not null, then it also tries to compress the tuple_data,
   // and swaps tuple_data and compression_scratch if the compressed data is smaller.
+  // If 'used_append_row' is true, assumes that AppendRow() was used to serialize
+  // the batch and the actual size comes from tuple_data_offset_.
   // Also sets the header.
-  Status PrepareForSend(int num_tuples_per_row, TrackedString* compression_scratch);
+  Status PrepareForSend(int num_tuples_per_row, TrackedString* compression_scratch,
+      bool used_append_row = false);
+
+  void Reset();
+
+  bool IsEmpty() { return tuple_offsets_.empty(); }
+
+  inline Status IR_ALWAYS_INLINE AppendRow(
+      const TupleRow* row, const RowDescriptor* row_desc);
+
+  // Returns true if the size limit (also used by RowBatch) is reached.
+  // Only used if the batch is serialized with AppendRow().
+  inline bool ReachedSizeLimit();
 
  private:
   friend class IcebergPositionDeleteCollector;
   friend class RowBatch;
   friend class RowBatchSerializeBaseline;
 
+  inline bool IR_ALWAYS_INLINE TryAppendTuple(
+      const Tuple* tuple, const TupleDescriptor* desc);
+
   // Try compressing tuple_data to compression_scratch, swap if compressed data is
   // smaller.
   Status TryCompress(TrackedString* compression_scratch, bool* is_compressed);
@@ -91,6 +113,10 @@ class OutboundRowBatch {
 
   /// Contains the actual data of all the tuples. The data could be compressed.
   TrackedString tuple_data_;
+
+  /// Used only if the row batch is filled with AppendRow(). Marks the offset of the
+  /// next tuple to write in tuple_data_.
+  int tuple_data_offset_ = 0;
 };
 
 }
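The commit message describes a new PartitionRowCollector that appends rows to its partition's OutboundRowBatch until the batch reaches its row count/size limit, then finalizes and sends it to the Channel. A minimal stand-in sketch of that control flow under stated assumptions: the `Batch` type, `kSizeLimit`, and `CollectRow` here are invented for illustration, while the real code uses OutboundRowBatch, RowBatch::AT_CAPACITY_MEM_USAGE, PrepareForSend(), and Reset().

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for OutboundRowBatch; only mimics the append/limit/reset
// surface sketched in the header above.
struct Batch {
  std::string tuple_data;
  int offset = 0;
  static constexpr int kSizeLimit = 64;  // stand-in for RowBatch::AT_CAPACITY_MEM_USAGE

  void AppendRow(const std::string& row) {  // always succeeds in this sketch
    tuple_data.append(row);
    offset += static_cast<int>(row.size());
  }
  bool ReachedSizeLimit() const { return offset >= kSizeLimit; }
  void Reset() { tuple_data.clear(); offset = 0; }
};

// Appends 'row' and returns the number of batches "sent" (finalized and reset).
// The real collector would call PrepareForSend() and hand the batch to its Channel.
int CollectRow(Batch* batch, const std::string& row) {
  batch->AppendRow(row);
  if (!batch->ReachedSizeLimit()) return 0;
  batch->Reset();
  return 1;
}
```

The point of the design is that rows are serialized exactly once, directly into the outbound buffer, instead of first landing in a collector RowBatch.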
diff --git a/be/src/runtime/outbound-row-batch.inline.h b/be/src/runtime/outbound-row-batch.inline.h
new file mode 100644
index 000000000..d252db0b0
--- /dev/null
+++ b/be/src/runtime/outbound-row-batch.inline.h
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/outbound-row-batch.h"
+
+#include "runtime/descriptors.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple-row.h"
+
+namespace impala {
+
+Status OutboundRowBatch::AppendRow(const TupleRow* row, const RowDescriptor* row_desc) {
+  DCHECK(row != nullptr);
+  int num_tuples = row_desc->num_tuples_no_inline();
+  vector<TupleDescriptor*>::const_iterator desc =
+      row_desc->tuple_descriptors().begin();
+  for (int j = 0; j < num_tuples; ++desc, ++j) {
+    Tuple* tuple = row->GetTuple(j);
+    if (UNLIKELY(tuple == nullptr)) {
+      // NULLs are encoded as -1
+      tuple_offsets_.push_back(-1);
+      continue;
+    }
+    // Record offset before creating copy (which increments offset and tuple_data)
+    tuple_offsets_.push_back(tuple_data_offset_);
+    // Try appending the tuple to the current tuple_data_. If it doesn't fit in the
+    // buffer, get the exact size needed for the tuple and allocate enough memory for
+    // it. This allows iterating through the varlen slots of most tuples only once.
+    if (UNLIKELY(!TryAppendTuple(tuple, *desc))) {
+      int64_t tuple_size = tuple->TotalByteSize(**desc, true /*assume_smallify*/);
+      int64_t new_size = tuple_data_offset_ + tuple_size;
+      if (new_size > numeric_limits<int32_t>::max()) {
+        return Status(
+            TErrorCode::ROW_BATCH_TOO_LARGE, new_size, numeric_limits<int32_t>::max());
+      }
+      // TODO: Based on experience the below logic doubles the buffer size instead of
+      // resizing to the exact size, similarly to vector. It would be clearer to use a
+      // vector instead of string for tuple_data_, but in the long term it would be
+      // better to use a fixed sized buffer (data_stream_sender_buffer_size) once var
+      // len data is properly accounted for (see IMPALA-12594 for details).
+      tuple_data_.resize(new_size);
+      tuple_data_.resize(tuple_data_.capacity());
+      DCHECK_GT(tuple_data_.size(), 0);
+      bool retry_successful = TryAppendTuple(tuple, *desc);
+      // As the buffer was resized based on the exact size of the tuple the second
+      // attempt must succeed.
+      DCHECK(retry_successful);
+    }
+    DCHECK_LE(tuple_data_offset_, tuple_data_.size());
+  }
+  return Status::OK();
+}
+
+bool OutboundRowBatch::TryAppendTuple(const Tuple* tuple, const TupleDescriptor* desc) {
+  DCHECK(tuple != nullptr);
+  DCHECK(desc != nullptr);
+  if (tuple_data_.size() == 0) return false;
+  DCHECK_GT(tuple_data_.size(), 0);
+  uint8_t* dst = reinterpret_cast<uint8_t*>(&tuple_data_[0]) + tuple_data_offset_;
+  uint8_t* dst_end = reinterpret_cast<uint8_t*>(&tuple_data_.back()) + 1;
+  return tuple->TryDeepCopy(
+      &dst, dst_end, &tuple_data_offset_, *desc, /* convert_ptrs */ true);
+}
+
+bool OutboundRowBatch::ReachedSizeLimit() {
+  return RowBatch::AT_CAPACITY_MEM_USAGE <= tuple_data_offset_;
+}
+
+}
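The two-attempt pattern in AppendRow()/TryAppendTuple() above can be isolated into a small self-contained sketch. The `AppendBuffer` class and its methods are hypothetical stand-ins, not Impala classes: the fast path copies assuming the buffer has room, and only the rare overflow path resizes, first to the exact size needed and then out to the allocator's full capacity, since (as the TODO above notes) std::string grows its capacity geometrically like std::vector.

```cpp
#include <cassert>
#include <cstring>
#include <string>

class AppendBuffer {
 public:
  // Fast path: returns false if 'size' bytes do not fit at the current offset.
  bool TryAppend(const void* src, int size) {
    if (offset_ + size > static_cast<int>(data_.size())) return false;
    std::memcpy(&data_[0] + offset_, src, size);
    offset_ += size;
    return true;
  }

  // Two-attempt append: the common case touches the source only once; the rare
  // overflow case resizes to the exact size needed and retries, which must succeed.
  void Append(const void* src, int size) {
    if (TryAppend(src, size)) return;
    data_.resize(offset_ + size);
    // Grow to the full capacity the allocator handed us so later appends can
    // reuse the slack, mirroring the resize(capacity()) trick in AppendRow().
    data_.resize(data_.capacity());
    bool ok = TryAppend(src, size);
    assert(ok);
    (void)ok;
  }

  int offset() const { return offset_; }

 private:
  std::string data_;  // plays the role of tuple_data_
  int offset_ = 0;    // plays the role of tuple_data_offset_
};
```

The commit message notes the same shape is used by BufferedTupleStream::AddRow(), with an eye toward merging the two (and their codegen) later.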
diff --git a/be/src/runtime/tuple-ir.cc b/be/src/runtime/tuple-ir.cc
index fca74031d..cee822d45 100644
--- a/be/src/runtime/tuple-ir.cc
+++ b/be/src/runtime/tuple-ir.cc
@@ -48,4 +48,89 @@ bool Tuple::CopyStrings(const char* err_ctx, RuntimeState* state,
   }
   return true;
 }
+
+template<class T>
+bool TryMemCopy(uint8_t* dst, const uint8_t* dst_end, int size, const T* src) {
+  if (UNLIKELY(dst + size > dst_end)) return false;
+  memcpy(dst, src, size);
+  return true;
+}
+
+bool Tuple::TryDeepCopy(uint8_t** dst_start, const uint8_t* dst_end, int* offset_start,
+    const TupleDescriptor& desc, bool convert_ptrs) const {
+  uint8_t* dst = *dst_start;
+  int offset = *offset_start;
+  if (!TryMemCopy(dst, dst_end, desc.byte_size(), this)) return false;
+  Tuple* dst_tuple = reinterpret_cast<Tuple*>(dst);
+  dst += desc.byte_size();
+  offset += desc.byte_size();
+  if (!dst_tuple->TryDeepCopyStrings(&dst, dst_end, &offset, desc, convert_ptrs)) {
+    return false;
+  }
+  if (!dst_tuple->TryDeepCopyCollections(&dst, dst_end, &offset, desc, convert_ptrs)) {
+    return false;
+  }
+  *dst_start = dst;
+  *offset_start = offset;
+  return true;
+}
+
+bool Tuple::TryDeepCopyStrings(uint8_t** data, const uint8_t* data_end, int* offset,
+    const TupleDescriptor& desc, bool convert_ptrs) {
+  vector<SlotDescriptor*>::const_iterator slot = desc.string_slots().begin();
+  for (; slot != desc.string_slots().end(); ++slot) {
+    DCHECK((*slot)->type().IsVarLenStringType());
+    if (IsNull((*slot)->null_indicator_offset())) continue;
+
+    StringValue* string_v = GetStringSlot((*slot)->tuple_offset());
+    // It is safe to smallify at this point as DeepCopyVarlenData is called on the new
+    // tuple which can be modified.
+    if (string_v->Smallify()) continue;
+    int len = string_v->Len();
+    DCHECK_GT(len, 0); // Size 0 should be handled by "smallify" case.
+    if (!TryMemCopy(*data, data_end, len, string_v->Ptr())) return false;
+    char* new_ptr = convert_ptrs ? reinterpret_cast<char*>(*offset) :
+                                   reinterpret_cast<char*>(*data);
+    string_v->SetPtr(new_ptr);
+    *data += len;
+    *offset += len;
+  }
+  return true;
+}
+
+bool Tuple::TryDeepCopyCollections(uint8_t** data, const uint8_t* data_end, int* offset,
+    const TupleDescriptor& desc, bool convert_ptrs) {
+  vector<SlotDescriptor*>::const_iterator slot = desc.collection_slots().begin();
+  for (; slot != desc.collection_slots().end(); ++slot) {
+    DCHECK((*slot)->type().IsCollectionType());
+    if (IsNull((*slot)->null_indicator_offset())) continue;
+
+    CollectionValue* coll_value = GetCollectionSlot((*slot)->tuple_offset());
+    const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor();
+    int coll_byte_size = coll_value->num_tuples * item_desc.byte_size();
+    if (!TryMemCopy(*data, data_end, coll_byte_size, coll_value->ptr)) return false;
+    uint8_t* coll_data = reinterpret_cast<uint8_t*>(*data);
+
+    coll_value->ptr = convert_ptrs ? reinterpret_cast<uint8_t*>(*offset) : coll_data;
+
+    *data += coll_byte_size;
+    *offset += coll_byte_size;
+
+    // Copy per-tuple varlen data if necessary.
+    if (!item_desc.HasVarlenSlots()) continue;
+    for (int i = 0; i < coll_value->num_tuples; ++i) {
+      Tuple* dst_item = reinterpret_cast<Tuple*>(coll_data);
+      if (!dst_item->TryDeepCopyStrings(
+          data, data_end, offset, item_desc, convert_ptrs)) {
+        return false;
+      }
+      if (!dst_item->TryDeepCopyCollections(
+          data, data_end, offset, item_desc, convert_ptrs)) {
+        return false;
+      }
+      coll_data += item_desc.byte_size();
+    }
+  }
+  return true;
+}
+
 }
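The `convert_ptrs` handling in TryDeepCopyStrings()/TryDeepCopyCollections() rewrites each copied varlen pointer either to its new absolute address or to its offset within the outbound buffer, so the receiver can rebase the pointers after transport. A stand-alone sketch of that idea, with `StringRef` and `TryCopyString` as simplified stand-ins for StringValue and the real slot iteration:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Simplified stand-in for StringValue: a pointer/length pair.
struct StringRef {
  const char* ptr;
  int len;
};

// Copies the string payload into '*dst' and rewrites the reference either to the
// buffer-relative offset (convert_ptrs == true, for the wire) or to the new
// absolute address (convert_ptrs == false), mirroring Tuple::TryDeepCopyStrings().
// Returns false without advancing '*dst' if the payload does not fit.
bool TryCopyString(StringRef* s, uint8_t** dst, const uint8_t* dst_end,
                   int* offset, bool convert_ptrs) {
  if (*dst + s->len > dst_end) return false;  // caller grows the buffer and retries
  std::memcpy(*dst, s->ptr, s->len);
  s->ptr = convert_ptrs
      ? reinterpret_cast<const char*>(static_cast<std::intptr_t>(*offset))
      : reinterpret_cast<const char*>(*dst);
  *dst += s->len;
  *offset += s->len;
  return true;
}
```

Encoding pointers as offsets keeps the serialized batch position-independent; the deserializing side adds the base address of the received tuple_data back onto each offset.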
diff --git a/be/src/runtime/tuple-row.h b/be/src/runtime/tuple-row.h
index 30288a397..514bf1c9f 100644
--- a/be/src/runtime/tuple-row.h
+++ b/be/src/runtime/tuple-row.h
@@ -70,7 +70,7 @@ class TupleRow {
     }
   }
 
-  /// TODO: make a macro for doing thisf
+  /// TODO: make a macro for doing this
   /// For C++/IR interop, we need to be able to look up types by name.
   static const char* LLVM_CLASS_NAME;
 
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 612833cb3..e615a9c13 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -153,6 +153,13 @@ class Tuple {
   void DeepCopy(const TupleDescriptor& desc, char** data, int* offset,
                 bool convert_ptrs = false) const;
 
+  /// Similar to DeepCopy() above, but returns false if the buffer is not large enough
+  /// ('*data' would pass 'data_end') instead of assuming that the tuple fits in the
+  /// buffer. If it fails, '*data' will be the same as before the call, but the buffer
+  /// (between the original '*data' and 'data_end') can be modified.
+  bool TryDeepCopy(uint8_t** data, const uint8_t* data_end, int* offset,
+      const TupleDescriptor& desc, bool convert_ptrs) const;
+
   /// This function should only be called on tuples created by DeepCopy() with
   /// 'convert_ptrs' = true. It takes all pointers contained in this tuple (i.e. in
   /// StringValues and CollectionValues, including those contained within other
@@ -354,6 +361,11 @@ class Tuple {
   void DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* offset,
       bool convert_ptrs);
 
+  bool TryDeepCopyStrings(uint8_t** data, const uint8_t* data_end, int* offset,
+      const TupleDescriptor& desc, bool convert_ptrs);
+  bool TryDeepCopyCollections(uint8_t** data, const uint8_t* data_end, int* offset,
+      const TupleDescriptor& desc, bool convert_ptrs);
+
   /// During the construction of hand-crafted codegen'd functions, types cannot generally
   /// be looked up by name. In our own types we use the static 'LLVM_CLASS_NAME' member to
   /// facilitate this, but it cannot be used with other types, for example standard

