This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c8636fade1 [core] Fix FallbackReadScan#plan mixing partition and data filters (#7388)
c8636fade1 is described below
commit c8636fade1655300e17f639d473498327be54af7
Author: Junrui Lee <[email protected]>
AuthorDate: Fri Mar 20 09:57:01 2026 +0800
[core] Fix FallbackReadScan#plan mixing partition and data filters (#7388)
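This is a follow-up to https://github.com/apache/paimon/pull/7268, which
fixed the same problem in ChainTableBatchScan. As @Aitozi pointed out,
FallbackReadScan#plan has the same issue: it mixes partition filters and
data filters when determining which partitions exist in the main vs.
fallback branch. Previously, the set of main-branch partitions was derived
from the splits returned by mainScan.plan(), which already have data
filters applied. If a data filter pruned every file of a main-branch
partition (for example via file statistics), that partition was wrongly
treated as missing from the main branch and read from the fallback branch
instead. The set of main-branch partitions is now obtained by listing
partitions with only the partition predicate applied.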
---
.../apache/paimon/table/ChainGroupReadTable.java | 58 ++++--------
.../paimon/table/FallbackReadFileStoreTable.java | 100 +++++++++++++++++++--
.../paimon/table/system/ReadOptimizedTable.java | 7 +-
.../table/FallbackReadFileStoreTableTest.java | 63 +++++++++++++
4 files changed, 177 insertions(+), 51 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 36f9778ee3..2379882070 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -130,11 +130,9 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
private final RowDataToObjectArrayConverter partitionConverter;
private final InternalRowPartitionComputer partitionComputer;
- private final TableSchema tableSchema;
private final CoreOptions options;
private final RecordComparator partitionComparator;
private final ChainGroupReadTable chainGroupReadTable;
- private PartitionPredicate partitionPredicate;
private Predicate dataPredicate;
private Filter<Integer> bucketFilter;
@@ -143,8 +141,12 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
DataTableScan fallbackScan,
TableSchema tableSchema,
ChainGroupReadTable chainGroupReadTable) {
- super(mainScan, fallbackScan);
- this.tableSchema = tableSchema;
+ super(
+ mainScan,
+ fallbackScan,
+ chainGroupReadTable.wrapped,
+ chainGroupReadTable.fallback(),
+ tableSchema);
this.options = CoreOptions.fromMap(tableSchema.options());
this.chainGroupReadTable = chainGroupReadTable;
this.partitionConverter =
@@ -169,7 +171,6 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
predicate,
tableSchema.logicalRowType(),
tableSchema.partitionKeys());
- setPartitionPredicate(pair.getLeft().orElse(null));
dataPredicate =
pair.getRight().isEmpty() ? null :
PredicateBuilder.and(pair.getRight());
}
@@ -179,57 +180,30 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
@Override
public ChainTableBatchScan withPartitionFilter(Map<String, String>
partitionSpec) {
super.withPartitionFilter(partitionSpec);
- if (partitionSpec != null) {
- setPartitionPredicate(
- PartitionPredicate.fromMap(
- tableSchema.logicalPartitionType(),
- partitionSpec,
- options.partitionDefaultName()));
- }
return this;
}
@Override
public ChainTableBatchScan withPartitionFilter(List<BinaryRow>
partitions) {
super.withPartitionFilter(partitions);
- if (partitions != null) {
- setPartitionPredicate(
- PartitionPredicate.fromMultiple(
- tableSchema.logicalPartitionType(),
partitions));
- }
return this;
}
@Override
public ChainTableBatchScan withPartitionsFilter(List<Map<String,
String>> partitions) {
super.withPartitionsFilter(partitions);
- if (partitions != null) {
- setPartitionPredicate(
- PartitionPredicate.fromMaps(
- tableSchema.logicalPartitionType(),
- partitions,
- options.partitionDefaultName()));
- }
return this;
}
@Override
public ChainTableBatchScan withPartitionFilter(PartitionPredicate
partitionPredicate) {
super.withPartitionFilter(partitionPredicate);
- if (partitionPredicate != null) {
- setPartitionPredicate(partitionPredicate);
- }
return this;
}
@Override
public ChainTableBatchScan withPartitionFilter(Predicate
partitionPredicate) {
super.withPartitionFilter(partitionPredicate);
- if (partitionPredicate != null) {
- setPartitionPredicate(
- PartitionPredicate.fromPredicate(
- tableSchema.logicalPartitionType(),
partitionPredicate));
- }
return this;
}
@@ -252,6 +226,7 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
@Override
public Plan plan() {
List<Split> splits = new ArrayList<>();
+ PartitionPredicate partitionPredicate = getPartitionPredicate();
PredicateBuilder builder = new
PredicateBuilder(tableSchema.logicalPartitionType());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
@@ -271,9 +246,11 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
Set<BinaryRow> snapshotPartitions =
new HashSet<>(
- newPartitionListingScan(true,
partitionPredicate).listPartitions());
+ newChainPartitionListingScan(true,
partitionPredicate)
+ .listPartitions());
- DataTableScan deltaPartitionScan = newPartitionListingScan(false,
partitionPredicate);
+ DataTableScan deltaPartitionScan =
+ newChainPartitionListingScan(false, partitionPredicate);
List<BinaryRow> deltaPartitions =
deltaPartitionScan.listPartitions().stream()
.filter(p -> !snapshotPartitions.contains(p))
@@ -292,7 +269,7 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
PartitionPredicate.fromPredicate(
tableSchema.logicalPartitionType(),
snapshotPredicate);
DataTableScan snapshotPartitionsScan =
- newPartitionListingScan(true,
snapshotPartitionPredicate);
+ newChainPartitionListingScan(true,
snapshotPartitionPredicate);
List<BinaryRow> candidateSnapshotPartitions =
snapshotPartitionsScan.listPartitions();
candidateSnapshotPartitions =
@@ -393,8 +370,9 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
@Override
public List<PartitionEntry> listPartitionEntries() {
- DataTableScan snapshotScan = newPartitionListingScan(true,
partitionPredicate);
- DataTableScan deltaScan = newPartitionListingScan(false,
partitionPredicate);
+ PartitionPredicate partitionPredicate = getPartitionPredicate();
+ DataTableScan snapshotScan = newChainPartitionListingScan(true,
partitionPredicate);
+ DataTableScan deltaScan = newChainPartitionListingScan(false,
partitionPredicate);
List<PartitionEntry> partitionEntries =
new ArrayList<>(snapshotScan.listPartitionEntries());
Set<BinaryRow> partitions =
@@ -408,11 +386,7 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
return partitionEntries;
}
- private void setPartitionPredicate(PartitionPredicate predicate) {
- this.partitionPredicate = predicate;
- }
-
- private DataTableScan newPartitionListingScan(
+ private DataTableScan newChainPartitionListingScan(
boolean snapshot, PartitionPredicate scanPartitionPredicate) {
DataTableScan scan =
snapshot
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 65e875c6a5..aff2aa9a23 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -47,6 +47,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
@@ -189,7 +190,8 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public DataTableScan newScan() {
validateSchema();
- return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
+ return new FallbackReadScan(
+ wrapped.newScan(), fallback.newScan(), wrapped, fallback,
wrapped.schema());
}
protected void validateSchema() {
@@ -356,10 +358,22 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
protected final DataTableScan mainScan;
protected final DataTableScan fallbackScan;
-
- public FallbackReadScan(DataTableScan mainScan, DataTableScan
fallbackScan) {
+ protected final FileStoreTable wrappedTable;
+ protected final FileStoreTable fallbackTable;
+ protected final TableSchema tableSchema;
+ private PartitionPredicate partitionPredicate;
+
+ public FallbackReadScan(
+ DataTableScan mainScan,
+ DataTableScan fallbackScan,
+ FileStoreTable wrappedTable,
+ FileStoreTable fallbackTable,
+ TableSchema tableSchema) {
this.mainScan = mainScan;
this.fallbackScan = fallbackScan;
+ this.wrappedTable = wrappedTable;
+ this.fallbackTable = fallbackTable;
+ this.tableSchema = tableSchema;
}
@Override
@@ -373,6 +387,14 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public FallbackReadScan withFilter(Predicate predicate) {
mainScan.withFilter(predicate);
fallbackScan.withFilter(predicate);
+ if (predicate != null) {
+ Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
+
PartitionPredicate.splitPartitionPredicatesAndDataPredicates(
+ predicate,
+ tableSchema.logicalRowType(),
+ tableSchema.partitionKeys());
+ setPartitionPredicate(pair.getLeft().orElse(null));
+ }
return this;
}
@@ -387,6 +409,13 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public FallbackReadScan withPartitionFilter(Map<String, String>
partitionSpec) {
mainScan.withPartitionFilter(partitionSpec);
fallbackScan.withPartitionFilter(partitionSpec);
+ if (partitionSpec != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMap(
+ tableSchema.logicalPartitionType(),
+ partitionSpec,
+
CoreOptions.fromMap(tableSchema.options()).partitionDefaultName()));
+ }
return this;
}
@@ -394,6 +423,11 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public FallbackReadScan withPartitionFilter(List<BinaryRow>
partitions) {
mainScan.withPartitionFilter(partitions);
fallbackScan.withPartitionFilter(partitions);
+ if (partitions != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMultiple(
+ tableSchema.logicalPartitionType(),
partitions));
+ }
return this;
}
@@ -401,6 +435,13 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public InnerTableScan withPartitionsFilter(List<Map<String, String>>
partitions) {
mainScan.withPartitionsFilter(partitions);
fallbackScan.withPartitionsFilter(partitions);
+ if (partitions != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMaps(
+ tableSchema.logicalPartitionType(),
+ partitions,
+
CoreOptions.fromMap(tableSchema.options()).partitionDefaultName()));
+ }
return this;
}
@@ -408,6 +449,21 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public InnerTableScan withPartitionFilter(PartitionPredicate
partitionPredicate) {
mainScan.withPartitionFilter(partitionPredicate);
fallbackScan.withPartitionFilter(partitionPredicate);
+ if (partitionPredicate != null) {
+ setPartitionPredicate(partitionPredicate);
+ }
+ return this;
+ }
+
+ @Override
+ public FallbackReadScan withPartitionFilter(Predicate
partitionPredicate) {
+ mainScan.withPartitionFilter(partitionPredicate);
+ fallbackScan.withPartitionFilter(partitionPredicate);
+ if (partitionPredicate != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromPredicate(
+ tableSchema.logicalPartitionType(),
partitionPredicate));
+ }
return this;
}
@@ -446,18 +502,26 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return this;
}
+ /**
+ * Builds a plan for fallback read.
+ *
+ * <p>Partitions that exist in the main branch (based on partition
predicates only) are
+ * treated as complete and are read from the main branch with the full
predicate. Partitions
+ * that exist only in the fallback branch are read from the fallback
branch.
+ */
@Override
public TableScan.Plan plan() {
List<Split> splits = new ArrayList<>();
- Set<BinaryRow> completePartitions = new HashSet<>();
+ Set<BinaryRow> completePartitions =
+ new HashSet<>(
+ newPartitionListingScan(true,
partitionPredicate).listPartitions());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(toFallbackSplit(dataSplit, false));
- completePartitions.add(dataSplit.partition());
}
List<BinaryRow> remainingPartitions =
- fallbackScan.listPartitions().stream()
+ newPartitionListingScan(false,
partitionPredicate).listPartitions().stream()
.filter(p -> !completePartitions.contains(p))
.collect(Collectors.toList());
if (!remainingPartitions.isEmpty()) {
@@ -471,18 +535,38 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public List<PartitionEntry> listPartitionEntries() {
+ DataTableScan mainListingScan = newPartitionListingScan(true,
partitionPredicate);
+ DataTableScan fallbackListingScan = newPartitionListingScan(false,
partitionPredicate);
List<PartitionEntry> partitionEntries =
- new ArrayList<>(mainScan.listPartitionEntries());
+ new ArrayList<>(mainListingScan.listPartitionEntries());
Set<BinaryRow> partitions =
partitionEntries.stream()
.map(PartitionEntry::partition)
.collect(Collectors.toSet());
- List<PartitionEntry> fallBackPartitionEntries =
fallbackScan.listPartitionEntries();
+ List<PartitionEntry> fallBackPartitionEntries =
+ fallbackListingScan.listPartitionEntries();
fallBackPartitionEntries.stream()
.filter(e -> !partitions.contains(e.partition()))
.forEach(partitionEntries::add);
return partitionEntries;
}
+
+ protected void setPartitionPredicate(PartitionPredicate predicate) {
+ this.partitionPredicate = predicate;
+ }
+
+ protected PartitionPredicate getPartitionPredicate() {
+ return partitionPredicate;
+ }
+
+ private DataTableScan newPartitionListingScan(
+ boolean isMain, PartitionPredicate scanPartitionPredicate) {
+ DataTableScan scan = isMain ? wrappedTable.newScan() :
fallbackTable.newScan();
+ if (scanPartitionPredicate != null) {
+ scan.withPartitionFilter(scanPartitionPredicate);
+ }
+ return scan;
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 87d79bc537..64ef8aa34e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -140,7 +140,12 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
public DataTableScan newScan() {
if (wrapped instanceof FallbackReadFileStoreTable) {
FallbackReadFileStoreTable table = (FallbackReadFileStoreTable)
wrapped;
- return new FallbackReadScan(newScan(table.wrapped()),
newScan(table.fallback()));
+ return new FallbackReadScan(
+ newScan(table.wrapped()),
+ newScan(table.fallback()),
+ table.wrapped(),
+ table.fallback(),
+ table.wrapped().schema());
}
return newScan(wrapped);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 1b7eeddd95..7163202a8e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -26,12 +26,15 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -139,6 +142,66 @@ public class FallbackReadFileStoreTableTest {
Pair.of(1, 2L), Pair.of(2, 1L), Pair.of(3, 1L),
Pair.of(4, 1L));
}
+ /**
+ * Test that FallbackReadScan.plan() determines partition ownership based
on partition
+ * predicates only, not mixed with data filters. If a partition exists in
the main branch, it
+ * should never be read from fallback, regardless of the data filter.
+ *
+ * <p>Without the fix, the old code built completePartitions from
mainScan.plan() results which
+ * already had data filters applied. When the data filter excluded all
files of a main partition
+ * via filterByStats, that partition was incorrectly treated as "not in
main" and read from
+ * fallback.
+ */
+ @Test
+ public void testPlanWithDataFilter() throws Exception {
+ String branchName = "bc";
+
+ FileStoreTable mainTable = createTable();
+
+ // Main branch: partition 1 (a=10), partition 2 (a=20)
+ writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20));
+
+ mainTable.createBranch(branchName);
+
+ FileStoreTable branchTable = createTableFromBranch(mainTable,
branchName);
+
+ // Fallback branch: partition 1 already has a=10 (inherited), add
a=100.
+ // Also add partition 3 (a=30) which is fallback-only.
+ writeDataIntoTable(branchTable, 1, rowData(1, 100), rowData(3, 30));
+
+ FallbackReadFileStoreTable fallbackTable =
+ new FallbackReadFileStoreTable(mainTable, branchTable);
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+
+ // Case 1: WHERE pt = 1 AND a = 100
+ // Partition 1 exists in main branch. Even though main has no a=100
data,
+ // we should never fall back for it. The result should contain no
fallback splits.
+ DataTableScan scan1 = fallbackTable.newScan();
+ scan1.withFilter(PredicateBuilder.and(builder.equal(0, 1),
builder.equal(1, 100)));
+ List<Split> splits1 = scan1.plan().splits();
+
+ for (Split split : splits1) {
+ FallbackReadFileStoreTable.FallbackSplit fs =
+ (FallbackReadFileStoreTable.FallbackSplit) split;
+ assertThat(fs.isFallback())
+ .as("Partition that exists in main branch should never be
read from fallback")
+ .isFalse();
+ }
+
+ // Case 2: WHERE pt = 3 AND a = 30
+ // Partition 3 only exists in fallback branch, so it should be read
from fallback.
+ DataTableScan scan2 = fallbackTable.newScan();
+ scan2.withFilter(PredicateBuilder.and(builder.equal(0, 3),
builder.equal(1, 30)));
+ List<Split> splits2 = scan2.plan().splits();
+
+ assertThat(splits2).hasSize(1);
+ FallbackReadFileStoreTable.FallbackSplit fs2 =
+ (FallbackReadFileStoreTable.FallbackSplit) splits2.get(0);
+ assertThat(fs2.isFallback())
+ .as("Partition that only exists in fallback branch should be
read from fallback")
+ .isTrue();
+ }
+
private void writeDataIntoTable(
FileStoreTable table, long commitIdentifier, InternalRow...
allData) throws Exception {
StreamTableWrite write = table.newWrite(commitUser);