Copilot commented on code in PR #64348:
URL: https://github.com/apache/doris/pull/64348#discussion_r3464006808


##########
be/src/storage/segment/segment.cpp:
##########
@@ -321,6 +321,7 @@ Status Segment::new_iterator(SchemaSPtr schema, const 
StorageReadOptions& read_o
                 // any condition not satisfied, return.
                 *iter = std::make_unique<EmptySegmentIterator>(*schema);
                 read_options.stats->filtered_segment_number++;
+                read_options.stats->rows_stats_filtered += num_rows();
                 return Status::OK();

Review Comment:
   `rows_stats_filtered` is now incremented when `match_condition` prunes an 
entire segment (good), but the earlier TSO-placeholder segment-prune path still 
returns after incrementing `filtered_segment_number` without adding the pruned 
segment's rows. As a result, `rows_stats_filtered` undercounts segment-level 
pruning for binlog readers and is inconsistent with this full-segment prune 
branch. Consider applying the same `rows_stats_filtered += num_rows()` update 
in that earlier path.



##########
be/test/storage/variant/index_storage_variant_field_pattern_index_test.cpp:
##########
@@ -0,0 +1,796 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "common/config.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/value/vdatetime_value.h"
+#include "storage/predicate/predicate_creator.h"
+#include "testutil/index_storage_test_util.h"
+
+namespace doris::index_storage_test {
+namespace {
+
+constexpr int32_t kVariantUid = 2;
+constexpr int64_t kIntPatternIndexId = 210201;
+constexpr int64_t kStringPatternIndexId = 210202;
+constexpr int64_t kBigIntPatternIndexId = 210203;
+constexpr int64_t kDoublePatternIndexId = 210204;
+constexpr int64_t kBoolPatternIndexId = 210205;
+constexpr int64_t kDatePatternIndexId = 210206;
+constexpr int64_t kDateTimePatternIndexId = 210207;
+constexpr int32_t kPagePruneLowValueBound = 900000;
+
+int32_t page_prune_offset(size_t row) {
+    uint64_t mixed = static_cast<uint64_t>(row + 1) * 11400714819323198485ull;
+    mixed ^= mixed >> 33;
+    return static_cast<int32_t>(mixed % 
static_cast<uint64_t>(kPagePruneLowValueBound));
+}
+
+std::shared_ptr<ColumnPredicate> typed_equals(int32_t column_id, std::string 
column_name,
+                                              DataTypePtr data_type, Field 
value) {
+    return create_comparison_predicate<PredicateType::EQ>(column_id, 
std::move(column_name),
+                                                          
std::move(data_type), value, false);
+}
+
+std::shared_ptr<ColumnPredicate> int_equals(int32_t column_id, std::string 
column_name,
+                                            int32_t value) {
+    return create_comparison_predicate<PredicateType::EQ>(
+            column_id, std::move(column_name), 
std::make_shared<DataTypeInt32>(),
+            Field::create_field<TYPE_INT>(value), false);
+}
+
+std::shared_ptr<ColumnPredicate> int_greater(int32_t column_id, std::string 
column_name,
+                                             int32_t value) {
+    return create_comparison_predicate<PredicateType::GT>(
+            column_id, std::move(column_name), 
std::make_shared<DataTypeInt32>(),
+            Field::create_field<TYPE_INT>(value), false);
+}
+
+void expect_optional_page_zone_map_filter_stats(const IndexReadResult& result,
+                                                int64_t selected_rows, int64_t 
total_rows,
+                                                int64_t max_filtered_rows) {
+    EXPECT_GE(result.stats.raw_rows_read, selected_rows);
+    EXPECT_LE(result.stats.raw_rows_read, total_rows);
+    EXPECT_GE(result.stats.rows_stats_filtered, 0);
+    EXPECT_LE(result.stats.rows_stats_filtered, max_filtered_rows);
+    if (result.stats.raw_rows_read < total_rows) {
+        EXPECT_GT(result.stats.rows_stats_filtered, 0);
+    }
+}
+
+DataTypePtr nullable_target_type(const DataTypePtr& type) {
+    return make_nullable(type);
+}
+
+DataTypePtr nullable_int32_target_type() {
+    return make_nullable(std::make_shared<DataTypeInt32>());
+}
+
+DataTypePtr nullable_int64_target_type() {
+    return make_nullable(std::make_shared<DataTypeInt64>());
+}
+
+void disable_bkd_skip_for_filter_stats_assertions(IndexReadOptions* 
read_options) {
+    // Tiny three-row segments can hit the default 50% BKD skip threshold 
before the field-pattern
+    // index contributes rows_inverted_index_filtered.
+    read_options->inverted_index_skip_threshold = 100;
+}
+
+VariantColumnSpec typed_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 2;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "int_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_INT,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "string_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_STRING,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+VariantColumnSpec multi_typed_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 6;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "big_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_BIGINT,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "double_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DOUBLE,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "bool_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_BOOL,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+VariantColumnSpec date_time_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 4;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "date_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DATEV2,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "datetime_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DATETIMEV2,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+Field date_v2_field(int64_t yyyymmdd) {
+    DateV2Value<DateV2ValueType> value;
+    value.from_date_int64(yyyymmdd);
+    return Field::create_field<TYPE_DATEV2>(value);
+}
+
+Field datetime_v2_field(int64_t yyyymmddhhmmss) {
+    DateV2Value<DateTimeV2ValueType> value;
+    value.from_date_int64(yyyymmddhhmmss);
+    return Field::create_field<TYPE_DATETIMEV2>(value);
+}
+
+std::vector<std::string> split_int_variant_rows(size_t low_rows, int32_t 
low_value,
+                                                size_t high_rows, int32_t 
high_value) {
+    // Keep both ranges in one data source. The fixture flushes the rowset 
writer after
+    // each data source, so splitting low/high ranges into separate sources 
creates two
+    // segments and lets segment ZoneMap prune before page ZoneMap or 
BloomFilter can run.
+    std::vector<std::string> rows;
+    rows.reserve(low_rows + high_rows);
+    for (size_t i = 0; i < low_rows; ++i) {
+        rows.push_back(R"({"int_1": )" + std::to_string(low_value + 
page_prune_offset(i)) + "}");
+    }
+    for (size_t i = 0; i < high_rows; ++i) {
+        rows.push_back(R"({"int_1": )" +
+                       std::to_string(high_value + page_prune_offset(low_rows 
+ i)) + "}");
+    }
+    return rows;
+}
+
+std::vector<std::string> interleaved_int_variant_rows(size_t pairs, int32_t 
low_value,
+                                                      int32_t high_value) {
+    // BF assertions must keep both segment and page ZoneMaps matched. 
Alternating values keeps each
+    // page range wide enough that ZoneMap cannot prune before the BloomFilter 
reader is exercised.
+    std::vector<std::string> rows;
+    rows.reserve(pairs * 2);
+    for (size_t i = 0; i < pairs; ++i) {
+        rows.push_back(R"({"int_1": )" + std::to_string(low_value) + "}");
+        rows.push_back(R"({"int_1": )" + std::to_string(high_value) + "}");
+    }
+    return rows;
+}
+
+} // namespace
+
+class IndexStorageVariantFieldPatternIndexTest : public 
IndexStorageTestFixture {
+protected:
+    void SetUp() override {
+        IndexStorageTestFixture::SetUp();
+        _old_zone_map_row_num_threshold = config::zone_map_row_num_threshold;
+        // Keep zone maps available on small test pages when the compacted 
layout exposes them.
+        config::zone_map_row_num_threshold = 20;
+    }
+
+    void TearDown() override {
+        config::zone_map_row_num_threshold = _old_zone_map_row_num_threshold;
+        IndexStorageTestFixture::TearDown();
+    }
+
+    void run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind 
compaction_kind,
+                                                     int64_t tablet_id);
+
+private:
+    int32_t _old_zone_map_row_num_threshold = 20;
+};
+
+void 
IndexStorageVariantFieldPatternIndexTest::run_typed_int_field_pattern_index_lifecycle(
+        IndexCompactionKind compaction_kind, int64_t tablet_id) {
+    const auto int_index = IndexSpec::field_pattern_index(kIntPatternIndexId, 
"idx_v_int_glob",
+                                                          kVariantUid, 
"int_*");
+    const auto string_index = IndexSpec::field_pattern_index(
+            kStringPatternIndexId, "idx_v_string_glob", kVariantUid, 
"string_*");
+    const auto index_case = 
IndexStorageCaseBuilder("typed_int_field_pattern_index_lifecycle")
+                                    .tablet_id(tablet_id)
+                                    
.variant_column(typed_pattern_variant_column())
+                                    .inverted_index(int_index)
+                                    .inverted_index(string_index)
+                                    .rowset(0, 
IndexDataSourceSpec::inline_variant(
+                                                       {R"({"int_1": 42, 
"string_1": "sample"})",
+                                                        R"({"int_1": 7, 
"string_1": "other"})"},
+                                                       0))
+                                    .rowset(1, 
IndexDataSourceSpec::inline_variant(
+                                                       {R"({"int_1": 42, 
"string_1": "sample"})",
+                                                        R"({"int_1": 8, 
"string_1": "other"})"},
+                                                       100))
+                                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_equals(path_column_id, 
path_column.name(), 42));
+
+    auto before_compaction = read_rowsets(readable_rowsets.value(), 
read_options);
+    ASSERT_TRUE(before_compaction.has_value()) << before_compaction.error();
+    EXPECT_EQ(before_compaction->rows_read, 2);
+    expect_index_filter_stats(before_compaction.value(), 2);
+
+    IndexReadOptions range_read_options;
+    range_read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+    range_read_options.target_cast_type_for_variants[path_column.name()] =
+            nullable_int32_target_type();
+    range_read_options.predicates.push_back(int_greater(path_column_id, 
path_column.name(), 10));
+
+    auto range_before_compaction = read_rowsets(readable_rowsets.value(), 
range_read_options);
+    ASSERT_TRUE(range_before_compaction.has_value()) << 
range_before_compaction.error();
+    EXPECT_EQ(range_before_compaction->rows_read, 2);
+    expect_index_filter_stats(range_before_compaction.value(), 2);
+
+    auto compacted = compact_rowsets(compaction_kind, rowsets.value());
+    ASSERT_TRUE(compacted.has_value()) << compacted.error();
+    ASSERT_NE(compacted.value(), nullptr);
+    EXPECT_EQ(compacted.value()->num_rows(), 4);
+
+    auto reloaded = reload_rowsets({compacted.value()});
+    ASSERT_TRUE(reloaded.has_value()) << reloaded.error();
+    auto readable_compacted = 
rowsets_with_variant_extended_schema(reloaded.value());
+    ASSERT_TRUE(readable_compacted.has_value()) << readable_compacted.error();
+    const int32_t compacted_path_column_id = column_id_by_path("v.int_1");
+    ASSERT_EQ(compacted_path_column_id, path_column_id);
+
+    auto after_compaction = read_rowsets(readable_compacted.value(), 
read_options);
+    ASSERT_TRUE(after_compaction.has_value()) << after_compaction.error();
+    EXPECT_EQ(after_compaction->rows_read, 2);
+    expect_index_filter_stats(after_compaction.value(), 2);
+
+    auto range_after_compaction = read_rowsets(readable_compacted.value(), 
range_read_options);
+    ASSERT_TRUE(range_after_compaction.has_value()) << 
range_after_compaction.error();
+    EXPECT_EQ(range_after_compaction->rows_read, 2);
+    expect_index_filter_stats(range_after_compaction.value(), 2);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedIntIndexAfterCumulativeCompaction) {
+    
run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind::CUMULATIVE, 
110032);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedIntIndexAfterFullCompaction) {
+    run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind::FULL, 
110033);
+}
+
+// Non-INT typed Variant field-pattern indexes should filter rows for each 
physical storage type.
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
BigIntDoubleAndBoolFieldPatternIndexesFilterRows) {
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_multi_typed_field_pattern_index_matrix")
+                    .tablet_id(110040)
+                    .variant_column(multi_typed_pattern_variant_column())
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kBigIntPatternIndexId, "idx_v_big_glob", 
kVariantUid, "big_*"))
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kDoublePatternIndexId, "idx_v_double_glob", 
kVariantUid, "double_*"))
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kBoolPatternIndexId, "idx_v_bool_glob", 
kVariantUid, "bool_*"))
+                    // Use distinct hit counts per path so filter stats 
validate the selected
+                    // field-pattern index rather than only proving that some 
index was applied.
+                    .rowset(0,
+                            IndexDataSourceSpec::inline_variant(
+                                    {R"({"big_1": 9000000000, "double_1": 3.5, 
"bool_1": true})",
+                                     R"({"big_1": 7, "double_1": 1.25, 
"bool_1": false})",
+                                     R"({"big_1": 9000000000, "double_1": 7.5, 
"bool_1": false})",
+                                     R"({"big_1": 8, "double_1": 8.5, 
"bool_1": true})",
+                                     R"({"big_1": 9, "double_1": 9.5, 
"bool_1": true})"},
+                                    0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+
+    auto read_and_verify = [&](std::string_view path, FieldType 
expected_storage_type,
+                               DataTypePtr data_type, Field value, int64_t 
expected_rows,
+                               int64_t expected_filtered_rows) {
+        const int32_t path_column_id = column_id_by_path("v." + 
std::string(path));
+        ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+        const auto& path_column = tablet_schema()->column(path_column_id);
+        EXPECT_EQ(path_column.type(), expected_storage_type)
+                << "unexpected storage type for " << path_column.name();
+        EXPECT_EQ(data_type->get_storage_field_type(), expected_storage_type)
+                << "unexpected predicate type for " << path_column.name();
+
+        IndexReadOptions read_options;
+        read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+        read_options.target_cast_type_for_variants[path_column.name()] =
+                nullable_target_type(data_type);
+        disable_bkd_skip_for_filter_stats_assertions(&read_options);
+        read_options.predicates.push_back(
+                typed_equals(path_column_id, path_column.name(), data_type, 
value));
+
+        auto read_result = read_rowsets(readable_rowsets.value(), 
read_options);
+        ASSERT_TRUE(read_result.has_value()) << read_result.error();
+        EXPECT_EQ(read_result->rows_read, expected_rows);
+        expect_index_filter_stats(read_result.value(), expected_filtered_rows);
+    };
+
+    read_and_verify("big_1", FieldType::OLAP_FIELD_TYPE_BIGINT, 
std::make_shared<DataTypeInt64>(),
+                    Field::create_field<TYPE_BIGINT>(Int64(9000000000LL)), 2, 
3);
+    read_and_verify("double_1", FieldType::OLAP_FIELD_TYPE_DOUBLE,
+                    std::make_shared<DataTypeFloat64>(),
+                    Field::create_field<TYPE_DOUBLE>(Float64(3.5)), 1, 4);
+    read_and_verify("bool_1", FieldType::OLAP_FIELD_TYPE_BOOL, 
std::make_shared<DataTypeBool>(),
+                    Field::create_field<TYPE_BOOLEAN>(UInt8(1)), 3, 2);
+}
+
+// DATEV2/DATETIMEV2 Variant field-pattern indexes should filter rows for 
matching paths.
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
DateAndDateTimeFieldPatternIndexesFilterRows) {
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_date_time_field_pattern_index_matrix")
+                    .tablet_id(110044)
+                    .variant_column(date_time_pattern_variant_column())
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kDatePatternIndexId, "idx_v_date_glob", 
kVariantUid, "date_*"))
+                    
.inverted_index(IndexSpec::field_pattern_index(kDateTimePatternIndexId,
+                                                                   
"idx_v_datetime_glob",
+                                                                   
kVariantUid, "datetime_*"))
+                    .rowset(0,
+                            IndexDataSourceSpec::inline_variant(
+                                    {R"({"date_1": "2024-01-02", "datetime_1": 
"2024-01-02 03:04:05"})",
+                                     R"({"date_1": "2024-01-03", "datetime_1": 
"2024-01-03 03:04:05"})",
+                                     R"({"date_1": "2024-01-02", "datetime_1": 
"2024-01-04 03:04:05"})",
+                                     R"({"date_1": "2024-01-04", "datetime_1": 
"2024-01-05 03:04:05"})"},
+                                    0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+
+    auto read_and_verify = [&](std::string_view path, DataTypePtr data_type, 
Field value,
+                               int64_t expected_rows, int64_t 
expected_filtered_rows) {
+        const int32_t path_column_id = column_id_by_path("v." + 
std::string(path));
+        ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+        const auto& path_column = tablet_schema()->column(path_column_id);
+
+        IndexReadOptions read_options;
+        read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+        read_options.target_cast_type_for_variants[path_column.name()] =
+                nullable_target_type(data_type);
+        disable_bkd_skip_for_filter_stats_assertions(&read_options);
+        read_options.predicates.push_back(
+                typed_equals(path_column_id, path_column.name(), data_type, 
value));
+
+        auto read_result = read_rowsets(readable_rowsets.value(), 
read_options);
+        ASSERT_TRUE(read_result.has_value()) << read_result.error();
+        EXPECT_EQ(read_result->rows_read, expected_rows);
+        expect_index_filter_stats(read_result.value(), expected_filtered_rows);
+    };
+
+    read_and_verify("date_1", std::make_shared<DataTypeDateV2>(), 
date_v2_field(20240102), 2, 2);
+    read_and_verify("datetime_1", std::make_shared<DataTypeDateTimeV2>(),
+                    datetime_v2_field(20240102030405), 1, 3);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedVariantPathSegmentZoneMapPrunesWholeSegment) {
+    const auto index_case = 
IndexStorageCaseBuilder("variant_typed_path_segment_zone_map_prune")
+                                    .tablet_id(110041)
+                                    
.variant_column(typed_pattern_variant_column())
+                                    .rowset(0,
+                                            
IndexDataSourceSpec::inline_variant(
+                                                    {R"({"int_1": 1})", 
R"({"int_1": 2})",
+                                                     R"({"int_1": 100})", 
R"({"int_1": 101})"},
+                                                    0),
+                                            2)
+                                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_greater(path_column_id, 
path_column.name(), 50));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, 2);
+    expect_raw_rows_read(read_result.value(), 2);
+    expect_segment_pruned(read_result.value(), 1);
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedVariantPathPageZoneMapPrunesWithinSegment) {
+    constexpr size_t kLowRows = 2048;
+    constexpr size_t kHighRows = 2048;
+    constexpr int32_t kLowValueBase = 1;
+    constexpr int32_t kHighValueBase = 10000000;
+    constexpr int32_t kPagePruneThreshold = 1000000;
+    auto variant = typed_pattern_variant_column();
+
+    IndexTabletOptions options;
+    options.tablet_id = 110045;
+    options.storage_page_size = 4096;
+    options.variant_columns.push_back(std::move(variant));
+
+    IndexRowsetSpec rowset;
+    rowset.version = 0;
+    rowset.max_rows_per_segment = static_cast<int64_t>(kLowRows + kHighRows);
+    rowset.data_sources.push_back(IndexDataSourceSpec::inline_variant(
+            split_int_variant_rows(kLowRows, kLowValueBase, kHighRows, 
kHighValueBase), 0));
+
+    ASSERT_TRUE(create_tablet(options).ok());
+    auto rowsets = write_rowsets({rowset});
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    // Use dispersed low/high ranges so page-level pruning does not depend on 
one exact page
+    // boundary or compression layout.
+    auto compacted = compact_rowsets(IndexCompactionKind::CUMULATIVE, 
rowsets.value());
+    ASSERT_TRUE(compacted.has_value()) << compacted.error();
+    ASSERT_NE(compacted.value(), nullptr);
+    auto reloaded = reload_rowsets({compacted.value()});
+    ASSERT_TRUE(reloaded.has_value()) << reloaded.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(reloaded.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(
+            int_greater(path_column_id, path_column.name(), 
kPagePruneThreshold));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, kHighRows);
+    expect_segment_pruned(read_result.value(), 0);
+    const auto total_rows = static_cast<int64_t>(kLowRows + kHighRows);
+    const auto high_rows = static_cast<int64_t>(kHighRows);
+    // Compaction can produce a layout where page ZoneMap pruning is not 
triggered. Keep the result
+    // contract strict and validate stats only when raw_rows_read proves page 
pruning actually
+    // happened.
+    expect_optional_page_zone_map_filter_stats(read_result.value(), high_rows, 
total_rows,
+                                               static_cast<int64_t>(kLowRows));
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest,
+       DisabledInvertedIndexQueryDoesNotFilterVariantFieldPatternIndex) {
+    const auto int_index = IndexSpec::field_pattern_index(kIntPatternIndexId, 
"idx_v_int_glob",
+                                                          kVariantUid, 
"int_*");
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_disabled_inverted_index_query_reverse_case")
+                    .tablet_id(110046)
+                    .variant_column(typed_pattern_variant_column())
+                    .inverted_index(int_index)
+                    .rowset(0, IndexDataSourceSpec::inline_variant(
+                                       {R"({"int_1": 42})", R"({"int_1": 7})", 
R"({"int_1": 42})",
+                                        R"({"int_1": 8})"},
+                                       0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_equals(path_column_id, 
path_column.name(), 42));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, 2);
+    expect_index_filter_stats(read_result.value(), 0);
+    expect_index_filter_stats(read_result.value(), 0);

Review Comment:
   Duplicate assertion: `expect_index_filter_stats(read_result.value(), 0);` is 
called twice back-to-back. The second call verifies nothing new, adds noise, 
and makes failures harder to interpret; please remove one of the two calls.



##########
be/test/storage/variant/index_storage_variant_field_pattern_index_test.cpp:
##########
@@ -0,0 +1,796 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "common/config.h"
+#include "core/data_type/data_type_date_or_datetime_v2.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/value/vdatetime_value.h"
+#include "storage/predicate/predicate_creator.h"
+#include "testutil/index_storage_test_util.h"
+
+namespace doris::index_storage_test {
+namespace {
+
+constexpr int32_t kVariantUid = 2;
+constexpr int64_t kIntPatternIndexId = 210201;
+constexpr int64_t kStringPatternIndexId = 210202;
+constexpr int64_t kBigIntPatternIndexId = 210203;
+constexpr int64_t kDoublePatternIndexId = 210204;
+constexpr int64_t kBoolPatternIndexId = 210205;
+constexpr int64_t kDatePatternIndexId = 210206;
+constexpr int64_t kDateTimePatternIndexId = 210207;
+constexpr int32_t kPagePruneLowValueBound = 900000;
+
+int32_t page_prune_offset(size_t row) {
+    uint64_t mixed = static_cast<uint64_t>(row + 1) * 11400714819323198485ull;
+    mixed ^= mixed >> 33;
+    return static_cast<int32_t>(mixed % 
static_cast<uint64_t>(kPagePruneLowValueBound));
+}
+
+std::shared_ptr<ColumnPredicate> typed_equals(int32_t column_id, std::string 
column_name,
+                                              DataTypePtr data_type, Field 
value) {
+    return create_comparison_predicate<PredicateType::EQ>(column_id, 
std::move(column_name),
+                                                          
std::move(data_type), value, false);
+}
+
+std::shared_ptr<ColumnPredicate> int_equals(int32_t column_id, std::string 
column_name,
+                                            int32_t value) {
+    return create_comparison_predicate<PredicateType::EQ>(
+            column_id, std::move(column_name), 
std::make_shared<DataTypeInt32>(),
+            Field::create_field<TYPE_INT>(value), false);
+}
+
+std::shared_ptr<ColumnPredicate> int_greater(int32_t column_id, std::string 
column_name,
+                                             int32_t value) {
+    return create_comparison_predicate<PredicateType::GT>(
+            column_id, std::move(column_name), 
std::make_shared<DataTypeInt32>(),
+            Field::create_field<TYPE_INT>(value), false);
+}
+
+void expect_optional_page_zone_map_filter_stats(const IndexReadResult& result,
+                                                int64_t selected_rows, int64_t 
total_rows,
+                                                int64_t max_filtered_rows) {
+    EXPECT_GE(result.stats.raw_rows_read, selected_rows);
+    EXPECT_LE(result.stats.raw_rows_read, total_rows);
+    EXPECT_GE(result.stats.rows_stats_filtered, 0);
+    EXPECT_LE(result.stats.rows_stats_filtered, max_filtered_rows);
+    if (result.stats.raw_rows_read < total_rows) {
+        EXPECT_GT(result.stats.rows_stats_filtered, 0);
+    }
+}
+
+DataTypePtr nullable_target_type(const DataTypePtr& type) {
+    return make_nullable(type);
+}
+
+DataTypePtr nullable_int32_target_type() {
+    return make_nullable(std::make_shared<DataTypeInt32>());
+}
+
+DataTypePtr nullable_int64_target_type() {
+    return make_nullable(std::make_shared<DataTypeInt64>());
+}
+
+void disable_bkd_skip_for_filter_stats_assertions(IndexReadOptions* 
read_options) {
+    // Tiny three-row segments can hit the default 50% BKD skip threshold 
before the field-pattern
+    // index contributes rows_inverted_index_filtered.
+    read_options->inverted_index_skip_threshold = 100;
+}
+
+VariantColumnSpec typed_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 2;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "int_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_INT,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "string_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_STRING,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+VariantColumnSpec multi_typed_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 6;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "big_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_BIGINT,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "double_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DOUBLE,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "bool_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_BOOL,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+VariantColumnSpec date_time_pattern_variant_column() {
+    VariantColumnSpec variant;
+    variant.unique_id = kVariantUid;
+    variant.name = "v";
+    variant.max_subcolumns_count = 4;
+    variant.predefined_paths = {
+            VariantPathSpec {.path = "date_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DATEV2,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+            VariantPathSpec {.path = "datetime_*",
+                             .type = FieldType::OLAP_FIELD_TYPE_DATETIMEV2,
+                             .nullable = true,
+                             .pattern_type = PatternTypePB::MATCH_NAME_GLOB,
+                             .array_item_type = {},
+                             .array_item_nullable = true},
+    };
+    return variant;
+}
+
+Field date_v2_field(int64_t yyyymmdd) {
+    DateV2Value<DateV2ValueType> value;
+    value.from_date_int64(yyyymmdd);
+    return Field::create_field<TYPE_DATEV2>(value);
+}
+
+Field datetime_v2_field(int64_t yyyymmddhhmmss) {
+    DateV2Value<DateTimeV2ValueType> value;
+    value.from_date_int64(yyyymmddhhmmss);
+    return Field::create_field<TYPE_DATETIMEV2>(value);
+}
+
+std::vector<std::string> split_int_variant_rows(size_t low_rows, int32_t 
low_value,
+                                                size_t high_rows, int32_t 
high_value) {
+    // Keep both ranges in one data source. The fixture flushes the rowset 
writer after
+    // each data source, so splitting low/high ranges into separate sources 
creates two
+    // segments and lets segment ZoneMap prune before page ZoneMap or 
BloomFilter can run.
+    std::vector<std::string> rows;
+    rows.reserve(low_rows + high_rows);
+    for (size_t i = 0; i < low_rows; ++i) {
+        rows.push_back(R"({"int_1": )" + std::to_string(low_value + 
page_prune_offset(i)) + "}");
+    }
+    for (size_t i = 0; i < high_rows; ++i) {
+        rows.push_back(R"({"int_1": )" +
+                       std::to_string(high_value + page_prune_offset(low_rows 
+ i)) + "}");
+    }
+    return rows;
+}
+
+std::vector<std::string> interleaved_int_variant_rows(size_t pairs, int32_t 
low_value,
+                                                      int32_t high_value) {
+    // BF assertions must keep both segment and page ZoneMaps matched. 
Alternating values keeps each
+    // page range wide enough that ZoneMap cannot prune before the BloomFilter 
reader is exercised.
+    std::vector<std::string> rows;
+    rows.reserve(pairs * 2);
+    for (size_t i = 0; i < pairs; ++i) {
+        rows.push_back(R"({"int_1": )" + std::to_string(low_value) + "}");
+        rows.push_back(R"({"int_1": )" + std::to_string(high_value) + "}");
+    }
+    return rows;
+}
+
+} // namespace
+
+class IndexStorageVariantFieldPatternIndexTest : public 
IndexStorageTestFixture {
+protected:
+    void SetUp() override {
+        IndexStorageTestFixture::SetUp();
+        _old_zone_map_row_num_threshold = config::zone_map_row_num_threshold;
+        // Keep zone maps available on small test pages when the compacted 
layout exposes them.
+        config::zone_map_row_num_threshold = 20;
+    }
+
+    void TearDown() override {
+        config::zone_map_row_num_threshold = _old_zone_map_row_num_threshold;
+        IndexStorageTestFixture::TearDown();
+    }
+
+    void run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind 
compaction_kind,
+                                                     int64_t tablet_id);
+
+private:
+    int32_t _old_zone_map_row_num_threshold = 20;
+};
+
+void 
IndexStorageVariantFieldPatternIndexTest::run_typed_int_field_pattern_index_lifecycle(
+        IndexCompactionKind compaction_kind, int64_t tablet_id) {
+    const auto int_index = IndexSpec::field_pattern_index(kIntPatternIndexId, 
"idx_v_int_glob",
+                                                          kVariantUid, 
"int_*");
+    const auto string_index = IndexSpec::field_pattern_index(
+            kStringPatternIndexId, "idx_v_string_glob", kVariantUid, 
"string_*");
+    const auto index_case = 
IndexStorageCaseBuilder("typed_int_field_pattern_index_lifecycle")
+                                    .tablet_id(tablet_id)
+                                    
.variant_column(typed_pattern_variant_column())
+                                    .inverted_index(int_index)
+                                    .inverted_index(string_index)
+                                    .rowset(0, 
IndexDataSourceSpec::inline_variant(
+                                                       {R"({"int_1": 42, 
"string_1": "sample"})",
+                                                        R"({"int_1": 7, 
"string_1": "other"})"},
+                                                       0))
+                                    .rowset(1, 
IndexDataSourceSpec::inline_variant(
+                                                       {R"({"int_1": 42, 
"string_1": "sample"})",
+                                                        R"({"int_1": 8, 
"string_1": "other"})"},
+                                                       100))
+                                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_equals(path_column_id, 
path_column.name(), 42));
+
+    auto before_compaction = read_rowsets(readable_rowsets.value(), 
read_options);
+    ASSERT_TRUE(before_compaction.has_value()) << before_compaction.error();
+    EXPECT_EQ(before_compaction->rows_read, 2);
+    expect_index_filter_stats(before_compaction.value(), 2);
+
+    IndexReadOptions range_read_options;
+    range_read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+    range_read_options.target_cast_type_for_variants[path_column.name()] =
+            nullable_int32_target_type();
+    range_read_options.predicates.push_back(int_greater(path_column_id, 
path_column.name(), 10));
+
+    auto range_before_compaction = read_rowsets(readable_rowsets.value(), 
range_read_options);
+    ASSERT_TRUE(range_before_compaction.has_value()) << 
range_before_compaction.error();
+    EXPECT_EQ(range_before_compaction->rows_read, 2);
+    expect_index_filter_stats(range_before_compaction.value(), 2);
+
+    auto compacted = compact_rowsets(compaction_kind, rowsets.value());
+    ASSERT_TRUE(compacted.has_value()) << compacted.error();
+    ASSERT_NE(compacted.value(), nullptr);
+    EXPECT_EQ(compacted.value()->num_rows(), 4);
+
+    auto reloaded = reload_rowsets({compacted.value()});
+    ASSERT_TRUE(reloaded.has_value()) << reloaded.error();
+    auto readable_compacted = 
rowsets_with_variant_extended_schema(reloaded.value());
+    ASSERT_TRUE(readable_compacted.has_value()) << readable_compacted.error();
+    const int32_t compacted_path_column_id = column_id_by_path("v.int_1");
+    ASSERT_EQ(compacted_path_column_id, path_column_id);
+
+    auto after_compaction = read_rowsets(readable_compacted.value(), 
read_options);
+    ASSERT_TRUE(after_compaction.has_value()) << after_compaction.error();
+    EXPECT_EQ(after_compaction->rows_read, 2);
+    expect_index_filter_stats(after_compaction.value(), 2);
+
+    auto range_after_compaction = read_rowsets(readable_compacted.value(), 
range_read_options);
+    ASSERT_TRUE(range_after_compaction.has_value()) << 
range_after_compaction.error();
+    EXPECT_EQ(range_after_compaction->rows_read, 2);
+    expect_index_filter_stats(range_after_compaction.value(), 2);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedIntIndexAfterCumulativeCompaction) {
+    
run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind::CUMULATIVE, 
110032);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedIntIndexAfterFullCompaction) {
+    run_typed_int_field_pattern_index_lifecycle(IndexCompactionKind::FULL, 
110033);
+}
+
+// Non-INT typed Variant field-pattern indexes should filter rows for each 
physical storage type.
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
BigIntDoubleAndBoolFieldPatternIndexesFilterRows) {
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_multi_typed_field_pattern_index_matrix")
+                    .tablet_id(110040)
+                    .variant_column(multi_typed_pattern_variant_column())
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kBigIntPatternIndexId, "idx_v_big_glob", 
kVariantUid, "big_*"))
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kDoublePatternIndexId, "idx_v_double_glob", 
kVariantUid, "double_*"))
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kBoolPatternIndexId, "idx_v_bool_glob", 
kVariantUid, "bool_*"))
+                    // Use distinct hit counts per path so filter stats 
validate the selected
+                    // field-pattern index rather than only proving that some 
index was applied.
+                    .rowset(0,
+                            IndexDataSourceSpec::inline_variant(
+                                    {R"({"big_1": 9000000000, "double_1": 3.5, 
"bool_1": true})",
+                                     R"({"big_1": 7, "double_1": 1.25, 
"bool_1": false})",
+                                     R"({"big_1": 9000000000, "double_1": 7.5, 
"bool_1": false})",
+                                     R"({"big_1": 8, "double_1": 8.5, 
"bool_1": true})",
+                                     R"({"big_1": 9, "double_1": 9.5, 
"bool_1": true})"},
+                                    0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+
+    auto read_and_verify = [&](std::string_view path, FieldType 
expected_storage_type,
+                               DataTypePtr data_type, Field value, int64_t 
expected_rows,
+                               int64_t expected_filtered_rows) {
+        const int32_t path_column_id = column_id_by_path("v." + 
std::string(path));
+        ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+        const auto& path_column = tablet_schema()->column(path_column_id);
+        EXPECT_EQ(path_column.type(), expected_storage_type)
+                << "unexpected storage type for " << path_column.name();
+        EXPECT_EQ(data_type->get_storage_field_type(), expected_storage_type)
+                << "unexpected predicate type for " << path_column.name();
+
+        IndexReadOptions read_options;
+        read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+        read_options.target_cast_type_for_variants[path_column.name()] =
+                nullable_target_type(data_type);
+        disable_bkd_skip_for_filter_stats_assertions(&read_options);
+        read_options.predicates.push_back(
+                typed_equals(path_column_id, path_column.name(), data_type, 
value));
+
+        auto read_result = read_rowsets(readable_rowsets.value(), 
read_options);
+        ASSERT_TRUE(read_result.has_value()) << read_result.error();
+        EXPECT_EQ(read_result->rows_read, expected_rows);
+        expect_index_filter_stats(read_result.value(), expected_filtered_rows);
+    };
+
+    read_and_verify("big_1", FieldType::OLAP_FIELD_TYPE_BIGINT, 
std::make_shared<DataTypeInt64>(),
+                    Field::create_field<TYPE_BIGINT>(Int64(9000000000LL)), 2, 
3);
+    read_and_verify("double_1", FieldType::OLAP_FIELD_TYPE_DOUBLE,
+                    std::make_shared<DataTypeFloat64>(),
+                    Field::create_field<TYPE_DOUBLE>(Float64(3.5)), 1, 4);
+    read_and_verify("bool_1", FieldType::OLAP_FIELD_TYPE_BOOL, 
std::make_shared<DataTypeBool>(),
+                    Field::create_field<TYPE_BOOLEAN>(UInt8(1)), 3, 2);
+}
+
+// DATEV2/DATETIMEV2 Variant field-pattern indexes should filter rows for 
matching paths.
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
DateAndDateTimeFieldPatternIndexesFilterRows) {
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_date_time_field_pattern_index_matrix")
+                    .tablet_id(110044)
+                    .variant_column(date_time_pattern_variant_column())
+                    .inverted_index(IndexSpec::field_pattern_index(
+                            kDatePatternIndexId, "idx_v_date_glob", 
kVariantUid, "date_*"))
+                    
.inverted_index(IndexSpec::field_pattern_index(kDateTimePatternIndexId,
+                                                                   
"idx_v_datetime_glob",
+                                                                   
kVariantUid, "datetime_*"))
+                    .rowset(0,
+                            IndexDataSourceSpec::inline_variant(
+                                    {R"({"date_1": "2024-01-02", "datetime_1": 
"2024-01-02 03:04:05"})",
+                                     R"({"date_1": "2024-01-03", "datetime_1": 
"2024-01-03 03:04:05"})",
+                                     R"({"date_1": "2024-01-02", "datetime_1": 
"2024-01-04 03:04:05"})",
+                                     R"({"date_1": "2024-01-04", "datetime_1": 
"2024-01-05 03:04:05"})"},
+                                    0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+
+    auto read_and_verify = [&](std::string_view path, DataTypePtr data_type, 
Field value,
+                               int64_t expected_rows, int64_t 
expected_filtered_rows) {
+        const int32_t path_column_id = column_id_by_path("v." + 
std::string(path));
+        ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+        const auto& path_column = tablet_schema()->column(path_column_id);
+
+        IndexReadOptions read_options;
+        read_options.return_columns = {0, 
static_cast<uint32_t>(path_column_id)};
+        read_options.target_cast_type_for_variants[path_column.name()] =
+                nullable_target_type(data_type);
+        disable_bkd_skip_for_filter_stats_assertions(&read_options);
+        read_options.predicates.push_back(
+                typed_equals(path_column_id, path_column.name(), data_type, 
value));
+
+        auto read_result = read_rowsets(readable_rowsets.value(), 
read_options);
+        ASSERT_TRUE(read_result.has_value()) << read_result.error();
+        EXPECT_EQ(read_result->rows_read, expected_rows);
+        expect_index_filter_stats(read_result.value(), expected_filtered_rows);
+    };
+
+    read_and_verify("date_1", std::make_shared<DataTypeDateV2>(), 
date_v2_field(20240102), 2, 2);
+    read_and_verify("datetime_1", std::make_shared<DataTypeDateTimeV2>(),
+                    datetime_v2_field(20240102030405), 1, 3);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedVariantPathSegmentZoneMapPrunesWholeSegment) {
+    const auto index_case = 
IndexStorageCaseBuilder("variant_typed_path_segment_zone_map_prune")
+                                    .tablet_id(110041)
+                                    
.variant_column(typed_pattern_variant_column())
+                                    .rowset(0,
+                                            
IndexDataSourceSpec::inline_variant(
+                                                    {R"({"int_1": 1})", 
R"({"int_1": 2})",
+                                                     R"({"int_1": 100})", 
R"({"int_1": 101})"},
+                                                    0),
+                                            2)
+                                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_greater(path_column_id, 
path_column.name(), 50));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, 2);
+    expect_raw_rows_read(read_result.value(), 2);
+    expect_segment_pruned(read_result.value(), 1);
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest, 
TypedVariantPathPageZoneMapPrunesWithinSegment) {
+    constexpr size_t kLowRows = 2048;
+    constexpr size_t kHighRows = 2048;
+    constexpr int32_t kLowValueBase = 1;
+    constexpr int32_t kHighValueBase = 10000000;
+    constexpr int32_t kPagePruneThreshold = 1000000;
+    auto variant = typed_pattern_variant_column();
+
+    IndexTabletOptions options;
+    options.tablet_id = 110045;
+    options.storage_page_size = 4096;
+    options.variant_columns.push_back(std::move(variant));
+
+    IndexRowsetSpec rowset;
+    rowset.version = 0;
+    rowset.max_rows_per_segment = static_cast<int64_t>(kLowRows + kHighRows);
+    rowset.data_sources.push_back(IndexDataSourceSpec::inline_variant(
+            split_int_variant_rows(kLowRows, kLowValueBase, kHighRows, 
kHighValueBase), 0));
+
+    ASSERT_TRUE(create_tablet(options).ok());
+    auto rowsets = write_rowsets({rowset});
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    // Use dispersed low/high ranges so page-level pruning does not depend on 
one exact page
+    // boundary or compression layout.
+    auto compacted = compact_rowsets(IndexCompactionKind::CUMULATIVE, 
rowsets.value());
+    ASSERT_TRUE(compacted.has_value()) << compacted.error();
+    ASSERT_NE(compacted.value(), nullptr);
+    auto reloaded = reload_rowsets({compacted.value()});
+    ASSERT_TRUE(reloaded.has_value()) << reloaded.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(reloaded.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(
+            int_greater(path_column_id, path_column.name(), 
kPagePruneThreshold));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, kHighRows);
+    expect_segment_pruned(read_result.value(), 0);
+    const auto total_rows = static_cast<int64_t>(kLowRows + kHighRows);
+    const auto high_rows = static_cast<int64_t>(kHighRows);
+    // Compaction can produce a layout where page ZoneMap pruning is not 
triggered. Keep the result
+    // contract strict and validate stats only when raw_rows_read proves page 
pruning actually
+    // happened.
+    expect_optional_page_zone_map_filter_stats(read_result.value(), high_rows, 
total_rows,
+                                               static_cast<int64_t>(kLowRows));
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest,
+       DisabledInvertedIndexQueryDoesNotFilterVariantFieldPatternIndex) {
+    const auto int_index = IndexSpec::field_pattern_index(kIntPatternIndexId, 
"idx_v_int_glob",
+                                                          kVariantUid, 
"int_*");
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_disabled_inverted_index_query_reverse_case")
+                    .tablet_id(110046)
+                    .variant_column(typed_pattern_variant_column())
+                    .inverted_index(int_index)
+                    .rowset(0, IndexDataSourceSpec::inline_variant(
+                                       {R"({"int_1": 42})", R"({"int_1": 7})", 
R"({"int_1": 42})",
+                                        R"({"int_1": 8})"},
+                                       0))
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int32_target_type();
+    read_options.predicates.push_back(int_equals(path_column_id, 
path_column.name(), 42));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    EXPECT_EQ(read_result->rows_read, 2);
+    expect_index_filter_stats(read_result.value(), 0);
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+TEST_F(IndexStorageVariantFieldPatternIndexTest,
+       TargetCastTypeMismatchDoesNotDisableExtractedPathZoneMapPruning) {
+    const auto index_case =
+            
IndexStorageCaseBuilder("variant_unsafe_target_cast_zone_map_reverse_case")
+                    .tablet_id(110047)
+                    .variant_column(typed_pattern_variant_column())
+                    .rowset(0,
+                            IndexDataSourceSpec::inline_variant(
+                                    {R"({"int_1": 1})", R"({"int_1": 2})", 
R"({"int_1": 100})",
+                                     R"({"int_1": 101})"},
+                                    0),
+                            2)
+                    .build();
+    ASSERT_TRUE(create_tablet(index_case.tablet_options).ok());
+    auto rowsets = write_rowsets(index_case.rowsets);
+    ASSERT_TRUE(rowsets.has_value()) << rowsets.error();
+
+    auto readable_rowsets = 
rowsets_with_variant_extended_schema(rowsets.value());
+    ASSERT_TRUE(readable_rowsets.has_value()) << readable_rowsets.error();
+    const int32_t path_column_id = column_id_by_path("v.int_1");
+    ASSERT_GE(path_column_id, 0) << dump_schema_paths(*tablet_schema());
+    const auto& path_column = tablet_schema()->column(path_column_id);
+
+    IndexReadOptions read_options;
+    read_options.enable_inverted_index_query = false;
+    read_options.return_columns = {0, static_cast<uint32_t>(path_column_id)};
+    // Keep nullability aligned with Variant typed paths so this case checks 
the stored
+    // physical type mismatch, not a nullable/non-nullable wrapper mismatch.
+    read_options.target_cast_type_for_variants[path_column.name()] = 
nullable_int64_target_type();
+    read_options.predicates.push_back(int_greater(path_column_id, 
path_column.name(), 50));
+
+    auto read_result = read_rowsets(readable_rowsets.value(), read_options);
+    ASSERT_TRUE(read_result.has_value()) << read_result.error();
+    // The production guard only uses target-cast mismatches to reject storage 
predicates for
+    // Variant-typed columns. This test reads an already extracted typed path, 
so the synthetic
+    // target-cast mismatch does not disable segment ZoneMap pruning.
+    EXPECT_EQ(read_result->rows_read, 2);
+    expect_raw_rows_read(read_result.value(), 2);
+    expect_segment_pruned(read_result.value(), 1);
+    expect_zone_map_filtered(read_result.value(), 2);
+    expect_index_filter_stats(read_result.value(), 0);
+}
+
+// Expected-red: this is separate from the DORIS-26471 BF case fix. 
Segment-level typed-path ZoneMap
+// pruning works, but rows_stats_filtered is still not updated on this path.

Review Comment:
   The comment says this test is "Expected-red" because `rows_stats_filtered` 
isn't updated, but this PR updates `Segment::new_iterator()` to increment 
`rows_stats_filtered` on full-segment ZoneMap pruning. Keeping the stale 
comment will be misleading if the test now passes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to