This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cf28745d3e4a8d50ac4c713c48d629a08455437c
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Thu Jul 18 16:48:38 2024 +0200

    IMPALA-13246: Smallify strings during broadcast exchange
    
    Other changes:
    - adds smallifying of strings in collections
    - removes Tuple::SmallifyStrings() and smallifies during deep copy
      instead - this seems somewhat slower in TPC-H, but is safer as
      it doesn't rely on the assumption that smallify is always done
      for the whole tuple (it turned out that this is not true in some
      cases)
    
    Measured 0.5-1% improvement in TPCH(42).
    
    note that there is a very similar WIP change:
    https://gerrit.cloudera.org/#/c/21508/
    
    Change-Id: I281d77c7a241ebfe8716eb5c975f0660601aec1b
    Reviewed-on: http://gerrit.cloudera.org:8080/21597
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/benchmarks/row-batch-serialize-benchmark.cc |  3 +-
 be/src/runtime/buffered-tuple-stream-test.cc       |  3 +-
 be/src/runtime/buffered-tuple-stream.cc            |  2 +-
 be/src/runtime/row-batch-serialize-test.cc         |  6 ++-
 be/src/runtime/row-batch.cc                        |  3 +-
 be/src/runtime/smallable-string.h                  | 10 +++++
 be/src/runtime/string-value.h                      |  9 ++++
 be/src/runtime/tuple.cc                            | 50 +++++++++-------------
 be/src/runtime/tuple.h                             | 28 +++++++-----
 9 files changed, 67 insertions(+), 47 deletions(-)

diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc 
b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 26085f035..31af72e5b 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -175,7 +175,8 @@ class RowBatchSerializeBaseline {
       for (int j = 0; j < batch->num_tuples_per_row_; ++j) {
         Tuple* tuple = batch->GetRow(i)->GetTuple(j);
         if (tuple == NULL) continue;
-        result += 
tuple->TotalByteSize(*batch->row_desc_->tuple_descriptors()[j]);
+        result += tuple->TotalByteSize(
+            *batch->row_desc_->tuple_descriptors()[j], false 
/*assume_smallify*/);
       }
     }
     return result;
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 098fb0b81..55dce370d 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -1844,7 +1844,8 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
       for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
         TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
         fixed_size += tuple_desc->byte_size();
-        varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
+        varlen_size += row->GetTuple(k)->VarlenByteSize(
+            *tuple_desc, false /*assume_smallify*/);
       }
       uint8_t* data = stream.AddRowCustomBegin(fixed_size + varlen_size, 
&status);
       ASSERT_TRUE(data != nullptr);
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index cff554c21..19dcb9ff6 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -983,7 +983,7 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row)
       if (!item_desc.HasVarlenSlots()) continue;
       for (int j = 0; j < cv->num_tuples; ++j) {
         Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * 
item_desc.byte_size()]);
-        size += item->VarlenByteSize(item_desc);
+        size += item->VarlenByteSize(item_desc, false /*assume_smallify*/);
       }
     }
   }
diff --git a/be/src/runtime/row-batch-serialize-test.cc 
b/be/src/runtime/row-batch-serialize-test.cc
index 9fa587e7d..596f9655c 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -214,7 +214,9 @@ class RowBatchSerializeTest : public testing::Test {
         StringValue* string_value = reinterpret_cast<StringValue*>(slot);
         StringValue* deserialized_string_value =
             reinterpret_cast<StringValue*>(deserialized_slot);
-        EXPECT_NE(string_value->Ptr(), deserialized_string_value->Ptr());
+        if (!deserialized_string_value->IsSmall()) {
+          EXPECT_NE(string_value->Ptr(), deserialized_string_value->Ptr());
+        }
       }
 
       if (type.IsCollectionType()) {
@@ -621,7 +623,7 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) 
{
   // Serialized data should only have one copy of each tuple.
   int64_t total_byte_size = 0; // Total size without duplication
   for (int i = 0; i < tuples.size(); ++i) {
-    total_byte_size += tuples[i]->TotalByteSize(tuple_desc);
+    total_byte_size += tuples[i]->TotalByteSize(tuple_desc, true 
/*assume_smallify*/);
   }
   EXPECT_EQ(total_byte_size, row_batch.header()->uncompressed_size());
   TestRowBatch(row_desc, batch, false, full_dedup);
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index e66800ca4..811a2af9a 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -462,7 +462,8 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
         bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
         if (!inserted) continue;
       }
