This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2f14fd29c0b47fc2c170a7f0eb1cecaf6b9704f4
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Mon Sep 11 19:34:01 2023 +0200

    IMPALA-12433: Share buffers among channels in KrpcDataStreamSender
    
    Before this patch each KrpcDataStreamSender::Channel had 2
    OutboundRowBatch with its own serialization and compression buffers.
    
    This patch switches to use a single buffer per channel. This is
    enough to store the in-flight data in KRPC, while other buffers
    are only used during serialization and compression which is done for
    just a single channel at a time, so can be shared among channels.
    
    Memory estimates in the planner are not changed because the existing
    calculation has several issues (see IMPALA-12594).
    
    Change-Id: I64854a350a9dae8bf3af11c871882ea4750e60b3
    Reviewed-on: http://gerrit.cloudera.org:8080/20719
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Kurt Deschler <[email protected]>
    Reviewed-by: Zihao Ye <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 be/src/benchmarks/row-batch-serialize-benchmark.cc |  31 +++---
 be/src/runtime/krpc-data-stream-sender.cc          | 109 ++++++++++++---------
 be/src/runtime/krpc-data-stream-sender.h           |  25 +++--
 be/src/runtime/outbound-row-batch.h                |  14 +--
 be/src/runtime/row-batch-serialize-test.cc         |  11 ++-
 be/src/runtime/row-batch.cc                        |  32 +++---
 be/src/runtime/row-batch.h                         |   7 +-
 .../org/apache/impala/planner/DataStreamSink.java  |   2 +
 8 files changed, 135 insertions(+), 96 deletions(-)

diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc 
b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index d772855ad..26085f035 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -89,7 +89,8 @@ static std::shared_ptr<CharMemTrackerAllocator> 
char_mem_tracker_allocator;
 class RowBatchSerializeBaseline {
  public:
   // Copy of baseline version without dedup logic
-  static int Serialize(RowBatch* batch, OutboundRowBatch* output_batch) {
+  static int Serialize(RowBatch* batch, OutboundRowBatch* output_batch,
+      TrackedString* compression_scratch) {
     RowBatchHeaderPB* header = &output_batch->header_;
     output_batch->tuple_offsets_.clear();
     header->set_num_rows(batch->num_rows_);
@@ -105,7 +106,7 @@ class RowBatchSerializeBaseline {
     SerializeInternal(batch, size, output_batch);
 
     if (size > 0) {
-      // Try compressing tuple_data to compression_scratch_, swap if 
compressed data is
+      // Try compressing tuple_data to compression_scratch, swap if compressed 
data is
       // smaller
       Lz4Compressor compressor(nullptr, false);
       Status status = compressor.Init();
@@ -115,20 +116,20 @@ class RowBatchSerializeBaseline {
 
       int64_t compressed_size = compressor.MaxOutputLen(size);
       DCHECK_GT(compressed_size, 0);
-      if (output_batch->compression_scratch_.size() < compressed_size) {
-        output_batch->compression_scratch_.resize(compressed_size);
+      if (compression_scratch->size() < compressed_size) {
+        compression_scratch->resize(compressed_size);
       }
 
       uint8_t* input = const_cast<uint8_t*>(
           reinterpret_cast<const uint8_t*>(output_batch->tuple_data_.data()));
       uint8_t* compressed_output = const_cast<uint8_t*>(
-          reinterpret_cast<const 
uint8_t*>(output_batch->compression_scratch_.data()));
+          reinterpret_cast<const uint8_t*>(compression_scratch->data()));
       status = compressor.ProcessBlock(
           true, size, input, &compressed_size, &compressed_output);
       DCHECK(status.ok()) << status.GetDetail();
       if (LIKELY(compressed_size < size)) {
-        output_batch->compression_scratch_.resize(compressed_size);
-        output_batch->tuple_data_.swap(output_batch->compression_scratch_);
+        compression_scratch->resize(compressed_size);
+        output_batch->tuple_data_.swap(*compression_scratch);
       }
       VLOG_ROW << "uncompressed size: " << size
                << ", compressed size: " << compressed_size;
@@ -284,7 +285,9 @@ class RowBatchSerializeBenchmark {
     SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data);
     for (int iter = 0; iter < batch_size; ++iter) {
       OutboundRowBatch row_batch(char_mem_tracker_allocator);
-      ABORT_IF_ERROR(args->batch->Serialize(&row_batch, args->full_dedup));
+      TrackedString compression_scratch(*char_mem_tracker_allocator.get());
+      ABORT_IF_ERROR(args->batch->Serialize(&row_batch, args->full_dedup,
+          &compression_scratch));
     }
   }
 
@@ -292,7 +295,8 @@ class RowBatchSerializeBenchmark {
     RowBatch* batch = reinterpret_cast<RowBatch*>(data);
     for (int iter = 0; iter < batch_size; ++iter) {
       OutboundRowBatch row_batch(char_mem_tracker_allocator);
-      RowBatchSerializeBaseline::Serialize(batch, &row_batch);
+      TrackedString compression_scratch(*char_mem_tracker_allocator.get());
+      RowBatchSerializeBaseline::Serialize(batch, &row_batch, 
&compression_scratch);
     }
   }
 
@@ -337,20 +341,23 @@ class RowBatchSerializeBenchmark {
     RowBatch* no_dup_batch =
         obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get()));
     FillBatch(no_dup_batch, 12345, 1, -1);
+    TrackedString compression_scratch(*char_mem_tracker_allocator.get());
+
     OutboundRowBatch no_dup_row_batch(char_mem_tracker_allocator);
-    ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_row_batch));
+    ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_row_batch, 
&compression_scratch));
 
     RowBatch* adjacent_dup_batch =
         obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, tracker.get()));
     FillBatch(adjacent_dup_batch, 12345, 5, -1);
     OutboundRowBatch adjacent_dup_row_batch(char_mem_tracker_allocator);
