This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f538bc755347d242d52c57d5fc30cf9aa3c7679e
Author: lihangyu <lihan...@selectdb.com>
AuthorDate: Mon Mar 10 10:42:30 2025 +0800

    add path stats check and fix sparse cache (#48834)
---
 be/src/olap/compaction.cpp                         |  6 ++
 be/src/olap/iterators.h                            |  3 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 14 ++--
 .../rowset/segment_v2/hierarchical_data_reader.h   | 35 +++++-----
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  2 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 12 ++--
 be/src/olap/rowset/segment_v2/segment_writer.h     |  3 +-
 .../segment_v2/variant_column_writer_impl.cpp      | 12 ++--
 be/src/vec/common/schema_util.cpp                  | 78 ++++++++++++++++------
 be/src/vec/common/schema_util.h                    |  8 ++-
 10 files changed, 116 insertions(+), 57 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 6de361e4786..4423a3dad85 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -74,6 +74,7 @@
 #include "runtime/thread_context.h"
 #include "util/time.h"
 #include "util/trace.h"
+#include "vec/common/schema_util.h"
 
 using std::vector;
 
@@ -1076,6 +1077,11 @@ Status Compaction::check_correctness() {
                _tablet->tablet_id(), _input_row_num, _stats.merged_rows, _stats.filtered_rows,
                 _output_rowset->num_rows());
     }
+    if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+        // only check path stats for dup_keys since the rows may be merged in other models
+        RETURN_IF_ERROR(vectorized::schema_util::check_path_stats(_input_rowsets, _output_rowset,
+                                                                  _tablet->tablet_id()));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 963f4d23598..f0cc7784f5c 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -121,7 +121,8 @@ public:
     RowRanges row_ranges;
     size_t topn_limit = 0;
     // Cache for sparse column data to avoid redundant reads
-    vectorized::ColumnPtr sparse_column_cache;
+    // col_unique_id -> cached column_ptr
+    std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
 };
 
 struct CompactionSampleInfo {
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index f29e603c379..dfa40080a56 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -324,9 +324,10 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator** iterato
     VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size();
 
     // Create sparse column merge reader
-    *iterator = new SparseColumnMergeReader(
-            path_set_info.sub_path_set, std::unique_ptr<ColumnIterator>(inner_iter),
-            std::move(src_subcolumns_for_sparse), const_cast<StorageReadOptions*>(opts));
+    *iterator = new SparseColumnMergeReader(path_set_info.sub_path_set,
+                                            std::unique_ptr<ColumnIterator>(inner_iter),
+                                            std::move(src_subcolumns_for_sparse),
+                                            const_cast<StorageReadOptions*>(opts), target_col);
     return Status::OK();
 }
 
@@ -385,7 +386,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter
             *iterator = new SparseColumnExtractReader(
                     relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter),
                     // need to modify sparse_column_cache, so use const_cast here
-                    const_cast<StorageReadOptions*>(opts));
+                    const_cast<StorageReadOptions*>(opts), target_col);
             return Status::OK();
         }
         if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
@@ -465,8 +466,9 @@ Status VariantColumnReader::new_iterator(ColumnIterator** 
iterator, const Tablet
         ColumnIterator* inner_iter;
         RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
         DCHECK(opt);
-        *iterator = new SparseColumnExtractReader(
-                relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter), nullptr);
+        *iterator = new SparseColumnExtractReader(relative_path.get_path(),
+                                                  std::unique_ptr<ColumnIterator>(inner_iter),
+                                                  nullptr, target_col);
         return Status::OK();
     }
 
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index 591b706e0e7..5ea7ac59ad7 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -172,7 +172,7 @@ protected:
     vectorized::MutableColumnPtr _sparse_column;
     StorageReadOptions* _read_opts; // Shared cache pointer
     std::unique_ptr<ColumnIterator> _sparse_column_reader;
-
+    const TabletColumn& _col;
     // Pure virtual method for data processing when encounter existing sparse columns(to be implemented by subclasses)
     virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
                                                            size_t num_rows) = 0;
@@ -182,8 +182,9 @@ protected:
                                                      size_t num_rows) = 0;
 
 public:
-    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts)
-            : _read_opts(opts), _sparse_column_reader(std::move(reader)) {
+    BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts,
+                              const TabletColumn& col)
+            : _read_opts(opts), _sparse_column_reader(std::move(reader)), _col(col) {
         _sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
     }
 
@@ -208,15 +209,17 @@ public:
     Status _process_batch(ReadMethod&& read_method, size_t nrows,
                           vectorized::MutableColumnPtr& dst) {
         // Cache check and population logic
-        if (_read_opts && _read_opts->sparse_column_cache &&
+        if (_read_opts && _read_opts->sparse_column_cache[_col.parent_unique_id()] &&
             ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) {
-            _sparse_column = _read_opts->sparse_column_cache->assume_mutable();
+            _sparse_column =
+                    _read_opts->sparse_column_cache[_col.parent_unique_id()]->assume_mutable();
         } else {
             _sparse_column->clear();
             RETURN_IF_ERROR(read_method());
 
             if (_read_opts) {
-                _read_opts->sparse_column_cache = _sparse_column->assume_mutable();
+                _read_opts->sparse_column_cache[_col.parent_unique_id()] =
+                        _sparse_column->get_ptr();
             }
         }
 
@@ -231,6 +234,14 @@ public:
         }
         return Status::OK();
     }
+};
+
+// Implementation for path extraction processor
+class SparseColumnExtractReader : public BaseSparseColumnProcessor {
+public:
+    SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
+                              StorageReadOptions* opts, const TabletColumn& col)
+            : BaseSparseColumnProcessor(std::move(reader), opts, col), _path(path) {}
 
     // Batch processing using template method
     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
@@ -248,14 +259,6 @@ public:
                 },
                 count, dst);
     }
-};
-
-// Implementation for path extraction processor
-class SparseColumnExtractReader : public BaseSparseColumnProcessor {
-public:
-    SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
-                              StorageReadOptions* opts)
-            : BaseSparseColumnProcessor(std::move(reader), opts), _path(path) {}
 
 private:
     std::string _path;
@@ -280,8 +283,8 @@ public:
     SparseColumnMergeReader(const TabletSchema::PathSet& path_map,
                             std::unique_ptr<ColumnIterator>&& sparse_column_reader,
                             SubstreamReaderTree&& src_subcolumns_for_sparse,
-                            StorageReadOptions* opts)
-            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts),
+                            StorageReadOptions* opts, const TabletColumn& col)
+            : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, col),
               _src_subcolumn_map(path_map),
               _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
     Status init(const ColumnIteratorOptions& opts) override;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index e6606eda3d3..570808d4306 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2015,7 +2015,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
     bool is_mem_reuse = block->mem_reuse();
     DCHECK(is_mem_reuse);
     // Clear the sparse column cache before processing a new batch
-    _opts.sparse_column_cache = nullptr;
+    _opts.sparse_column_cache.clear();
     SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
     if (UNLIKELY(!_lazy_inited)) {
         RETURN_IF_ERROR(_lazy_init());
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 912629ab47b..c84e42fb370 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -778,7 +778,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
 
         // caculate stats for variant type
         // TODO it's tricky here, maybe come up with a better idea
-        _maybe_calculate_variant_stats(block, id, cid);
+        _maybe_calculate_variant_stats(block, id, cid, row_pos, num_rows);
     }
     if (_has_key) {
         if (_is_mow_with_cluster_key()) {
@@ -1297,8 +1297,11 @@ inline bool SegmentWriter::_is_mow_with_cluster_key() {
 // Compaction will extend sparse column and is visible during read and write, in order to
 // persit variant stats info, we should do extra caculation during flushing segment, otherwise
 // the info is lost
-void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id,
-                                                   size_t cid) {
+void SegmentWriter::_maybe_calculate_variant_stats(
+        const vectorized::Block* block,
+        size_t id,  // id is the offset of the column in the block
+        size_t cid, // cid is the column id in TabletSchema
+        size_t row_pos, size_t num_rows) {
     // Only process sparse columns during compaction
     if (!_tablet_schema->columns()[cid]->is_sparse_column() ||
         _opts.write_type != DataWriteType::TYPE_COMPACTION) {
@@ -1319,7 +1322,8 @@ void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* bloc
 
         // Found matching column, calculate statistics
         auto* stats = column.mutable_variant_statistics();
-        vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats);
+        vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats,
+                                                         row_pos, num_rows);
 
         VLOG_DEBUG << "sparse stats columns " << stats->sparse_column_non_null_size_size();
         break;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 7b9be150de9..e790f6b6e98 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -172,7 +172,8 @@ private:
     Status _write_footer();
     Status _write_raw_data(const std::vector<Slice>& slices);
     void _maybe_invalid_row_cache(const std::string& key);
-    void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid);
+    void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid,
+                                        size_t row_pos, size_t num_rows);
     std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
                              size_t pos);
     // used for unique-key with merge on write and segment min_max key
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 5c57db390de..34fe6e085ec 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -83,9 +83,10 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column,
     opt->need_bloom_filter = column.is_bf_column();
     opt->need_bitmap_index = column.has_bitmap_index();
     const auto& index = tablet_schema->inverted_index(column.parent_unique_id());
-    VLOG_DEBUG << "column: " << column.name() << " need_inverted_index: " << opt->need_inverted_index
-                << " need_bloom_filter: " << opt->need_bloom_filter
-                << " need_bitmap_index: " << opt->need_bitmap_index;
+    VLOG_DEBUG << "column: " << column.name()
+               << " need_inverted_index: " << opt->need_inverted_index
+               << " need_bloom_filter: " << opt->need_bloom_filter
+               << " need_bitmap_index: " << opt->need_bitmap_index;
 
     // init inverted index
     if (index != nullptr &&
@@ -660,8 +661,9 @@ Status VariantSubcolumnWriter::finalize() {
             _opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
     // refresh opts and get writer with flush column
     vectorized::schema_util::inherit_column_attributes(parent_column, flush_column);
-    VLOG_DEBUG << "parent_column: " << parent_column.name() << " flush_column: "
-               << flush_column.name() << " is_bf_column: " << parent_column.is_bf_column() << " "
+    VLOG_DEBUG << "parent_column: " << parent_column.name()
+               << " flush_column: " << flush_column.name()
+               << " is_bf_column: " << parent_column.is_bf_column() << " "
                << flush_column.is_bf_column();
     RETURN_IF_ERROR(_create_column_writer(
             0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer,
diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp
index d489562edd4..ed2ff1b191a 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -725,6 +725,36 @@ void get_subpaths(const TabletColumn& variant,
     }
 }
 
+Status check_path_stats(const std::vector<RowsetSharedPtr>& inputs, RowsetSharedPtr output,
+                        int64_t tablet_id) {
+    std::unordered_map<int32_t, PathToNoneNullValues> original_uid_to_path_stats;
+    for (const auto& rs : inputs) {
+        RETURN_IF_ERROR(collect_path_stats(rs, original_uid_to_path_stats));
+    }
+    std::unordered_map<int32_t, PathToNoneNullValues> output_uid_to_path_stats;
+    RETURN_IF_ERROR(collect_path_stats(output, output_uid_to_path_stats));
+    for (const auto& [uid, stats] : original_uid_to_path_stats) {
+        if (output_uid_to_path_stats.find(uid) == output_uid_to_path_stats.end()) {
+            return Status::InternalError("Path stats not found for uid {}, tablet_id {}", uid,
+                                         tablet_id);
+        }
+        if (stats.size() != output_uid_to_path_stats.at(uid).size()) {
+            return Status::InternalError("Path stats size not match for uid {}, tablet_id {}", uid,
+                                         tablet_id);
+        }
+        for (const auto& [path, size] : stats) {
+            if (output_uid_to_path_stats.at(uid).at(path) != size) {
+                return Status::InternalError(
+                        "Path stats not match for uid {} with path `{}`, input size {}, output "
+                        "size {}, "
+                        "tablet_id {}",
+                        uid, path, size, output_uid_to_path_stats.at(uid).at(path), tablet_id);
+            }
+        }
+    }
+    return Status::OK();
+}
+
 // Build the temporary schema for compaction
 // 1. collect path stats from all rowsets
 // 2. get the subpaths and sparse paths for each unique id
@@ -764,7 +794,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
             subcolumn.set_name(column->name_lower_case() + "." + subpath.to_string());
             subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
             subcolumn.set_parent_unique_id(column->unique_id());
-            subcolumn.set_path_info(PathInData(column->name_lower_case() + "." + subpath.to_string()));
+            subcolumn.set_path_info(
+                    PathInData(column->name_lower_case() + "." + subpath.to_string()));
             subcolumn.set_aggregation_method(column->aggregation());
             subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count());
             subcolumn.set_is_nullable(true);
@@ -784,7 +815,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
 
 // Calculate statistics about variant data paths from the encoded sparse column
 void calculate_variant_stats(const IColumn& encoded_sparse_column,
-                             segment_v2::VariantStatisticsPB* stats) {
+                             segment_v2::VariantStatisticsPB* stats, size_t row_pos,
+                             size_t num_rows) {
     // Cast input column to ColumnMap type since sparse column is stored as a map
     const auto& map_column = assert_cast<const ColumnMap&>(encoded_sparse_column);
 
@@ -794,21 +826,25 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
     // Get the keys column which contains the paths as strings
     const auto& sparse_data_paths =
             assert_cast<const ColumnString*>(map_column.get_keys_ptr().get());
-
+    const auto& serialized_sparse_column_offsets =
+            assert_cast<const ColumnArray::Offsets64&>(map_column.get_offsets());
     // Iterate through all paths in the sparse column
-    for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
-        auto path = sparse_data_paths->get_data_at(i);
-
-        // If path already exists in statistics, increment its count
-        if (auto it = sparse_data_paths_statistics.find(path);
-            it != sparse_data_paths_statistics.end()) {
-            ++it->second;
-        }
-        // If path doesn't exist and we haven't hit the max statistics size limit,
-        // add it with count 1
-        else if (sparse_data_paths_statistics.size() <
-                 VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
-            sparse_data_paths_statistics.emplace(path, 1);
+    for (size_t i = row_pos; i != row_pos + num_rows; ++i) {
+        size_t offset = serialized_sparse_column_offsets[i - 1];
+        size_t end = serialized_sparse_column_offsets[i];
+        for (size_t j = offset; j != end; ++j) {
+            auto path = sparse_data_paths->get_data_at(j);
+            // If path already exists in statistics, increment its count
+            if (auto it = sparse_data_paths_statistics.find(path);
+                it != sparse_data_paths_statistics.end()) {
+                ++it->second;
+            }
+            // If path doesn't exist and we haven't hit the max statistics size limit,
+            // add it with count 1
+            else if (sparse_data_paths_statistics.size() <
+                     VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
+                sparse_data_paths_statistics.emplace(path, 1);
+            }
         }
     }
 
@@ -816,13 +852,11 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
     // This maps each path string to its frequency count
     for (const auto& [path, size] : sparse_data_paths_statistics) {
         const auto& sparse_path = path.to_string();
-        auto it = stats->sparse_column_non_null_size().find(sparse_path);
-        if (it == stats->sparse_column_non_null_size().end()) {
-            stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size);
+        auto& count_map = *stats->mutable_sparse_column_non_null_size();
+        if (auto it = count_map.find(sparse_path); it != count_map.end()) {
+            it->second += size;
         } else {
-            size_t original_size = it->second;
-            stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
-                                                                  original_size + size);
+            count_map.emplace(sparse_path, size);
         }
     }
 }
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 6e3d049f199..a4101883fc9 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -136,8 +136,14 @@ TabletColumn create_sparse_column(const TabletColumn& variant);
 // Build the temporary schema for compaction, this will reduce the memory usage of compacting variant columns
 Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets, TabletSchemaSPtr& target);
 
+// Check if the path stats are consistent between input rowsets and the output rowset.
+// Used to check the correctness of compaction.
+Status check_path_stats(const std::vector<RowsetSharedPtr>& inputs, RowsetSharedPtr output,
+                        int64_t tablet_id);
+
 // Calculate statistics about variant data paths from the encoded sparse column
 void calculate_variant_stats(const IColumn& encoded_sparse_column,
-                             segment_v2::VariantStatisticsPB* stats);
+                             segment_v2::VariantStatisticsPB* stats, size_t row_pos,
+                             size_t num_rows);
 
 } // namespace  doris::vectorized::schema_util
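
For readers skimming the patch: below is a minimal standalone sketch of the invariant that the new
vectorized::schema_util::check_path_stats enforces after compaction of duplicate-key tablets --
every (variant unique id, path) non-null count collected from the input rowsets must reappear
unchanged in the output rowset. Plain std containers stand in for Doris's PathToNoneNullValues and
Status types; all names here are illustrative only, not Doris code.

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

using PathCounts = std::unordered_map<std::string, int64_t>;   // path -> non-null value count
using UidPathStats = std::unordered_map<int32_t, PathCounts>;  // variant col unique id -> stats

// Returns true when the merged input stats exactly match the output stats,
// mirroring the uid / size / per-path comparisons done in check_path_stats.
bool path_stats_match(const UidPathStats& inputs_merged, const UidPathStats& output) {
    for (const auto& [uid, stats] : inputs_merged) {
        auto it = output.find(uid);
        if (it == output.end() || it->second.size() != stats.size()) {
            return false; // uid missing in output, or number of paths changed
        }
        for (const auto& [path, count] : stats) {
            auto pit = it->second.find(path);
            if (pit == it->second.end() || pit->second != count) {
                return false; // path missing, or its non-null count changed
            }
        }
    }
    return true;
}

int main() {
    UidPathStats inputs = {{1, {{"a.b", 10}, {"a.c", 3}}}};
    UidPathStats output = {{1, {{"a.b", 10}, {"a.c", 3}}}};
    std::cout << std::boolalpha << path_stats_match(inputs, output) << "\n"; // prints: true
    return 0;
}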