-      result += tuple->VarlenByteSize(*row_desc_->tuple_descriptors()[j]);
+      result += tuple->VarlenByteSize(
+          *row_desc_->tuple_descriptors()[j], true /*assume_smallify*/);
       ++tuple_count[j];
     }
   }
diff --git a/be/src/runtime/smallable-string.h 
b/be/src/runtime/smallable-string.h
index 24534e7c5..aa2c4a124 100644
--- a/be/src/runtime/smallable-string.h
+++ b/be/src/runtime/smallable-string.h
@@ -141,6 +141,16 @@ class __attribute__((__packed__)) SmallableString {
     }
   }
 
+  /// Returns the number of bytes needed outside the slot itself:
+  /// - if the length is too long to smallify, return length
+  /// - if the length is small enough to smallify:
+  ///   - if assume_smallify is true or the string is already smallified 
return 0
+  ///   - otherwise (not already smallified and assume_smallify is false) 
return length
+  int ExternalLen(bool assume_smallify) const {
+    if (IsSmall() || (assume_smallify && rep.long_rep.len <= SMALL_LIMIT)) 
return 0;
+    return rep.long_rep.len;
+  }
+
   char* Ptr() const {
     if (IsSmall()) {
       return const_cast<char*>(rep.small_rep.buf);
diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h
index 5bd1f6452..aa48a692d 100644
--- a/be/src/runtime/string-value.h
+++ b/be/src/runtime/string-value.h
@@ -93,6 +93,15 @@ public:
 
   int Len() const { return string_impl_.Len(); }
 
+  /// Returns the number of bytes needed outside the slot itself:
+  /// - if the length is too long to smallify, return length
+  /// - if the length is small enough to smallify:
+  ///   - if assume_smallify is true or the string is already smallified 
return 0
+  ///   - otherwise (not already smallified and assume_smallify is false) 
return length
+  int ExternalLen(bool assume_smallify) const {
+    return string_impl_.ExternalLen(assume_smallify);
+  }
+
   /// Sets the length of this String object. Length can only be decreased.
   void SetLen(int len) { return string_impl_.SetLen(len); }
 
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 3ddb6f19a..108f86961 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -50,23 +50,21 @@ const char* Tuple::MATERIALIZE_EXPRS_NULL_POOL_SYMBOL = 
"MaterializeExprsILb0ELb
 
 Tuple* const Tuple::POISON = reinterpret_cast<Tuple*>(42L);
 
-int64_t Tuple::TotalByteSize(const TupleDescriptor& desc) const {
+int64_t Tuple::TotalByteSize(const TupleDescriptor& desc, bool 
assume_smallify) const {
   int64_t result = desc.byte_size();
   if (!desc.HasVarlenSlots()) return result;
-  result += VarlenByteSize(desc);
+  result += VarlenByteSize(desc, assume_smallify);
   return result;
 }
 
-int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc) const {
+int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc, bool 
assume_smallify) const {
   int64_t result = 0;
   vector<SlotDescriptor*>::const_iterator slot = desc.string_slots().begin();
   for (; slot != desc.string_slots().end(); ++slot) {
     DCHECK((*slot)->type().IsVarLenStringType());
     if (IsNull((*slot)->null_indicator_offset())) continue;
     const StringValue* string_val = GetStringSlot((*slot)->tuple_offset());
-    // Small strings don't require extra storage space in the varlen section.
-    if (string_val->IsSmall()) continue;
-    result += string_val->Len();
+    result += string_val->ExternalLen(assume_smallify);
   }
 
   slot = desc.collection_slots().begin();
@@ -77,27 +75,15 @@ int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc) 
const {
     uint8_t* coll_data = coll_value->ptr;
     const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor();
     for (int i = 0; i < coll_value->num_tuples; ++i) {
-      result += reinterpret_cast<Tuple*>(coll_data)->TotalByteSize(item_desc);
+      Tuple* tuple = reinterpret_cast<Tuple*>(coll_data);
+      result += tuple->TotalByteSize(item_desc, assume_smallify);
       coll_data += item_desc.byte_size();
     }
   }
   return result;
 }
 
-inline void Tuple::SmallifyStrings(const TupleDescriptor& desc) {
-  for (const SlotDescriptor* slot : desc.string_slots()) {
-    DCHECK(slot->type().IsVarLenStringType());
-    if (IsNull(slot->null_indicator_offset())) continue;
-    StringValue* string_v = GetStringSlot(slot->tuple_offset());
-    // StringValues are only smallified on a on-demand basis. And we only 
smallify
-    // them in batches for whole tuples. I.e. if we encounter the first small 
string
-    // in a tuple we can assume that the rest of the strings are already 
smallified.
-    if (string_v->IsSmall()) return;
-    string_v->Smallify();
-  }
-}
-
-Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, MemPool* pool) {
+Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, MemPool* pool) const {
   Tuple* result = reinterpret_cast<Tuple*>(pool->Allocate(desc.byte_size()));
   DeepCopy(result, desc, pool);
   return result;
@@ -106,11 +92,9 @@ Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, 
MemPool* pool) {
 // TODO: the logic is very similar to the other DeepCopy implementation aside 
from how
 // memory is allocated - can we templatise it somehow to avoid redundancy 
without runtime
 // overhead.
-void Tuple::DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) {
+void Tuple::DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) 
const {
   if (desc.HasVarlenSlots()) {
     memcpy(dst, this, desc.byte_size());
-    // 'dst' is a new tuple, so it is safe to smallify its string values.
-    dst->SmallifyStrings(desc);
     dst->DeepCopyVarlenData(desc, pool);
   } else {
     memcpy(dst, this, desc.byte_size());
@@ -124,7 +108,9 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, 
MemPool* pool) {
     DCHECK((*slot)->type().IsVarLenStringType());
     if (IsNull((*slot)->null_indicator_offset())) continue;
     StringValue* string_v = GetStringSlot((*slot)->tuple_offset());
-    if (string_v->IsSmall()) continue;
+    // It is safe to smallify at this point as DeepCopyVarlenData is called on 
the new
+    // tuple which can be modified.
+    if (string_v->Smallify()) continue;
     char* string_copy = 
reinterpret_cast<char*>(pool->Allocate(string_v->Len()));
     Ubsan::MemCpy(string_copy, string_v->Ptr(), string_v->Len());
     string_v->SetPtr(string_copy);
@@ -151,12 +137,14 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& 
desc, MemPool* pool) {
 }
 
 void Tuple::DeepCopy(const TupleDescriptor& desc, char** data, int* offset,
-                     bool convert_ptrs) {
+                     bool convert_ptrs) const {
   Tuple* dst = reinterpret_cast<Tuple*>(*data);
   memcpy(dst, this, desc.byte_size());
   *data += desc.byte_size();
   *offset += desc.byte_size();
-  if (desc.HasVarlenSlots()) dst->DeepCopyVarlenData(desc, data, offset, 
convert_ptrs);
+  if (desc.HasVarlenSlots()) {
+    dst->DeepCopyVarlenData(desc, data, offset, convert_ptrs);
+  }
 }
 
 void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* 
offset,
@@ -167,7 +155,9 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, 
char** data, int* of
     if (IsNull((*slot)->null_indicator_offset())) continue;
 
     StringValue* string_v = GetStringSlot((*slot)->tuple_offset());
-    if (string_v->IsSmall()) continue;
+    // It is safe to smallify at this point as DeepCopyVarlenData is called on 
the new
+    // tuple which can be modified.
+    if (string_v->Smallify()) continue;
     unsigned int len = string_v->Len();
     Ubsan::MemCpy(*data, string_v->Ptr(), len);
     string_v->SetPtr(convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
@@ -194,8 +184,8 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, 
char** data, int* of
     // Copy per-tuple varlen data if necessary.
     if (!item_desc.HasVarlenSlots()) continue;
     for (int i = 0; i < coll_value->num_tuples; ++i) {
-      reinterpret_cast<Tuple*>(coll_data)->DeepCopyVarlenData(
-          item_desc, data, offset, convert_ptrs);
+      Tuple* dst_item = reinterpret_cast<Tuple*>(coll_data);
+      dst_item->DeepCopyVarlenData(item_desc, data, offset, convert_ptrs);
       coll_data += item_desc.byte_size();
     }
   }
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 0c5334149..612833cb3 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -118,32 +118,40 @@ class Tuple {
 
   /// The total size of all data represented in this tuple (tuple data and 
referenced
   /// string and collection data).
-  int64_t TotalByteSize(const TupleDescriptor& desc) const;
+  /// If 'assume_smallify' is true, calculates with smallified string sizes 
even for not
+  /// (yet) smallified strings.
+  int64_t TotalByteSize(const TupleDescriptor& desc, bool assume_smallify) 
const;
 
   /// The size of all referenced string and collection data. Smallified 
strings aren't
   /// counted here as they don't need extra storage space.
-  int64_t VarlenByteSize(const TupleDescriptor& desc) const;
+  /// If 'assume_smallify' is true, calculates with smallified string sizes 
even for not
+  /// (yet) smallified strings.
+  int64_t VarlenByteSize(const TupleDescriptor& desc, bool assume_smallify) 
const;
 
   /// Create a copy of 'this', including all of its referenced variable-length 
data
   /// (i.e. strings and collections), using pool to allocate memory. Returns 
the copy.
-  Tuple* DeepCopy(const TupleDescriptor& desc, MemPool* pool);
+  /// Also smallifies the copied strings when possible.
+  Tuple* DeepCopy(const TupleDescriptor& desc, MemPool* pool) const;
 
   /// Create a copy of 'this', including all its referenced variable-length 
data
   /// (i.e. strings and collections), using pool to allocate memory. This 
version does
   /// not allocate a tuple, instead copying to 'dst'. 'dst' must already be 
allocated to
-  /// the correct size (i.e. TotalByteSize()).
-  void DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool);
+  /// the correct size (i.e. TotalByteSize()). Also smallifies the copied 
strings when
+  /// possible - to get the exact size needed TotalByteSize() has to be called 
with
+  /// 'assume_smallify'=true.
+  void DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) const;
 
   /// Create a copy of 'this', including all referenced variable-length data 