-    ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_row_batch, 
false));
+    ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_row_batch,
+        false, &compression_scratch));
 
     RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, 
tracker.get()));
     // Non-adjacent duplicates.
     FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5);
     OutboundRowBatch dup_row_batch(char_mem_tracker_allocator);
-    ABORT_IF_ERROR(dup_batch->Serialize(&dup_row_batch, true));
+    ABORT_IF_ERROR(dup_batch->Serialize(&dup_row_batch, true, 
&compression_scratch));
 
     int baseline;
     Benchmark ser_suite("serialize");
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index ef4aec260..45d3714fc 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -46,6 +46,7 @@
 #include "runtime/tuple-row.h"
 #include "service/data-stream-service.h"
 #include "util/aligned-new.h"
+#include "util/compress.h"
 #include "util/debug-util.h"
 #include "util/network-util.h"
 #include "util/pretty-printer.h"
@@ -118,12 +119,18 @@ void KrpcDataStreamSenderConfig::Close() {
 // calls above will return before the RPC has completed but they may block if 
there
 // is already an in-flight RPC.
 //
-// Each channel internally has two OutboundRowBatch to serialize to. They are 
reused
-// across multiple RPC calls. Having two OutboundRowBatch allows client to 
serialize
-// the next row batch while the current row batch is being sent. Upon 
completion of
-// a RPC, the callback TransmitDataCompleteCb() is invoked. If the RPC fails 
due to
-// remote service's queue being full, TransmitDataCompleteCb() will schedule 
the retry
-// callback RetryCb() after some delay dervied from 
'FLAGS_rpc_retry_internal_ms'.
+// Each channel owns a single OutboundRowBatch outbound_batch_. This holds the 
buffers
+// backing the in-flight RPC or kept as reserve if there is no ongoing RPC. 
The actual
+// serialization happens to a shared OutboundRowBatch (see IMPALA-12433 for 
details) which
+// can be swapped with outbound_batch_ once the previous in-flight RPC is 
finished.
+// This means that the client can serialize the next row batch while the 
current row batch
+// is being sent. outbound_batch_ is only used in the partitioned case - in the
+// unpartitioned case the shared in_flight_batch_ holds the data for the 
in-flight RPC.
+//
+// Upon completion of a RPC, the callback TransmitDataCompleteCb() is invoked. 
If the RPC
+// fails due to remote service's queue being full, TransmitDataCompleteCb() 
will schedule
+// the retry callback RetryCb() after some delay derived from
+// 'FLAGS_rpc_retry_internal_ms'.
 //
 // When a data stream sender is shut down, it will call Teardown() on all 
channels to
 // release resources. Teardown() will cancel any in-flight RPC and wait for the
