This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c8636fade1 [core] Fix FallbackReadScan#plan mixing partition and data filters (#7388)
c8636fade1 is described below

commit c8636fade1655300e17f639d473498327be54af7
Author: Junrui Lee <[email protected]>
AuthorDate: Fri Mar 20 09:57:01 2026 +0800

    [core] Fix FallbackReadScan#plan mixing partition and data filters (#7388)
    
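    This is a follow-up to https://github.com/apache/paimon/pull/7268, which
    fixed the same problem in ChainTableBatchScan. As @Aitozi pointed out,
    FallbackReadScan#plan has the same issue: it mixes partition filters and
    data filters when determining which partitions exist in the main branch
    versus the fallback branch. As a result, a main-branch partition whose
    files were all excluded by the data filter was wrongly treated as missing
    from main and was read from the fallback branch instead. Partition
    ownership is now decided from partition predicates only.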
---
 .../apache/paimon/table/ChainGroupReadTable.java   |  58 ++++--------
 .../paimon/table/FallbackReadFileStoreTable.java   | 100 +++++++++++++++++++--
 .../paimon/table/system/ReadOptimizedTable.java    |   7 +-
 .../table/FallbackReadFileStoreTableTest.java      |  63 +++++++++++++
 4 files changed, 177 insertions(+), 51 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 36f9778ee3..2379882070 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -130,11 +130,9 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
 
         private final RowDataToObjectArrayConverter partitionConverter;
         private final InternalRowPartitionComputer partitionComputer;
-        private final TableSchema tableSchema;
         private final CoreOptions options;
         private final RecordComparator partitionComparator;
         private final ChainGroupReadTable chainGroupReadTable;
-        private PartitionPredicate partitionPredicate;
         private Predicate dataPredicate;
         private Filter<Integer> bucketFilter;
 
@@ -143,8 +141,12 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
                 DataTableScan fallbackScan,
                 TableSchema tableSchema,
                 ChainGroupReadTable chainGroupReadTable) {
-            super(mainScan, fallbackScan);
-            this.tableSchema = tableSchema;
+            super(
+                    mainScan,
+                    fallbackScan,
+                    chainGroupReadTable.wrapped,
+                    chainGroupReadTable.fallback(),
+                    tableSchema);
             this.options = CoreOptions.fromMap(tableSchema.options());
             this.chainGroupReadTable = chainGroupReadTable;
             this.partitionConverter =
@@ -169,7 +171,6 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
                                 predicate,
                                 tableSchema.logicalRowType(),
                                 tableSchema.partitionKeys());
-                setPartitionPredicate(pair.getLeft().orElse(null));
                 dataPredicate =
                         pair.getRight().isEmpty() ? null : 
PredicateBuilder.and(pair.getRight());
             }
@@ -179,57 +180,30 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
         @Override
         public ChainTableBatchScan withPartitionFilter(Map<String, String> 
partitionSpec) {
             super.withPartitionFilter(partitionSpec);
-            if (partitionSpec != null) {
-                setPartitionPredicate(
-                        PartitionPredicate.fromMap(
-                                tableSchema.logicalPartitionType(),
-                                partitionSpec,
-                                options.partitionDefaultName()));
-            }
             return this;
         }
 
         @Override
         public ChainTableBatchScan withPartitionFilter(List<BinaryRow> 
partitions) {
             super.withPartitionFilter(partitions);
-            if (partitions != null) {
-                setPartitionPredicate(
-                        PartitionPredicate.fromMultiple(
-                                tableSchema.logicalPartitionType(), 
partitions));
-            }
             return this;
         }
 
         @Override
         public ChainTableBatchScan withPartitionsFilter(List<Map<String, 
String>> partitions) {
             super.withPartitionsFilter(partitions);
-            if (partitions != null) {
-                setPartitionPredicate(
-                        PartitionPredicate.fromMaps(
-                                tableSchema.logicalPartitionType(),
-                                partitions,
-                                options.partitionDefaultName()));
-            }
             return this;
         }
 
         @Override
         public ChainTableBatchScan withPartitionFilter(PartitionPredicate 
partitionPredicate) {
             super.withPartitionFilter(partitionPredicate);
-            if (partitionPredicate != null) {
-                setPartitionPredicate(partitionPredicate);
-            }
             return this;
         }
 
         @Override
         public ChainTableBatchScan withPartitionFilter(Predicate 
partitionPredicate) {
             super.withPartitionFilter(partitionPredicate);
-            if (partitionPredicate != null) {
-                setPartitionPredicate(
-                        PartitionPredicate.fromPredicate(
-                                tableSchema.logicalPartitionType(), 
partitionPredicate));
-            }
             return this;
         }
 
@@ -252,6 +226,7 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
         @Override
         public Plan plan() {
             List<Split> splits = new ArrayList<>();
+            PartitionPredicate partitionPredicate = getPartitionPredicate();
             PredicateBuilder builder = new 
PredicateBuilder(tableSchema.logicalPartitionType());
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
@@ -271,9 +246,11 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
 
             Set<BinaryRow> snapshotPartitions =
                     new HashSet<>(
-                            newPartitionListingScan(true, 
partitionPredicate).listPartitions());
+                            newChainPartitionListingScan(true, 
partitionPredicate)
+                                    .listPartitions());
 
-            DataTableScan deltaPartitionScan = newPartitionListingScan(false, 
partitionPredicate);
+            DataTableScan deltaPartitionScan =
+                    newChainPartitionListingScan(false, partitionPredicate);
             List<BinaryRow> deltaPartitions =
                     deltaPartitionScan.listPartitions().stream()
                             .filter(p -> !snapshotPartitions.contains(p))
@@ -292,7 +269,7 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
                         PartitionPredicate.fromPredicate(
                                 tableSchema.logicalPartitionType(), 
snapshotPredicate);
                 DataTableScan snapshotPartitionsScan =
-                        newPartitionListingScan(true, 
snapshotPartitionPredicate);
+                        newChainPartitionListingScan(true, 
snapshotPartitionPredicate);
                 List<BinaryRow> candidateSnapshotPartitions =
                         snapshotPartitionsScan.listPartitions();
                 candidateSnapshotPartitions =
@@ -393,8 +370,9 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
 
         @Override
         public List<PartitionEntry> listPartitionEntries() {
-            DataTableScan snapshotScan = newPartitionListingScan(true, 
partitionPredicate);
-            DataTableScan deltaScan = newPartitionListingScan(false, 
partitionPredicate);
+            PartitionPredicate partitionPredicate = getPartitionPredicate();
+            DataTableScan snapshotScan = newChainPartitionListingScan(true, 
partitionPredicate);
+            DataTableScan deltaScan = newChainPartitionListingScan(false, 
partitionPredicate);
             List<PartitionEntry> partitionEntries =
                     new ArrayList<>(snapshotScan.listPartitionEntries());
             Set<BinaryRow> partitions =
@@ -408,11 +386,7 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
             return partitionEntries;
         }
 
-        private void setPartitionPredicate(PartitionPredicate predicate) {
-            this.partitionPredicate = predicate;
-        }
-
-        private DataTableScan newPartitionListingScan(
+        private DataTableScan newChainPartitionListingScan(
                 boolean snapshot, PartitionPredicate scanPartitionPredicate) {
             DataTableScan scan =
                     snapshot
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 65e875c6a5..aff2aa9a23 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -47,6 +47,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 
@@ -189,7 +190,8 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
     @Override
     public DataTableScan newScan() {
         validateSchema();
-        return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
+        return new FallbackReadScan(
+                wrapped.newScan(), fallback.newScan(), wrapped, fallback, 
wrapped.schema());
     }
 
     protected void validateSchema() {
@@ -356,10 +358,22 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
         protected final DataTableScan mainScan;
         protected final DataTableScan fallbackScan;
-
-        public FallbackReadScan(DataTableScan mainScan, DataTableScan 
fallbackScan) {
+        protected final FileStoreTable wrappedTable;
+        protected final FileStoreTable fallbackTable;
+        protected final TableSchema tableSchema;
+        private PartitionPredicate partitionPredicate;
+
+        public FallbackReadScan(
+                DataTableScan mainScan,
+                DataTableScan fallbackScan,
+                FileStoreTable wrappedTable,
+                FileStoreTable fallbackTable,
+                TableSchema tableSchema) {
             this.mainScan = mainScan;
             this.fallbackScan = fallbackScan;
+            this.wrappedTable = wrappedTable;
+            this.fallbackTable = fallbackTable;
+            this.tableSchema = tableSchema;
         }
 
         @Override
@@ -373,6 +387,14 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         public FallbackReadScan withFilter(Predicate predicate) {
             mainScan.withFilter(predicate);
             fallbackScan.withFilter(predicate);
+            if (predicate != null) {
+                Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
+                        
PartitionPredicate.splitPartitionPredicatesAndDataPredicates(
+                                predicate,
+                                tableSchema.logicalRowType(),
+                                tableSchema.partitionKeys());
+                setPartitionPredicate(pair.getLeft().orElse(null));
+            }
             return this;
         }
 
@@ -387,6 +409,13 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         public FallbackReadScan withPartitionFilter(Map<String, String> 
partitionSpec) {
             mainScan.withPartitionFilter(partitionSpec);
             fallbackScan.withPartitionFilter(partitionSpec);
+            if (partitionSpec != null) {
+                setPartitionPredicate(
+                        PartitionPredicate.fromMap(
+                                tableSchema.logicalPartitionType(),
+                                partitionSpec,
+                                
CoreOptions.fromMap(tableSchema.options()).partitionDefaultName()));
+            }
             return this;
         }
 
@@ -394,6 +423,11 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         public FallbackReadScan withPartitionFilter(List<BinaryRow> 
partitions) {
             mainScan.withPartitionFilter(partitions);
             fallbackScan.withPartitionFilter(partitions);
+            if (partitions != null) {
+                setPartitionPredicate(
+                        PartitionPredicate.fromMultiple(
+                                tableSchema.logicalPartitionType(), 
partitions));
+            }
             return this;
         }
 
@@ -401,6 +435,13 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         public InnerTableScan withPartitionsFilter(List<Map<String, String>> 
partitions) {
             mainScan.withPartitionsFilter(partitions);
             fallbackScan.withPartitionsFilter(partitions);
+            if (partitions != null) {
+                setPartitionPredicate(
+                        PartitionPredicate.fromMaps(
+                                tableSchema.logicalPartitionType(),
+                                partitions,
+                                
CoreOptions.fromMap(tableSchema.options()).partitionDefaultName()));
+            }
             return this;
         }
 
@@ -408,6 +449,21 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         public InnerTableScan withPartitionFilter(PartitionPredicate 
partitionPredicate) {
             mainScan.withPartitionFilter(partitionPredicate);
             fallbackScan.withPartitionFilter(partitionPredicate);
+            if (partitionPredicate != null) {
+                setPartitionPredicate(partitionPredicate);
+            }
+            return this;
+        }
+
+        @Override
+        public FallbackReadScan withPartitionFilter(Predicate 
partitionPredicate) {
+            mainScan.withPartitionFilter(partitionPredicate);
+            fallbackScan.withPartitionFilter(partitionPredicate);
+            if (partitionPredicate != null) {
+                setPartitionPredicate(
+                        PartitionPredicate.fromPredicate(
+                                tableSchema.logicalPartitionType(), 
partitionPredicate));
+            }
             return this;
         }
 
@@ -446,18 +502,26 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
             return this;
         }
 
+        /**
+         * Builds a plan for fallback read.
+         *
+         * <p>Partitions that exist in the main branch (based on partition 
predicates only) are
+         * treated as complete and are read from the main branch with the full 
predicate. Partitions
+         * that exist only in the fallback branch are read from the fallback 
branch.
+         */
         @Override
         public TableScan.Plan plan() {
             List<Split> splits = new ArrayList<>();
-            Set<BinaryRow> completePartitions = new HashSet<>();
+            Set<BinaryRow> completePartitions =
+                    new HashSet<>(
+                            newPartitionListingScan(true, 
partitionPredicate).listPartitions());
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
                 splits.add(toFallbackSplit(dataSplit, false));
-                completePartitions.add(dataSplit.partition());
             }
 
             List<BinaryRow> remainingPartitions =
-                    fallbackScan.listPartitions().stream()
+                    newPartitionListingScan(false, 
partitionPredicate).listPartitions().stream()
                             .filter(p -> !completePartitions.contains(p))
                             .collect(Collectors.toList());
             if (!remainingPartitions.isEmpty()) {
@@ -471,18 +535,38 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
         @Override
         public List<PartitionEntry> listPartitionEntries() {
+            DataTableScan mainListingScan = newPartitionListingScan(true, 
partitionPredicate);
+            DataTableScan fallbackListingScan = newPartitionListingScan(false, 
partitionPredicate);
             List<PartitionEntry> partitionEntries =
-                    new ArrayList<>(mainScan.listPartitionEntries());
+                    new ArrayList<>(mainListingScan.listPartitionEntries());
             Set<BinaryRow> partitions =
                     partitionEntries.stream()
                             .map(PartitionEntry::partition)
                             .collect(Collectors.toSet());
-            List<PartitionEntry> fallBackPartitionEntries = 
fallbackScan.listPartitionEntries();
+            List<PartitionEntry> fallBackPartitionEntries =
+                    fallbackListingScan.listPartitionEntries();
             fallBackPartitionEntries.stream()
                     .filter(e -> !partitions.contains(e.partition()))
                     .forEach(partitionEntries::add);
             return partitionEntries;
         }
+
+        protected void setPartitionPredicate(PartitionPredicate predicate) {
+            this.partitionPredicate = predicate;
+        }
+
+        protected PartitionPredicate getPartitionPredicate() {
+            return partitionPredicate;
+        }
+
+        private DataTableScan newPartitionListingScan(
+                boolean isMain, PartitionPredicate scanPartitionPredicate) {
+            DataTableScan scan = isMain ? wrappedTable.newScan() : 
fallbackTable.newScan();
+            if (scanPartitionPredicate != null) {
+                scan.withPartitionFilter(scanPartitionPredicate);
+            }
+            return scan;
+        }
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 87d79bc537..64ef8aa34e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -140,7 +140,12 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
     public DataTableScan newScan() {
         if (wrapped instanceof FallbackReadFileStoreTable) {
             FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) 
wrapped;
-            return new FallbackReadScan(newScan(table.wrapped()), 
newScan(table.fallback()));
+            return new FallbackReadScan(
+                    newScan(table.wrapped()),
+                    newScan(table.fallback()),
+                    table.wrapped(),
+                    table.fallback(),
+                    table.wrapped().schema());
         }
         return newScan(wrapped);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 1b7eeddd95..7163202a8e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -26,12 +26,15 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -139,6 +142,66 @@ public class FallbackReadFileStoreTableTest {
                         Pair.of(1, 2L), Pair.of(2, 1L), Pair.of(3, 1L), 
Pair.of(4, 1L));
     }
 
+    /**
+     * Test that FallbackReadScan.plan() determines partition ownership based 
on partition
+     * predicates only, not mixed with data filters. If a partition exists in 
the main branch, it
+     * should never be read from fallback, regardless of the data filter.
+     *
+     * <p>Without the fix, the old code built completePartitions from 
mainScan.plan() results which
+     * already had data filters applied. When the data filter excluded all 
files of a main partition
+     * via filterByStats, that partition was incorrectly treated as "not in 
main" and read from
+     * fallback.
+     */
+    @Test
+    public void testPlanWithDataFilter() throws Exception {
+        String branchName = "bc";
+
+        FileStoreTable mainTable = createTable();
+
+        // Main branch: partition 1 (a=10), partition 2 (a=20)
+        writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20));
+
+        mainTable.createBranch(branchName);
+
+        FileStoreTable branchTable = createTableFromBranch(mainTable, 
branchName);
+
+        // Fallback branch: partition 1 already has a=10 (inherited), add 
a=100.
+        // Also add partition 3 (a=30) which is fallback-only.
+        writeDataIntoTable(branchTable, 1, rowData(1, 100), rowData(3, 30));
+
+        FallbackReadFileStoreTable fallbackTable =
+                new FallbackReadFileStoreTable(mainTable, branchTable);
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+
+        // Case 1: WHERE pt = 1 AND a = 100
+        // Partition 1 exists in main branch. Even though main has no a=100 
data,
+        // we should never fall back for it. The result should contain no 
fallback splits.
+        DataTableScan scan1 = fallbackTable.newScan();
+        scan1.withFilter(PredicateBuilder.and(builder.equal(0, 1), 
builder.equal(1, 100)));
+        List<Split> splits1 = scan1.plan().splits();
+
+        for (Split split : splits1) {
+            FallbackReadFileStoreTable.FallbackSplit fs =
+                    (FallbackReadFileStoreTable.FallbackSplit) split;
+            assertThat(fs.isFallback())
+                    .as("Partition that exists in main branch should never be 
read from fallback")
+                    .isFalse();
+        }
+
+        // Case 2: WHERE pt = 3 AND a = 30
+        // Partition 3 only exists in fallback branch, so it should be read 
from fallback.
+        DataTableScan scan2 = fallbackTable.newScan();
+        scan2.withFilter(PredicateBuilder.and(builder.equal(0, 3), 
builder.equal(1, 30)));
+        List<Split> splits2 = scan2.plan().splits();
+
+        assertThat(splits2).hasSize(1);
+        FallbackReadFileStoreTable.FallbackSplit fs2 =
+                (FallbackReadFileStoreTable.FallbackSplit) splits2.get(0);
+        assertThat(fs2.isFallback())
+                .as("Partition that only exists in fallback branch should be 
read from fallback")
+                .isTrue();
+    }
+
     private void writeDataIntoTable(
             FileStoreTable table, long commitIdentifier, InternalRow... 
allData) throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);

Reply via email to