This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit cf28745d3e4a8d50ac4c713c48d629a08455437c Author: Csaba Ringhofer <[email protected]> AuthorDate: Thu Jul 18 16:48:38 2024 +0200 IMPALA-13246: Smallify strings during broadcast exchange Other changes: - adds smallifying of strings in collections - removes Tuple::SmallifyStrings() and smallifies during deep copy instead - this seems somewhat slower in TPC-H, but is safer as it doesn't rely on the assumption that smallify is always done for the whole tuple (it turned out that this is not true in some cases) Measured 0.5-1% improvement in TPCH(42). note that there is a very similar WIP change: https://gerrit.cloudera.org/#/c/21508/ Change-Id: I281d77c7a241ebfe8716eb5c975f0660601aec1b Reviewed-on: http://gerrit.cloudera.org:8080/21597 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/benchmarks/row-batch-serialize-benchmark.cc | 3 +- be/src/runtime/buffered-tuple-stream-test.cc | 3 +- be/src/runtime/buffered-tuple-stream.cc | 2 +- be/src/runtime/row-batch-serialize-test.cc | 6 ++- be/src/runtime/row-batch.cc | 3 +- be/src/runtime/smallable-string.h | 10 +++++ be/src/runtime/string-value.h | 9 ++++ be/src/runtime/tuple.cc | 50 +++++++++------------- be/src/runtime/tuple.h | 28 +++++++----- 9 files changed, 67 insertions(+), 47 deletions(-) diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc index 26085f035..31af72e5b 100644 --- a/be/src/benchmarks/row-batch-serialize-benchmark.cc +++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc @@ -175,7 +175,8 @@ class RowBatchSerializeBaseline { for (int j = 0; j < batch->num_tuples_per_row_; ++j) { Tuple* tuple = batch->GetRow(i)->GetTuple(j); if (tuple == NULL) continue; - result += tuple->TotalByteSize(*batch->row_desc_->tuple_descriptors()[j]); + result += tuple->TotalByteSize( + *batch->row_desc_->tuple_descriptors()[j], false /*assume_smallify*/); } } return result; diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 098fb0b81..55dce370d 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -1844,7 +1844,8 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) { for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) { TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k]; fixed_size += tuple_desc->byte_size(); - varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc); + varlen_size += row->GetTuple(k)->VarlenByteSize( + *tuple_desc, false /*assume_smallify*/); } uint8_t* data = stream.AddRowCustomBegin(fixed_size + varlen_size, &status); ASSERT_TRUE(data != nullptr); diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index cff554c21..19dcb9ff6 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -983,7 +983,7 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) if (!item_desc.HasVarlenSlots()) continue; for (int j = 0; j < cv->num_tuples; ++j) { Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]); - size += item->VarlenByteSize(item_desc); + size += item->VarlenByteSize(item_desc, false /*assume_smallify*/); } } } diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc index 9fa587e7d..596f9655c 100644 --- a/be/src/runtime/row-batch-serialize-test.cc +++ b/be/src/runtime/row-batch-serialize-test.cc @@ -214,7 +214,9 @@ class RowBatchSerializeTest : public testing::Test { StringValue* string_value = reinterpret_cast<StringValue*>(slot); StringValue* deserialized_string_value = reinterpret_cast<StringValue*>(deserialized_slot); - EXPECT_NE(string_value->Ptr(), deserialized_string_value->Ptr()); + if (!deserialized_string_value->IsSmall()) { + EXPECT_NE(string_value->Ptr(), deserialized_string_value->Ptr()); + } } if (type.IsCollectionType()) { @@ -621,7 +623,7 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) { // Serialized data should only have one copy of each tuple. int64_t total_byte_size = 0; // Total size without duplication for (int i = 0; i < tuples.size(); ++i) { - total_byte_size += tuples[i]->TotalByteSize(tuple_desc); + total_byte_size += tuples[i]->TotalByteSize(tuple_desc, true /*assume_smallify*/); } EXPECT_EQ(total_byte_size, row_batch.header()->uncompressed_size()); TestRowBatch(row_desc, batch, false, full_dedup); diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index e66800ca4..811a2af9a 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -462,7 +462,8 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) { bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1); if (!inserted) continue; } - result += tuple->VarlenByteSize(*row_desc_->tuple_descriptors()[j]); + result += tuple->VarlenByteSize( + *row_desc_->tuple_descriptors()[j], true /*assume_smallify*/); ++tuple_count[j]; } } diff --git a/be/src/runtime/smallable-string.h b/be/src/runtime/smallable-string.h index 24534e7c5..aa2c4a124 100644 --- a/be/src/runtime/smallable-string.h +++ b/be/src/runtime/smallable-string.h @@ -141,6 +141,16 @@ class __attribute__((__packed__)) SmallableString { } } + /// Returns the number of bytes needed outside the slot itself: + /// - if the length is too long to smallify, return length + /// - if the length is small enough to smallify: + /// - if assume_smallify is true or the string is already smallified return 0 + /// - otherwise (not already smallified and assume_smallify is false) return length + int ExternalLen(bool assume_smallify) const { + if (IsSmall() || (assume_smallify && rep.long_rep.len <= SMALL_LIMIT)) return 0; + return rep.long_rep.len; + } + char* Ptr() const { if (IsSmall()) { return const_cast<char*>(rep.small_rep.buf); diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h index 5bd1f6452..aa48a692d 100644 --- a/be/src/runtime/string-value.h +++ b/be/src/runtime/string-value.h @@ -93,6 +93,15 @@ public: int Len() const { return string_impl_.Len(); } + /// Returns the number of bytes needed outside the slot itself: + /// - if the length is too long to smallify, return length + /// - if the length is small enough to smallify: + /// - if assume_smallify is true or the string is already smallified return 0 + /// - otherwise (not already smallified and assume_smallify is false) return length + int ExternalLen(bool assume_smallify) const { + return string_impl_.ExternalLen(assume_smallify); + } + /// Sets the length of this String object. Length can only be decreased. void SetLen(int len) { return string_impl_.SetLen(len); } diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc index 3ddb6f19a..108f86961 100644 --- a/be/src/runtime/tuple.cc +++ b/be/src/runtime/tuple.cc @@ -50,23 +50,21 @@ const char* Tuple::MATERIALIZE_EXPRS_NULL_POOL_SYMBOL = "MaterializeExprsILb0ELb Tuple* const Tuple::POISON = reinterpret_cast<Tuple*>(42L); -int64_t Tuple::TotalByteSize(const TupleDescriptor& desc) const { +int64_t Tuple::TotalByteSize(const TupleDescriptor& desc, bool assume_smallify) const { int64_t result = desc.byte_size(); if (!desc.HasVarlenSlots()) return result; - result += VarlenByteSize(desc); + result += VarlenByteSize(desc, assume_smallify); return result; } -int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc) const { +int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc, bool assume_smallify) const { int64_t result = 0; vector<SlotDescriptor*>::const_iterator slot = desc.string_slots().begin(); for (; slot != desc.string_slots().end(); ++slot) { DCHECK((*slot)->type().IsVarLenStringType()); if (IsNull((*slot)->null_indicator_offset())) continue; const StringValue* string_val = GetStringSlot((*slot)->tuple_offset()); - // Small strings don't require extra storage space in the varlen section. - if (string_val->IsSmall()) continue; - result += string_val->Len(); + result += string_val->ExternalLen(assume_smallify); } slot = desc.collection_slots().begin(); @@ -77,27 +75,15 @@ int64_t Tuple::VarlenByteSize(const TupleDescriptor& desc) const { uint8_t* coll_data = coll_value->ptr; const TupleDescriptor& item_desc = *(*slot)->children_tuple_descriptor(); for (int i = 0; i < coll_value->num_tuples; ++i) { - result += reinterpret_cast<Tuple*>(coll_data)->TotalByteSize(item_desc); + Tuple* tuple = reinterpret_cast<Tuple*>(coll_data); + result += tuple->TotalByteSize(item_desc, assume_smallify); coll_data += item_desc.byte_size(); } } return result; } -inline void Tuple::SmallifyStrings(const TupleDescriptor& desc) { - for (const SlotDescriptor* slot : desc.string_slots()) { - DCHECK(slot->type().IsVarLenStringType()); - if (IsNull(slot->null_indicator_offset())) continue; - StringValue* string_v = GetStringSlot(slot->tuple_offset()); - // StringValues are only smallified on a on-demand basis. And we only smallify - // them in batches for whole tuples. I.e. if we encounter the first small string - // in a tuple we can assume that the rest of the strings are already smallified. - if (string_v->IsSmall()) return; - string_v->Smallify(); - } -} - -Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, MemPool* pool) { +Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, MemPool* pool) const { Tuple* result = reinterpret_cast<Tuple*>(pool->Allocate(desc.byte_size())); DeepCopy(result, desc, pool); return result; @@ -106,11 +92,9 @@ Tuple* Tuple::DeepCopy(const TupleDescriptor& desc, MemPool* pool) { // TODO: the logic is very similar to the other DeepCopy implementation aside from how // memory is allocated - can we templatise it somehow to avoid redundancy without runtime // overhead. -void Tuple::DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) { +void Tuple::DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) const { if (desc.HasVarlenSlots()) { memcpy(dst, this, desc.byte_size()); - // 'dst' is a new tuple, so it is safe to smallify its string values. - dst->SmallifyStrings(desc); dst->DeepCopyVarlenData(desc, pool); } else { memcpy(dst, this, desc.byte_size()); @@ -124,7 +108,9 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool) { DCHECK((*slot)->type().IsVarLenStringType()); if (IsNull((*slot)->null_indicator_offset())) continue; StringValue* string_v = GetStringSlot((*slot)->tuple_offset()); - if (string_v->IsSmall()) continue; + // It is safe to smallify at this point as DeepCopyVarlenData is called on the new + // tuple which can be modified. + if (string_v->Smallify()) continue; char* string_copy = reinterpret_cast<char*>(pool->Allocate(string_v->Len())); Ubsan::MemCpy(string_copy, string_v->Ptr(), string_v->Len()); string_v->SetPtr(string_copy); @@ -151,12 +137,14 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool) { } void Tuple::DeepCopy(const TupleDescriptor& desc, char** data, int* offset, - bool convert_ptrs) { + bool convert_ptrs) const { Tuple* dst = reinterpret_cast<Tuple*>(*data); memcpy(dst, this, desc.byte_size()); *data += desc.byte_size(); *offset += desc.byte_size(); - if (desc.HasVarlenSlots()) dst->DeepCopyVarlenData(desc, data, offset, convert_ptrs); + if (desc.HasVarlenSlots()) { + dst->DeepCopyVarlenData(desc, data, offset, convert_ptrs); + } } void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* offset, @@ -167,7 +155,9 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* of if (IsNull((*slot)->null_indicator_offset())) continue; StringValue* string_v = GetStringSlot((*slot)->tuple_offset()); - if (string_v->IsSmall()) continue; + // It is safe to smallify at this point as DeepCopyVarlenData is called on the new + // tuple which can be modified. + if (string_v->Smallify()) continue; unsigned int len = string_v->Len(); Ubsan::MemCpy(*data, string_v->Ptr(), len); string_v->SetPtr(convert_ptrs ? reinterpret_cast<char*>(*offset) : *data); @@ -194,8 +184,8 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* of // Copy per-tuple varlen data if necessary. if (!item_desc.HasVarlenSlots()) continue; for (int i = 0; i < coll_value->num_tuples; ++i) { - reinterpret_cast<Tuple*>(coll_data)->DeepCopyVarlenData( - item_desc, data, offset, convert_ptrs); + Tuple* dst_item = reinterpret_cast<Tuple*>(coll_data); + dst_item->DeepCopyVarlenData(item_desc, data, offset, convert_ptrs); coll_data += item_desc.byte_size(); } } diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h index 0c5334149..612833cb3 100644 --- a/be/src/runtime/tuple.h +++ b/be/src/runtime/tuple.h @@ -118,32 +118,40 @@ class Tuple { /// The total size of all data represented in this tuple (tuple data and referenced /// string and collection data). - int64_t TotalByteSize(const TupleDescriptor& desc) const; + /// If 'assume_smallify' is true, calculates with smallified string sizes even for not + /// (yet) smallified strings. + int64_t TotalByteSize(const TupleDescriptor& desc, bool assume_smallify) const; /// The size of all referenced string and collection data. Smallified strings aren't /// counted here as they don't need extra storage space. - int64_t VarlenByteSize(const TupleDescriptor& desc) const; + /// If 'assume_smallify' is true, calculates with smallified string sizes even for not + /// (yet) smallified strings. + int64_t VarlenByteSize(const TupleDescriptor& desc, bool assume_smallify) const; /// Create a copy of 'this', including all of its referenced variable-length data /// (i.e. strings and collections), using pool to allocate memory. Returns the copy. - Tuple* DeepCopy(const TupleDescriptor& desc, MemPool* pool); + /// Also smallifies the copied strings when possible. + Tuple* DeepCopy(const TupleDescriptor& desc, MemPool* pool) const; /// Create a copy of 'this', including all its referenced variable-length data /// (i.e. strings and collections), using pool to allocate memory. This version does /// not allocate a tuple, instead copying to 'dst'. 'dst' must already be allocated to - /// the correct size (i.e. TotalByteSize()). - void DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool); + /// the correct size (i.e. TotalByteSize()). Also smallifies the copied strings when + /// possible - to get the exact size needed TotalByteSize() has to be called with + /// 'assume_smallify'=true. + void DeepCopy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool) const; /// Create a copy of 'this', including all referenced variable-length data (i.e. strings /// and collections), into 'data'. The tuple is written first, followed by any /// variable-length data. 'data' and 'offset' will be incremented by the total number of /// bytes written. 'data' must already be allocated to the correct size - /// (i.e. TotalByteSize()). + /// (i.e. TotalByteSize()). Also smallifies the copied strings when possible - to get + /// the exact size needed TotalByteSize() has to be called with 'assume_smallify'=true. /// If 'convert_ptrs' is true, rewrites pointers that are part of the tuple as offsets /// into 'data'. Otherwise they will remain pointers directly into data. The offsets are /// determined by 'offset', where '*offset' corresponds to address '*data'. void DeepCopy(const TupleDescriptor& desc, char** data, int* offset, - bool convert_ptrs = false); + bool convert_ptrs = false) const; /// This function should only be called on tuples created by DeepCopy() with /// 'convert_ptrs' = true. It takes all pointers contained in this tuple (i.e. in @@ -336,11 +344,13 @@ class Tuple { /// Copy all referenced string and collection data by allocating memory from pool, /// copying data, then updating pointers in tuple to reference copied data. + /// Also smallifies the copied strings when possible. void DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool); /// Copies all referenced string and collection data into 'data'. Increments 'data' and /// 'offset' by the number of bytes written. Recursively writes collection tuple data /// and referenced collection and string data. + /// Also smallifies the copied strings when possible. void DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* offset, bool convert_ptrs); @@ -383,10 +393,6 @@ class Tuple { char* AllocateStrings(const char* err_ctx, RuntimeState* state, int64_t bytes, MemPool* pool, Status* status) noexcept; - /// Smallify string values of the tuple. It should only be called for newly created - /// tuples, e.g. in DeepCopy(). - void SmallifyStrings(const TupleDescriptor& desc); - // Defined in tuple-ir.cc to force the compilation of the CodegenTypes struct. void dummy(Tuple::CodegenTypes*); };