@@ -175,7 +182,10 @@ class KrpcDataStreamSender::Channel : public 
CacheLineAligned {
   // preceding RPC is still in-flight. This is expected to be called from the 
fragment
   // instance execution thread. Return error status if initialization of the 
RPC request
   // parameters failed or if the preceding RPC failed. Returns OK otherwise.
-  Status TransmitData(const OutboundRowBatch* outbound_batch);
+  // If 'swap_batch' is true, 'outbound_batch' is swapped with the Channel's
+  // 'outbound_batch_'. This happens after the previous RPC is finished so its 
buffers
+  // can be reused.
+  Status TransmitData(std::unique_ptr<OutboundRowBatch>* outbound_batch, bool 
swap_batch);
 
   // Copies a single row into this channel's row batch and flushes the row 
batch once
   // it reaches capacity. This call may block if the row batch's capacity is 
reached
@@ -231,20 +241,14 @@ class KrpcDataStreamSender::Channel : public 
CacheLineAligned {
   // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED".
   scoped_ptr<RowBatch> batch_;
 
-  // The outbound row batches are double-buffered so that we can serialize the 
next
-  // batch while the other is still referenced by the in-flight RPC. Each 
entry contains
+  // Owns an outbound row batch that can be referenced by the in-flight RPC. 
Contains
   // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and 
data.
   //
-  // TODO: replace this with an actual queue. Schedule another RPC callback in 
the
+  // TODO: replace this with a queue. Schedule another RPC callback in the
   // completion callback if the queue is not empty.
-  // TODO: rethink whether to keep per-channel buffers vs having all buffers 
in the
-  // datastream sender and sharing them across all channels. These buffers are 
not used in
+  // datastream sender and sharing them across all channels. This buffer is 
not used in
   // "UNPARTITIONED" scheme.
-  std::vector<OutboundRowBatch> outbound_batches_;
-
-  // Index into 'outbound_batches_' for the next available OutboundRowBatch to 
serialize
-  // into. This is read and written by the main execution thread.
-  int next_batch_idx_ = 0;
+  std::unique_ptr<OutboundRowBatch> outbound_batch_;
 
   // Synchronize accesses to the following fields between the main execution 
thread and
   // the KRPC reactor thread. Note that there should be only one reactor 
thread invoking
@@ -383,10 +387,9 @@ Status KrpcDataStreamSender::Channel::Init(
   // Create a DataStreamService proxy to the destination.
   RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
 
-  // Init outbound_batches_.
-  for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) {
-    outbound_batches_.emplace_back(allocator, is_local_);
-  }
+  // Init outbound_batch_.
+  outbound_batch_.reset(new OutboundRowBatch(allocator));
+
   return Status::OK();
 }
 
@@ -584,10 +587,10 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() 
{
 }
 
 Status KrpcDataStreamSender::Channel::TransmitData(
-    const OutboundRowBatch* outbound_batch) {
+    unique_ptr<OutboundRowBatch>* outbound_batch, bool swap_batch) {
   VLOG_ROW << "Channel::TransmitData() fragment_instance_id="
            << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows=" << outbound_batch->header()->num_rows();
+           << " #rows=" << outbound_batch->get()->header()->num_rows();
   std::unique_lock<SpinLock> l(lock_);
   RETURN_IF_ERROR(WaitForRpcLocked(&l));
   DCHECK(!rpc_in_flight_);
@@ -596,20 +599,25 @@ Status KrpcDataStreamSender::Channel::TransmitData(
   // TODO: Needs better solution for IMPALA-3990 in the long run.
   if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
   rpc_in_flight_ = true;
-  rpc_in_flight_batch_ = outbound_batch;
+  rpc_in_flight_batch_ = outbound_batch->get();
   RETURN_IF_ERROR(DoTransmitDataRpc());
+  // At this point the previous RPC must be already finished and the previous 
buffer
+  // in outbound_batch_ can be reused.
+  DCHECK_NE(rpc_in_flight_batch_, outbound_batch_.get());
+  if (swap_batch) outbound_batch_.swap(*outbound_batch);
   return Status::OK();
 }
 
 Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) {
-  OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
+  unique_ptr<OutboundRowBatch>* serialization_batch = 
&parent_->serialization_batch_;
+  DCHECK(serialization_batch->get() != nullptr);
   // Reads 'rpc_in_flight_batch_' without acquiring 'lock_', so reads can be 
racey.
   ANNOTATE_IGNORE_READS_BEGIN();
-  DCHECK(outbound_batch != rpc_in_flight_batch_);
+  DCHECK(serialization_batch->get() != rpc_in_flight_batch_);
   ANNOTATE_IGNORE_READS_END();
-  RETURN_IF_ERROR(parent_->SerializeBatch(batch, outbound_batch));
-  RETURN_IF_ERROR(TransmitData(outbound_batch));
-  next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+  RETURN_IF_ERROR(parent_->SerializeBatch(batch, serialization_batch->get(), 
!is_local_));
+  // Swap serialization_batch with outbound_batch_ once the old RPC is 
finished.
+  RETURN_IF_ERROR(TransmitData(serialization_batch, true /*swap_batch*/));
   return Status::OK();
 }
 
@@ -722,7 +730,7 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* 
state) {
     while (rpc_in_flight_) rpc_done_cv_.wait(l);
   }
   batch_.reset();