(i.e. strings
   /// and collections), into 'data'. The tuple is written first, followed by 
any
   /// variable-length data. 'data' and 'offset' will be incremented by the 
total number of
   /// bytes written. 'data' must already be allocated to the correct size
-  /// (i.e. TotalByteSize()).
+  /// (i.e. TotalByteSize()). Also smallifies the copied strings when possible 
- to get
+  /// the exact size needed TotalByteSize() has to be called with 
'assume_smallify'=true.
   /// If 'convert_ptrs' is true, rewrites pointers that are part of the tuple 
as offsets
   /// into 'data'. Otherwise they will remain pointers directly into data. The 
offsets are
   /// determined by 'offset', where '*offset' corresponds to address '*data'.
   void DeepCopy(const TupleDescriptor& desc, char** data, int* offset,
-                bool convert_ptrs = false);
+                bool convert_ptrs = false) const;
 
   /// This function should only be called on tuples created by DeepCopy() with
   /// 'convert_ptrs' = true. It takes all pointers contained in this tuple 
(i.e. in
@@ -336,11 +344,13 @@ class Tuple {
 
   /// Copy all referenced string and collection data by allocating memory from 
pool,
   /// copying data, then updating pointers in tuple to reference copied data.
+  /// Also smallifies the copied strings when possible.
   void DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool);
 
   /// Copies all referenced string and collection data into 'data'. Increments 
'data' and
   /// 'offset' by the number of bytes written. Recursively writes collection 
tuple data
   /// and referenced collection and string data.
+  /// Also smallifies the copied strings when possible.
   void DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* 
offset,
       bool convert_ptrs);
 
@@ -383,10 +393,6 @@ class Tuple {
   char* AllocateStrings(const char* err_ctx, RuntimeState* state, int64_t 
bytes,
       MemPool* pool, Status* status) noexcept;
 
-  /// Smallify string values of the tuple. It should only be called for newly 
created
-  /// tuples, e.g. in DeepCopy().
-  void SmallifyStrings(const TupleDescriptor& desc);
-
   // Defined in tuple-ir.cc to force the compilation of the CodegenTypes 
struct.
   void dummy(Tuple::CodegenTypes*);
 };

Reply via email to