-  outbound_batches_.clear();
+  outbound_batch_.reset(nullptr);
 }
 
 KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
@@ -806,16 +814,17 @@ Status KrpcDataStreamSender::Prepare(
       new MemTracker(-1, "RowBatchSerialization", mem_tracker_.get()));
   char_mem_tracker_allocator_.reset(
       new CharMemTrackerAllocator(outbound_rb_mem_tracker_));
+
   string process_address =
        NetworkAddressPBToString(ExecEnv::GetInstance()->krpc_address());
-  for (int i = 0; i < NUM_OUTBOUND_BATCHES; ++i) {
-    // Only skip compression if there is a single channel and destination is 
in the same
-    // process. TODO: could be optimized to send the uncompressed buffer to 
the local
-    // targets to avoid decompression cost at the receiver.
-    bool is_local = channels_.size() == 1 && channels_[0]->IsLocal();
-    outbound_batches_.emplace_back(char_mem_tracker_allocator_, is_local);
+
+  serialization_batch_.reset(new 
OutboundRowBatch(char_mem_tracker_allocator_));
+  if (partition_type_ == TPartitionType::UNPARTITIONED) {
+    in_flight_batch_.reset(new OutboundRowBatch(char_mem_tracker_allocator_));
   }
 
+  compression_scratch_.reset(new 
TrackedString(*char_mem_tracker_allocator_.get()));
+
   for (int i = 0; i < channels_.size(); ++i) {
     RETURN_IF_ERROR(channels_[i]->Init(state, char_mem_tracker_allocator_));
   }
@@ -1034,14 +1043,22 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, 
RowBatch* batch) {
 
   if (batch->num_rows() == 0) return Status::OK();
   if (partition_type_ == TPartitionType::UNPARTITIONED) {
-    OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
-    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch, channels_.size()));
+    // Only skip compression if there is a single channel and destination is 
in the same
+    // process. TODO: could be optimized to send the uncompressed buffer to 
the local
+    // targets to avoid decompression cost at the receiver.
+    bool is_local = channels_.size() == 1 && channels_[0]->IsLocal();
+    RETURN_IF_ERROR(SerializeBatch(
+        batch, serialization_batch_.get(), !is_local,  channels_.size()));
     // TransmitData() will block if there are still in-flight rpcs (and those 
will
-    // reference the previously written serialized batch).
+    // reference the previously written in_flight_batch_).
     for (int i = 0; i < channels_.size(); ++i) {
-      RETURN_IF_ERROR(channels_[i]->TransmitData(outbound_batch));
+      // Do not swap serialization_batch_ with the channel's outbound_batch_ 
to allow
+      // multiple channels to use the data backing serialization_batch_ in 
parallel.
+      RETURN_IF_ERROR(
+          channels_[i]->TransmitData(&serialization_batch_, false 
/*swap_batch*/));
     }
-    next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+    // At this point no RPCs can still refer to the old in_flight_batch_.
+    in_flight_batch_.swap(serialization_batch_);
   } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 
1) {
     // Round-robin batches among channels. Wait for the current channel to 
finish its
     // rpc before overwriting its batch.
@@ -1197,7 +1214,10 @@ void KrpcDataStreamSender::Close(RuntimeState* state) {
     channels_[i]->Teardown(state);
   }
 
-  outbound_batches_.clear();
+  serialization_batch_.reset(nullptr);
+  in_flight_batch_.reset(nullptr);
+  compression_scratch_.reset(nullptr);
+
   if (outbound_rb_mem_tracker_.get() != nullptr) {
     outbound_rb_mem_tracker_->Close();
   }
@@ -1208,11 +1228,12 @@ void KrpcDataStreamSender::Close(RuntimeState* state) {
 }
 
 Status KrpcDataStreamSender::SerializeBatch(
-    RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
+    RowBatch* src, OutboundRowBatch* dest, bool compress, int num_receivers) {
   VLOG_ROW << "serializing " << src->num_rows() << " rows";
   {
     SCOPED_TIMER(serialize_batch_timer_);
-    RETURN_IF_ERROR(src->Serialize(dest));
+    RETURN_IF_ERROR(
+        src->Serialize(dest, compress ? compression_scratch_.get() : nullptr));
     int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * 
num_receivers);
   }
diff --git a/be/src/runtime/krpc-data-stream-sender.h 
b/be/src/runtime/krpc-data-stream-sender.h
index 1b1b78813..2d26e3adf 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -166,9 +166,11 @@ class KrpcDataStreamSender : public DataSink {
 
   /// Serializes the src batch into the serialized row batch 'dest' and updates
   /// various stat counters.
+  /// 'compress' decides whether compression is attempted after serialization.
   /// 'num_receivers' is the number of receivers this batch will be sent to. 
Used for
   /// updating the stat counters.
-  Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int 
num_receivers = 1);
+  Status SerializeBatch(
+      RowBatch* src, OutboundRowBatch* dest, bool compress, int num_receivers 
= 1);
 
   /// Returns 'partition_expr_evals_[i]'. Used by the codegen'd HashRow() IR 
function.
   ScalarExprEvaluator* GetPartitionExprEvaluator(int i);
@@ -211,15 +213,20 @@ class KrpcDataStreamSender : public DataSink {
   /// Index of the current channel to send to if random_ == true.
   int current_channel_idx_ = 0;
 
-  /// Index of the next OutboundRowBatch to use for serialization.
-  int next_batch_idx_ = 0;
+  /// Pointer to OutboundRowBatch that will be used for serialization.
+  /// Swapped with in_flight_batch_ (UNPARTITIONED case) or the channel's 
outbound_batch_
+  /// (PARTITIONED case) after serialization.
+  std::unique_ptr<OutboundRowBatch> serialization_batch_;
 
-  /// The outbound row batches are double-buffered so that we can serialize 
the next
-  /// batch while the other is still referenced by the in-flight RPC. Each 
entry contains
-  /// a RowBatchHeaderPB and buffers for the serialized tuple offsets and 
data. Used only
-  /// when the partitioning strategy is UNPARTITIONED.
-  static const int NUM_OUTBOUND_BATCHES = 2;
-  std::vector<OutboundRowBatch> outbound_batches_;
+  /// Buffer used for compression after serialization. Swapped with the 
OutboundRowBatch's
+  /// tuple_data_ if the compressed data is smaller.
+  std::unique_ptr<TrackedString> compression_scratch_;
+
+  /// Pointer to OutboundRowBatch referenced by the in-flight RPC(s). Used only
+  /// when the partitioning strategy is UNPARTITIONED. In the broadcasting case
+  /// multiple channels use it at the same time to hold the buffers backing 
the RPC
+  /// sidecars.
+  std::unique_ptr<OutboundRowBatch> in_flight_batch_;
 
   /// If true, this sender has called FlushFinal() successfully.
   /// Not valid to call Send() anymore.
diff --git a/be/src/runtime/outbound-row-batch.h 
b/be/src/runtime/outbound-row-batch.h
index 4411c6da4..8adb6e847 100644
--- a/be/src/runtime/outbound-row-batch.h
+++ b/be/src/runtime/outbound-row-batch.h
@@ -35,10 +35,9 @@ class RuntimeState;
 /// for holding the tuple offsets and tuple data.
 class OutboundRowBatch {
  public:
-  OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator,
-      bool skip_compression=false)
-    : tuple_data_(*allocator.get()), compression_scratch_(*allocator.get()),
-      skip_compression_(skip_compression) {}
+
+  OutboundRowBatch(std::shared_ptr<CharMemTrackerAllocator> allocator)
+    : tuple_data_(*allocator.get()) {}
 
   const RowBatchHeaderPB* header() const { return &header_; }
 
@@ -79,13 +78,6 @@ class OutboundRowBatch {
 
   /// Contains the actual data of all the tuples. The data could be compressed.
   TrackedString tuple_data_;
-
-  /// Contains the compression scratch for the compressed data in 
serialization.
-  /// The compression_scratch_ will be swapped with tuple_data_ if the 
compressed data
-  /// is shorter.
-  TrackedString compression_scratch_;
-
-  bool skip_compression_;
 };
 
 }
diff --git a/be/src/runtime/row-batch-serialize-test.cc 
b/be/src/runtime/row-batch-serialize-test.cc
index c5a5d9c4b..9fa587e7d 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -81,8 +81,9 @@ class RowBatchSerializeTest : public testing::Test {
       bool print_batches, bool full_dedup = false) {
     if (print_batches) cout << PrintBatch(batch) << endl;
 
+    TrackedString compression_scratch(*char_mem_tracker_allocator_.get());
     OutboundRowBatch row_batch(char_mem_tracker_allocator_);
-    RETURN_IF_ERROR(batch->Serialize(&row_batch, full_dedup));
+    RETURN_IF_ERROR(batch->Serialize(&row_batch, full_dedup, 
&compression_scratch));
 
     RowBatch deserialized_batch(&row_desc, row_batch, tracker_.get());
     if (print_batches) cout << PrintBatch(&deserialized_batch) << endl;
@@ -613,8 +614,10 @@ void RowBatchSerializeTest::TestDupRemoval(bool 
full_dedup) {
   vector<Tuple*> tuples;
   CreateTuples(tuple_desc, batch->tuple_data_pool(), num_distinct_tuples, 0, 
10, &tuples);
   AddTuplesToRowBatch(num_rows, tuples, repeats, batch);
+
+  TrackedString compression_scratch(*char_mem_tracker_allocator_.get());
   OutboundRowBatch row_batch(char_mem_tracker_allocator_);
-  EXPECT_OK(batch->Serialize(&row_batch, full_dedup));
+  EXPECT_OK(batch->Serialize(&row_batch, full_dedup, &compression_scratch));
   // Serialized data should only have one copy of each tuple.
   int64_t total_byte_size = 0; // Total size without duplication
   for (int i = 0; i < tuples.size(); ++i) {
@@ -751,8 +754,10 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   // Full dedup should be automatically enabled because of row batch structure.
   EXPECT_TRUE(UseFullDedup(batch));
   LOG(INFO) << "Serializing row batch";
+
+  TrackedString compression_scratch(*char_mem_tracker_allocator_.get());
   OutboundRowBatch row_batch(char_mem_tracker_allocator_);
-  EXPECT_OK(batch->Serialize(&row_batch));
+  EXPECT_OK(batch->Serialize(&row_batch, &compression_scratch));
   LOG(INFO) << "Serialized batch size: " << 
row_batch.TupleDataAsSlice().size();
   LOG(INFO) << "Serialized batch uncompressed size: "
             << row_batch.header()->uncompressed_size();
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 144734dfe..a8b4a7730 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -220,11 +220,13 @@ RowBatch::~RowBatch() {
   tuple_ptrs_ = nullptr;
 }
 
-Status RowBatch::Serialize(OutboundRowBatch* output_batch) {
-  return Serialize(output_batch, UseFullDedup());
+Status RowBatch::Serialize(
+     OutboundRowBatch* output_batch, TrackedString* compression_scratch) {
+  return Serialize(output_batch, UseFullDedup(), compression_scratch);
 }
 
-Status RowBatch::Serialize(OutboundRowBatch* output_batch, bool full_dedup) {
+Status RowBatch::Serialize(
+    OutboundRowBatch* output_batch, bool full_dedup, TrackedString* 
compression_scratch) {
   bool is_compressed;
   output_batch->tuple_offsets_.clear();
 
@@ -251,8 +253,8 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch, 
bool full_dedup) {
   }
   output_batch->tuple_data_.resize(size);
 
-  RETURN_IF_ERROR(Serialize(
-      full_dedup ? &distinct_tuples : nullptr, output_batch, &is_compressed, 
size));
+  RETURN_IF_ERROR(Serialize(full_dedup ? &distinct_tuples : nullptr,
+      output_batch, &is_compressed, size, compression_scratch));
 
   // Initialize the RowBatchHeaderPB
   RowBatchHeaderPB* header = &output_batch->header_;
@@ -266,7 +268,7 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch, 
bool full_dedup) {
 }
 
 Status RowBatch::Serialize(DedupMap* distinct_tuples, OutboundRowBatch* 
output_batch,
-    bool* is_compressed, int64_t size) {
+    bool* is_compressed, int64_t size, TrackedString* compression_scratch) {
   char* tuple_data = const_cast<char*>(output_batch->tuple_data_.data());
   std::vector<int32_t>* tuple_offsets = &output_batch->tuple_offsets_;
 
@@ -274,9 +276,9 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, 
OutboundRowBatch* output_b
 
   *is_compressed = false;
 
-  if (size > 0 && !output_batch->skip_compression_) {
-    // Try compressing tuple_data to compression_scratch_, swap if compressed 
data is
-    // smaller
+  if (size > 0 && compression_scratch != nullptr) {
+    // Try compressing tuple_data to compression_scratch, swap if compressed 
data is
+    // smaller.
     Lz4Compressor compressor(nullptr, false);
     RETURN_IF_ERROR(compressor.Init());
     auto compressor_cleanup =
@@ -288,19 +290,21 @@ Status RowBatch::Serialize(DedupMap* distinct_tuples, 
OutboundRowBatch* output_b
       return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size);
     }
     DCHECK_GT(compressed_size, 0);
-    if (output_batch->compression_scratch_.size() < compressed_size) {
-      output_batch->compression_scratch_.resize(compressed_size);
+    if (compression_scratch->size() < compressed_size) {
+      compression_scratch->resize(compressed_size);
     }
 
     uint8_t* input = reinterpret_cast<uint8_t*>(tuple_data);
     uint8_t* compressed_output = const_cast<uint8_t*>(
-        reinterpret_cast<const 
uint8_t*>(output_batch->compression_scratch_.data()));
+        reinterpret_cast<const uint8_t*>(compression_scratch->data()));
     RETURN_IF_ERROR(
         compressor.ProcessBlock(true, size, input, &compressed_size, 
&compressed_output));
     if (LIKELY(compressed_size < size)) {
-      output_batch->compression_scratch_.resize(compressed_size);
-      output_batch->tuple_data_.swap(output_batch->compression_scratch_);
+      compression_scratch->resize(compressed_size);
+      output_batch->tuple_data_.swap(*compression_scratch);
       *is_compressed = true;
+      // TODO: could copy to a smaller buffer if compressed data is much 
smaller to
+      //       save memory
     }
     VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << 
compressed_size;
   }
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 80e9ac054..3626d7dce 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -331,7 +331,7 @@ class RowBatch {
   /// larger than the uncompressed data. Use output_batch.compression_type to 
determine
   /// whether tuple_data is compressed. If an in-flight row is present in this 
row batch,
   /// it is ignored. This function does not Reset().
-  Status Serialize(OutboundRowBatch* output_batch);
+  Status Serialize(OutboundRowBatch* output_batch, TrackedString* 
compression_scratch);
 
   /// Utility function: returns total byte size of a batch in either 
serialized or
   /// deserialized form. If a row batch is compressed, its serialized size can 
be much
@@ -411,7 +411,8 @@ class RowBatch {
   bool UseFullDedup();
 
   /// Overload for testing that allows the test to force the deduplication 
level.
-  Status Serialize(OutboundRowBatch* output_batch, bool full_dedup);
+  Status Serialize(OutboundRowBatch* output_batch, bool full_dedup,
+      TrackedString* compression_scratch);
 
   typedef FixedSizeHashTable<Tuple*, int> DedupMap;
 
@@ -435,7 +436,7 @@ class RowBatch {
   ///
   /// Returns error status if serialization failed. Returns OK otherwise.
   Status Serialize(DedupMap* distinct_tuples, OutboundRowBatch* output_batch,
-      bool* is_compressed, int64_t size);
+      bool* is_compressed, int64_t size, TrackedString* compression_scratch);
 
   /// Implementation for protobuf to deserialize a row batch.
   ///
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 6052226b6..b379da757 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -67,6 +67,8 @@ public class DataStreamSink extends DataSink {
    * equal to avgOutboundRowBatchSize. If outputPartiton_ is partitioned, all 
of the
    * channel's OutboundRowBatches are used. Otherwise, only a pair of 
OutboundRowBatches
    * in KrpcDataStreamSender class are used.
+   * TODO: this function can both over and under estimate the memory need
+   *       see IMPALA-12594 + IMPALA-12433
    */
   private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) {
     int numChannels =

Reply via email to