This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 749aa28f832df88c8925d14a5725529f00258404
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 31 15:27:00 2026 +0800

    Introduce scan.primary-branch which is symmetrical to scan.fallback-branch (#7553)
    
    ### Purpose
    
    Some users have accumulated large amounts of data in their main
    primary-key branch and want to reorganize that data into another,
    non-primary-key branch to speed up queries. This PR introduces
    `scan.primary-branch`, the counterpart of `scan.fallback-branch`: if a
    partition exists in the primary branch, it is read from the primary
    branch; otherwise it is read from the current branch.
    
    ### Tests
    
    * `FallbackReadFileStoreTableTest`
---
 .../shortcodes/generated/core_configuration.html   |  18 ++-
 .../generated/flink_connector_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   9 ++
 .../org/apache/paimon/schema/SchemaValidation.java |  22 +++-
 .../paimon/table/AbstractFileStoreTable.java       |  17 ++-
 .../apache/paimon/table/ChainGroupReadTable.java   |  38 ++----
 .../paimon/table/FallbackReadFileStoreTable.java   | 137 ++++++++++++---------
 .../apache/paimon/table/FileStoreTableFactory.java |  66 ++++++----
 .../paimon/table/system/ReadOptimizedTable.java    |   9 +-
 .../org/apache/paimon/utils/ChainTableUtils.java   |   3 +-
 .../table/FallbackReadFileStoreTableTest.java      | 118 ++++++++++++------
 .../apache/paimon/table/SimpleTableTestBase.java   |   5 +-
 .../org/apache/paimon/flink/BranchSqlITCase.java   |   2 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   3 +-
 14 files changed, 274 insertions(+), 175 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6dd621fcac..0071c26437 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -128,6 +128,12 @@ under the License.
             <td>MemorySize</td>
             <td>Memory page size for caching.</td>
         </tr>
+        <tr>
+            <td><h5>chain-table.chain-partition-keys</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Partition keys that participate in chain logic. Must be a 
contiguous suffix of the table's partition keys. Comma-separated. If not set, 
all partition keys participate in chain.</td>
+        </tr>
         <tr>
             <td><h5>chain-table.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -1187,12 +1193,6 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>String</td>
             <td>When a batch job queries from a chain table, if a partition 
does not exist in the main branch, the reader will try to get this partition 
from chain snapshot branch.</td>
         </tr>
-        <tr>
-            <td><h5>chain-table.chain-partition-keys</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Partition keys that participate in chain logic. Must be a 
contiguous suffix of the table's partition keys. Comma-separated. If not set, 
all partition keys participate in chain.</td>
-        </tr>
         <tr>
             <td><h5>scan.file-creation-time-millis</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -1235,6 +1235,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>Boolean</td>
             <td>Whether to sort plan files by partition fields, this allows 
you to read according to the partition order, even if your partition writes are 
out of order.<br />It is recommended that you use this for streaming read of 
the 'append-only' table. By default, streaming read will read the full snapshot 
first. In order to avoid the disorder reading for partitions, you can open this 
option.</td>
         </tr>
+        <tr>
+            <td><h5>scan.primary-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a table, if a partition exists 
in the primary branch, the reader will read this partition from the primary 
branch. Otherwise, the reader will read this partition from the current 
branch.</td>
+        </tr>
         <tr>
             <td><h5>scan.snapshot-id</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d4c8b5a167..253652cb78 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -363,4 +363,4 @@ under the License.
             <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
         </tr>
     </tbody>
-</table>
\ No newline at end of file
+</table>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7c054b8c8a..897bebcf68 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1961,6 +1961,15 @@ public class CoreOptions implements Serializable {
                             "When a batch job queries from a table, if a 
partition does not exist in the current branch, "
                                     + "the reader will try to get this 
partition from this fallback branch.");
 
+    public static final ConfigOption<String> SCAN_PRIMARY_BRANCH =
+            key("scan.primary-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a table, if a 
partition exists in the primary branch, "
+                                    + "the reader will read this partition 
from the primary branch. "
+                                    + "Otherwise, the reader will read this 
partition from the current branch.");
+
     public static final ConfigOption<Boolean> ASYNC_FILE_WRITE =
             key("async-file-write")
                     .booleanType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b9b68fc647..2ff1080c4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -288,14 +288,34 @@ public class SchemaValidation {
 
     public static void validateFallbackBranch(SchemaManager schemaManager, 
TableSchema schema) {
         String fallbackBranch = 
schema.options().get(CoreOptions.SCAN_FALLBACK_BRANCH.key());
+        String primaryBranch = 
schema.options().get(CoreOptions.SCAN_PRIMARY_BRANCH.key());
+
+        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)
+                && !StringUtils.isNullOrWhitespaceOnly(primaryBranch)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot set both '%s' and '%s' at the same time.",
+                            CoreOptions.SCAN_FALLBACK_BRANCH.key(),
+                            CoreOptions.SCAN_PRIMARY_BRANCH.key()));
+        }
+
         if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
             checkArgument(
                     
schemaManager.copyWithBranch(fallbackBranch).latest().isPresent(),
-                    "Cannot set '%s' = '%s' because the branch '%s' isn't 
existed.",
+                    "Cannot set '%s' = '%s' because the branch '%s' does not 
exist.",
                     CoreOptions.SCAN_FALLBACK_BRANCH.key(),
                     fallbackBranch,
                     fallbackBranch);
         }
+
+        if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)) {
+            checkArgument(
+                    
schemaManager.copyWithBranch(primaryBranch).latest().isPresent(),
+                    "Cannot set '%s' = '%s' because the branch '%s' does not 
exist.",
+                    CoreOptions.SCAN_PRIMARY_BRANCH.key(),
+                    primaryBranch,
+                    primaryBranch);
+        }
     }
 
     private static void validateOnlyContainPrimitiveType(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 3583d17c08..f8a6b1722f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -709,10 +709,19 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 && branchName.equals(fallbackBranch)) {
             throw new IllegalArgumentException(
                     String.format(
-                            "can not delete the fallback branch. "
-                                    + "branchName to be deleted is %s. you 
have set 'scan.fallback-branch' = '%s'. "
-                                    + "you should reset 'scan.fallback-branch' 
before deleting this branch.",
-                            branchName, fallbackBranch));
+                            "Cannot delete branch '%s' because it is 
configured as"
+                                    + " 'scan.fallback-branch'. Unset 
'scan.fallback-branch' first.",
+                            branchName));
+        }
+
+        String primaryBranch = 
coreOptions().toConfiguration().get(CoreOptions.SCAN_PRIMARY_BRANCH);
+        if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)
+                && branchName.equals(primaryBranch)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot delete branch '%s' because it is 
configured as"
+                                    + " 'scan.primary-branch'. Unset 
'scan.primary-branch' first.",
+                            branchName));
         }
 
         branchManager().dropBranch(branchName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 78752acfc6..b178715a2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -67,7 +67,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 public class ChainGroupReadTable extends FallbackReadFileStoreTable {
 
     public ChainGroupReadTable(FileStoreTable snapshotStoreTable, 
FileStoreTable deltaStoreTable) {
-        super(snapshotStoreTable, deltaStoreTable);
+        super(snapshotStoreTable, deltaStoreTable, true);
         checkArgument(snapshotStoreTable instanceof PrimaryKeyFileStoreTable);
         checkArgument(deltaStoreTable instanceof PrimaryKeyFileStoreTable);
     }
@@ -75,11 +75,7 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
     @Override
     public DataTableScan newScan() {
         super.validateSchema();
-        return new ChainTableBatchScan(
-                wrapped.newScan(),
-                fallback().newScan(),
-                ((AbstractFileStoreTable) wrapped).tableSchema,
-                this);
+        return new ChainTableBatchScan(((AbstractFileStoreTable) 
wrapped).tableSchema, this);
     }
 
     private DataTableScan newSnapshotScan() {
@@ -87,42 +83,38 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
     }
 
     private DataTableScan newDeltaScan() {
-        return fallback().newScan();
+        return other().newScan();
     }
 
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new ChainGroupReadTable(
-                wrapped.copy(dynamicOptions),
-                fallback().copy(rewriteFallbackOptions(dynamicOptions)));
+                wrapped.copy(dynamicOptions), 
other().copy(rewriteOtherOptions(dynamicOptions)));
     }
 
     @Override
     public FileStoreTable copy(TableSchema newTableSchema) {
         return new ChainGroupReadTable(
                 wrapped.copy(newTableSchema),
-                fallback()
-                        .copy(
-                                newTableSchema.copy(
-                                        
rewriteFallbackOptions(newTableSchema.options()))));
+                
other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))));
     }
 
     @Override
     public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
         return new ChainGroupReadTable(
                 wrapped.copyWithoutTimeTravel(dynamicOptions),
-                
fallback().copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+                
other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)));
     }
 
     @Override
     public FileStoreTable copyWithLatestSchema() {
         return new ChainGroupReadTable(
-                wrapped.copyWithLatestSchema(), 
fallback().copyWithLatestSchema());
+                wrapped.copyWithLatestSchema(), 
other().copyWithLatestSchema());
     }
 
     @Override
     public FileStoreTable switchToBranch(String branchName) {
-        return new ChainGroupReadTable(switchWrappedToBranch(branchName), 
fallback());
+        return new ChainGroupReadTable(switchWrappedToBranch(branchName), 
other());
     }
 
     /** Scan implementation for {@link ChainGroupReadTable}. */
@@ -138,16 +130,12 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
         private Filter<Integer> bucketFilter;
 
         public ChainTableBatchScan(
-                DataTableScan mainScan,
-                DataTableScan fallbackScan,
-                TableSchema tableSchema,
-                ChainGroupReadTable chainGroupReadTable) {
+                TableSchema tableSchema, ChainGroupReadTable 
chainGroupReadTable) {
             super(
-                    mainScan,
-                    fallbackScan,
                     chainGroupReadTable.wrapped,
-                    chainGroupReadTable.fallback(),
-                    tableSchema);
+                    chainGroupReadTable.other(),
+                    tableSchema,
+                    FileStoreTable::newScan);
             this.options = CoreOptions.fromMap(tableSchema.options());
             this.chainGroupReadTable = chainGroupReadTable;
             this.partitionConverter =
@@ -500,7 +488,7 @@ public class ChainGroupReadTable extends 
FallbackReadFileStoreTable {
 
         private Read() {
             this.mainRead = wrapped.newRead();
-            this.fallbackRead = fallback().newRead();
+            this.fallbackRead = other().newRead();
         }
 
         @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index aff2aa9a23..5c839a5581 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -66,73 +66,82 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * A {@link FileStoreTable} which mainly read from the current branch. 
However, if the current
- * branch does not have a partition, it will read that partition from the 
fallback branch.
+ * A {@link FileStoreTable} which reads from two branches with partition-level 
merging. When {@code
+ * wrappedFirst} is true, the wrapped table has read priority (fallback mode). 
When {@code
+ * wrappedFirst} is false, the other table has read priority (primary mode). 
Write operations are
+ * always delegated to the wrapped table (current branch).
  */
 public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FallbackReadFileStoreTable.class);
 
-    private final FileStoreTable fallback;
+    private final FileStoreTable other;
+    private final boolean wrappedFirst;
 
-    public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable 
fallback) {
+    public FallbackReadFileStoreTable(
+            FileStoreTable wrapped, FileStoreTable other, boolean 
wrappedFirst) {
         super(wrapped);
-        this.fallback = fallback;
+        this.other = other;
+        this.wrappedFirst = wrappedFirst;
 
         Preconditions.checkArgument(!(wrapped instanceof 
FallbackReadFileStoreTable));
-        if (fallback instanceof FallbackReadFileStoreTable) {
+        if (other instanceof FallbackReadFileStoreTable) {
             // ChainGroupReadTable need to be wrapped again
-            if (!(fallback instanceof ChainGroupReadTable)) {
+            if (!(other instanceof ChainGroupReadTable)) {
                 throw new IllegalArgumentException(
                         "This is a bug, perhaps there is a recursive call.");
             }
         }
     }
 
-    public FileStoreTable fallback() {
-        return fallback;
+    public FileStoreTable other() {
+        return other;
     }
 
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new FallbackReadFileStoreTable(
                 wrapped.copy(dynamicOptions),
-                fallback.copy(rewriteFallbackOptions(dynamicOptions)));
+                other.copy(rewriteOtherOptions(dynamicOptions)),
+                wrappedFirst);
     }
 
     @Override
     public FileStoreTable copy(TableSchema newTableSchema) {
         return new FallbackReadFileStoreTable(
                 wrapped.copy(newTableSchema),
-                fallback.copy(
-                        
newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options()))));
+                
other.copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))),
+                wrappedFirst);
     }
 
     @Override
     public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
         return new FallbackReadFileStoreTable(
                 wrapped.copyWithoutTimeTravel(dynamicOptions),
-                
fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+                
other.copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)),
+                wrappedFirst);
     }
 
     @Override
     public FileStoreTable copyWithLatestSchema() {
         return new FallbackReadFileStoreTable(
-                wrapped.copyWithLatestSchema(), 
fallback.copyWithLatestSchema());
+                wrapped.copyWithLatestSchema(), other.copyWithLatestSchema(), 
wrappedFirst);
     }
 
     @Override
     public FileStoreTable switchToBranch(String branchName) {
-        return new 
FallbackReadFileStoreTable(switchWrappedToBranch(branchName), fallback);
+        return new FallbackReadFileStoreTable(
+                switchWrappedToBranch(branchName), other, wrappedFirst);
     }
 
     @Override
     public void setManifestCache(SegmentsCache<Path> manifestCache) {
         super.setManifestCache(manifestCache);
-        fallback.setManifestCache(manifestCache);
+        other.setManifestCache(manifestCache);
     }
 
     protected FileStoreTable switchWrappedToBranch(String branchName) {
@@ -153,35 +162,35 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
                 wrapped.catalogEnvironment());
     }
 
-    protected Map<String, String> rewriteFallbackOptions(Map<String, String> 
options) {
+    protected Map<String, String> rewriteOtherOptions(Map<String, String> 
options) {
         Map<String, String> result = new HashMap<>(options);
 
-        // branch of fallback table should never change
+        // branch of other table should never change
         String branchKey = CoreOptions.BRANCH.key();
         if (options.containsKey(branchKey)) {
-            result.put(branchKey, fallback.options().get(branchKey));
+            result.put(branchKey, other.options().get(branchKey));
         }
 
-        // snapshot ids may be different between the main branch and the 
fallback branch,
+        // snapshot ids may be different between the main branch and the other 
branch,
         // so we need to convert main branch snapshot id to millisecond,
-        // then convert millisecond to fallback branch snapshot id
+        // then convert millisecond to other branch snapshot id
         String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key();
         String scanSnapshotId = options.get(scanSnapshotIdOptionKey);
         if (scanSnapshotId != null) {
             long id = Long.parseLong(scanSnapshotId);
             long millis = wrapped.snapshotManager().snapshot(id).timeMillis();
-            Snapshot fallbackSnapshot = 
fallback.snapshotManager().earlierOrEqualTimeMills(millis);
-            long fallbackId;
-            if (fallbackSnapshot == null) {
-                fallbackId = Snapshot.FIRST_SNAPSHOT_ID;
+            Snapshot otherSnapshot = 
other.snapshotManager().earlierOrEqualTimeMills(millis);
+            long otherId;
+            if (otherSnapshot == null) {
+                otherId = Snapshot.FIRST_SNAPSHOT_ID;
             } else {
-                fallbackId = fallbackSnapshot.id();
+                otherId = otherSnapshot.id();
             }
-            result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId));
+            result.put(scanSnapshotIdOptionKey, String.valueOf(otherId));
         }
 
-        // bucket number of main branch and fallback branch are very likely 
different,
-        // so we remove bucket in options to use fallback branch's bucket 
number
+        // bucket number of main branch and other branch are very likely 
different,
+        // so we remove bucket in options to use other branch's bucket number
         result.remove(CoreOptions.BUCKET.key());
 
         return result;
@@ -189,61 +198,66 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
     @Override
     public DataTableScan newScan() {
+        return newScan(FileStoreTable::newScan);
+    }
+
+    public DataTableScan newScan(Function<FileStoreTable, DataTableScan> 
scanCreator) {
         validateSchema();
-        return new FallbackReadScan(
-                wrapped.newScan(), fallback.newScan(), wrapped, fallback, 
wrapped.schema());
+        FileStoreTable first = wrappedFirst ? wrapped : other;
+        FileStoreTable second = wrappedFirst ? other : wrapped;
+        return new FallbackReadScan(first, second, wrapped.schema(), 
scanCreator);
     }
 
     protected void validateSchema() {
         String mainBranch = wrapped.coreOptions().branch();
-        String fallbackBranch = fallback.coreOptions().branch();
+        String otherBranch = other.coreOptions().branch();
         RowType mainRowType = wrapped.schema().logicalRowType();
-        RowType fallbackRowType = fallback.schema().logicalRowType();
+        RowType otherRowType = other.schema().logicalRowType();
         Preconditions.checkArgument(
-                sameRowTypeIgnoreNullable(mainRowType, fallbackRowType),
+                sameRowTypeIgnoreNullable(mainRowType, otherRowType),
                 "Branch %s and %s does not have the same row type.\n"
                         + "Row type of branch %s is %s.\n"
                         + "Row type of branch %s is %s.",
                 mainBranch,
-                fallbackBranch,
+                otherBranch,
                 mainBranch,
                 mainRowType,
-                fallbackBranch,
-                fallbackRowType);
+                otherBranch,
+                otherRowType);
 
         List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
-        List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
+        List<String> otherPrimaryKeys = other.schema().primaryKeys();
         if (!mainPrimaryKeys.isEmpty()) {
-            if (fallbackPrimaryKeys.isEmpty()) {
+            if (otherPrimaryKeys.isEmpty()) {
                 throw new IllegalArgumentException(
                         "Branch "
                                 + mainBranch
-                                + " has primary keys while fallback branch "
-                                + fallbackBranch
+                                + " has primary keys while branch "
+                                + otherBranch
                                 + " does not. This is not allowed.");
             }
             Preconditions.checkArgument(
-                    mainPrimaryKeys.equals(fallbackPrimaryKeys),
+                    mainPrimaryKeys.equals(otherPrimaryKeys),
                     "Branch %s and %s both have primary keys but are not the 
same.\n"
                             + "Primary keys of %s are %s.\n"
                             + "Primary keys of %s are %s.",
                     mainBranch,
-                    fallbackBranch,
+                    otherBranch,
                     mainBranch,
                     mainPrimaryKeys,
-                    fallbackBranch,
-                    fallbackPrimaryKeys);
+                    otherBranch,
+                    otherPrimaryKeys);
         }
     }
 
-    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
fallbackRowType) {
-        if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) {
+    private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
otherRowType) {
+        if (mainRowType.getFieldCount() != otherRowType.getFieldCount()) {
             return false;
         }
         for (int i = 0; i < mainRowType.getFieldCount(); i++) {
             DataType mainType = mainRowType.getFields().get(i).type();
-            DataType fallbackType = fallbackRowType.getFields().get(i).type();
-            if (!mainType.equalsIgnoreNullable(fallbackType)) {
+            DataType otherType = otherRowType.getFields().get(i).type();
+            if (!mainType.equalsIgnoreNullable(otherType)) {
                 return false;
             }
         }
@@ -356,24 +370,25 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
     /** Scan implementation for {@link FallbackReadFileStoreTable}. */
     public static class FallbackReadScan implements DataTableScan {
 
-        protected final DataTableScan mainScan;
-        protected final DataTableScan fallbackScan;
         protected final FileStoreTable wrappedTable;
         protected final FileStoreTable fallbackTable;
         protected final TableSchema tableSchema;
+        protected final Function<FileStoreTable, DataTableScan> scanCreator;
+        protected final DataTableScan mainScan;
+        protected final DataTableScan fallbackScan;
         private PartitionPredicate partitionPredicate;
 
         public FallbackReadScan(
-                DataTableScan mainScan,
-                DataTableScan fallbackScan,
                 FileStoreTable wrappedTable,
                 FileStoreTable fallbackTable,
-                TableSchema tableSchema) {
-            this.mainScan = mainScan;
-            this.fallbackScan = fallbackScan;
+                TableSchema tableSchema,
+                Function<FileStoreTable, DataTableScan> scanCreator) {
             this.wrappedTable = wrappedTable;
             this.fallbackTable = fallbackTable;
             this.tableSchema = tableSchema;
+            this.scanCreator = scanCreator;
+            this.mainScan = scanCreator.apply(wrappedTable);
+            this.fallbackScan = scanCreator.apply(fallbackTable);
         }
 
         @Override
@@ -561,7 +576,7 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
         private DataTableScan newPartitionListingScan(
                 boolean isMain, PartitionPredicate scanPartitionPredicate) {
-            DataTableScan scan = isMain ? wrappedTable.newScan() : 
fallbackTable.newScan();
+            DataTableScan scan = scanCreator.apply(isMain ? wrappedTable : 
fallbackTable);
             if (scanPartitionPredicate != null) {
                 scan.withPartitionFilter(scanPartitionPredicate);
             }
@@ -580,8 +595,10 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
         private final InnerTableRead fallbackRead;
 
         private Read() {
-            this.mainRead = wrapped.newRead();
-            this.fallbackRead = fallback.newRead();
+            FileStoreTable first = wrappedFirst ? wrapped : other;
+            FileStoreTable second = wrappedFirst ? other : wrapped;
+            this.mainRead = first.newRead();
+            this.fallbackRead = second.newRead();
         }
 
         @Override
@@ -628,7 +645,7 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
                         return 
fallbackRead.createReader(fallbackSplit.wrapped());
                     } catch (Exception ignored) {
                         LOG.error(
-                                "Reading from fallback branch has problems: 
{}",
+                                "Reading from supplemental branch has 
problems: {}",
                                 fallbackSplit.wrapped());
                     }
                 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 73ae6d2dd1..e4a8656799 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -99,35 +99,27 @@ public class FileStoreTableFactory {
 
         Options options = new Options(table.options());
         String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
+        String primaryBranch = options.get(CoreOptions.SCAN_PRIMARY_BRANCH);
         if (ChainTableUtils.isChainTable(options.toMap())) {
             table = createChainTable(table, fileIO, tablePath, dynamicOptions, catalogEnvironment);
         } else if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
-            Options branchOptions = new Options(dynamicOptions.toMap());
-            branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
-            Optional<TableSchema> schema =
-                    new SchemaManager(fileIO, tablePath, fallbackBranch).latest();
-            if (schema.isPresent()) {
-                Identifier identifier = catalogEnvironment.identifier();
-                CatalogEnvironment fallbackCatalogEnvironment = catalogEnvironment;
-                if (identifier != null) {
-                    fallbackCatalogEnvironment =
-                            catalogEnvironment.copy(
-                                    new Identifier(
-                                            identifier.getDatabaseName(),
-                                            identifier.getObjectName(),
-                                            fallbackBranch));
-                }
-                FileStoreTable fallbackTable =
-                        createWithoutFallbackBranch(
-                                fileIO,
-                                tablePath,
-                                schema.get(),
-                                branchOptions,
-                                fallbackCatalogEnvironment);
-                table = new FallbackReadFileStoreTable(table, fallbackTable);
+            FileStoreTable otherTable =
+                    createOtherBranchTable(
+                            fileIO, tablePath, fallbackBranch, dynamicOptions, catalogEnvironment);
+            if (otherTable != null) {
+                table = new FallbackReadFileStoreTable(table, otherTable, true);
             } else {
                 LOG.error("Fallback branch {} not found for table {}", fallbackBranch, tablePath);
             }
+        } else if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)) {
+            FileStoreTable otherTable =
+                    createOtherBranchTable(
+                            fileIO, tablePath, primaryBranch, dynamicOptions, catalogEnvironment);
+            if (otherTable != null) {
+                table = new FallbackReadFileStoreTable(table, otherTable, false);
+            } else {
+                LOG.error("Primary branch {} not found for table {}", primaryBranch, tablePath);
+            }
         }
 
         return table;
@@ -189,7 +181,33 @@ public class FileStoreTableFactory {
                                 catalogEnvironment);
         FileStoreTable chainGroupFileStoreTable =
                 new ChainGroupReadTable(snapshotTable, deltaTable);
-        return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable);
+        return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable, true);
+    }
+
+    private static FileStoreTable createOtherBranchTable(
+            FileIO fileIO,
+            Path tablePath,
+            String branchName,
+            Options dynamicOptions,
+            CatalogEnvironment catalogEnvironment) {
+        Options branchOptions = new Options(dynamicOptions.toMap());
+        branchOptions.set(CoreOptions.BRANCH, branchName);
+        Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath, branchName).latest();
+        if (schema.isPresent()) {
+            Identifier identifier = catalogEnvironment.identifier();
+            CatalogEnvironment branchCatalogEnvironment = catalogEnvironment;
+            if (identifier != null) {
+                branchCatalogEnvironment =
+                        catalogEnvironment.copy(
+                                new Identifier(
+                                        identifier.getDatabaseName(),
+                                        identifier.getObjectName(),
+                                        branchName));
+            }
+            return createWithoutFallbackBranch(
+                    fileIO, tablePath, schema.get(), branchOptions, branchCatalogEnvironment);
+        }
+        return null;
     }
 
     public static FileStoreTable createWithoutFallbackBranch(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 64ef8aa34e..0a96533a99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FallbackReadFileStoreTable;
-import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackReadScan;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
@@ -139,13 +138,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
     @Override
     public DataTableScan newScan() {
         if (wrapped instanceof FallbackReadFileStoreTable) {
-            FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) wrapped;
-            return new FallbackReadScan(
-                    newScan(table.wrapped()),
-                    newScan(table.fallback()),
-                    table.wrapped(),
-                    table.fallback(),
-                    table.wrapped().schema());
+            return ((FallbackReadFileStoreTable) wrapped).newScan(this::newScan);
         }
         return newScan(wrapped);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 6ccbbbc8fc..99e25cc2ce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -220,8 +220,7 @@ public class ChainTableUtils {
 
     public static FileStoreTable resolveChainPrimaryTable(FileStoreTable table) {
         if (table.coreOptions().isChainTable() && table instanceof FallbackReadFileStoreTable) {
-            return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
-                    .wrapped();
+            return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).other()).wrapped();
         }
         return table;
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 7163202a8e..6a1f782fd2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -42,8 +42,9 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
 import java.util.List;
@@ -77,8 +78,9 @@ public class FallbackReadFileStoreTableTest {
         fileIO = FileIOFinder.find(tablePath);
     }
 
-    @Test
-    public void testListPartitions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testListPartitions(boolean wrappedFirst) throws Exception {
         String branchName = "bc";
 
         FileStoreTable mainTable = createTable();
@@ -93,25 +95,20 @@ public class FallbackReadFileStoreTableTest {
         // write data into partition for branch only
         writeDataIntoTable(branchTable, 0, rowData(3, 60));
 
-        FallbackReadFileStoreTable fallbackTable =
-                new FallbackReadFileStoreTable(mainTable, branchTable);
-
-        List<Integer> partitionsFromMainTable =
-                mainTable.newScan().listPartitions().stream()
-                        .map(row -> row.getInt(0))
-                        .collect(Collectors.toList());
-        assertThat(partitionsFromMainTable).containsExactlyInAnyOrder(1, 2);
+        FallbackReadFileStoreTable table =
+                new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst);
 
-        List<Integer> partitionsFromFallbackTable =
-                fallbackTable.newScan().listPartitions().stream()
+        List<Integer> partitions =
+                table.newScan().listPartitions().stream()
                         .map(row -> row.getInt(0))
                         .collect(Collectors.toList());
         // this should contain all partitions
-        assertThat(partitionsFromFallbackTable).containsExactlyInAnyOrder(1, 2, 3);
+        assertThat(partitions).containsExactlyInAnyOrder(1, 2, 3);
     }
 
-    @Test
-    public void testListPartitionEntries() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testListPartitionEntries(boolean wrappedFirst) throws Exception {
         String branchName = "bc";
 
         FileStoreTable mainTable = createTable();
@@ -126,40 +123,38 @@ public class FallbackReadFileStoreTableTest {
         // write data into partition for branch only
         writeDataIntoTable(branchTable, 0, rowData(1, 50), rowData(3, 60), rowData(4, 70));
 
-        FallbackReadFileStoreTable fallbackTable =
-                new FallbackReadFileStoreTable(mainTable, branchTable);
-
-        List<PartitionEntry> partitionEntries = mainTable.newScan().listPartitionEntries();
-        assertThat(partitionEntries)
-                .map(e -> e.partition().getInt(0))
-                .containsExactlyInAnyOrder(1, 2);
+        FallbackReadFileStoreTable table =
+                new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst);
 
-        List<PartitionEntry> fallbackPartitionEntries =
-                fallbackTable.newScan().listPartitionEntries();
-        assertThat(fallbackPartitionEntries)
+        List<PartitionEntry> entries = table.newScan().listPartitionEntries();
+        // partition 1 exists in both: record count depends on which table has priority
+        // wrappedFirst=true → mainTable has priority (2 records), false → branchTable (1 record)
+        long expectedPt1Count = wrappedFirst ? 2L : 1L;
+        assertThat(entries)
                 .map(e -> Pair.of(e.partition().getInt(0), e.recordCount()))
                 .containsExactlyInAnyOrder(
-                        Pair.of(1, 2L), Pair.of(2, 1L), Pair.of(3, 1L), Pair.of(4, 1L));
+                        Pair.of(1, expectedPt1Count),
+                        Pair.of(2, 1L),
+                        Pair.of(3, 1L),
+                        Pair.of(4, 1L));
     }
 
     /**
      * Test that FallbackReadScan.plan() determines partition ownership based on partition
-     * predicates only, not mixed with data filters. If a partition exists in the main branch, it
-     * should never be read from fallback, regardless of the data filter.
-     *
-     * <p>Without the fix, the old code built completePartitions from mainScan.plan() results which
-     * already had data filters applied. When the data filter excluded all files of a main partition
-     * via filterByStats, that partition was incorrectly treated as "not in main" and read from
-     * fallback.
+     * predicates only, not mixed with data filters. If a partition exists in the priority table, it
+     * should never be read from the supplemental table, regardless of the data filter.
      */
-    @Test
-    public void testPlanWithDataFilter() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testPlanWithDataFilter(boolean wrappedFirst) throws Exception {
         String branchName = "bc";
+        InternalRow[] firstValues = new InternalRow[] {rowData(1, 10), rowData(2, 20)};
+        InternalRow[] secondValues = new InternalRow[] {rowData(1, 100), rowData(3, 30)};
 
         FileStoreTable mainTable = createTable();
 
         // Main branch: partition 1 (a=10), partition 2 (a=20)
-        writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20));
+        writeDataIntoTable(mainTable, 0, wrappedFirst ? firstValues : secondValues);
 
         mainTable.createBranch(branchName);
 
@@ -167,10 +162,10 @@ public class FallbackReadFileStoreTableTest {
 
         // Fallback branch: partition 1 already has a=10 (inherited), add a=100.
         // Also add partition 3 (a=30) which is fallback-only.
-        writeDataIntoTable(branchTable, 1, rowData(1, 100), rowData(3, 30));
+        writeDataIntoTable(branchTable, 1, wrappedFirst ? secondValues : firstValues);
 
         FallbackReadFileStoreTable fallbackTable =
-                new FallbackReadFileStoreTable(mainTable, branchTable);
+                new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst);
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
 
         // Case 1: WHERE pt = 1 AND a = 100
@@ -202,6 +197,51 @@ public class FallbackReadFileStoreTableTest {
                 .isTrue();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception {
+        String branchName = "bc";
+
+        FileStoreTable mainTable = createTable();
+
+        // write data into partition 1 for main
+        writeDataIntoTable(mainTable, 0, rowData(1, 10));
+
+        mainTable.createBranch(branchName);
+
+        FileStoreTable branchTable = createTableFromBranch(mainTable, branchName);
+
+        // write data into partition 2 for branch
+        writeDataIntoTable(branchTable, 0, rowData(2, 20));
+
+        FallbackReadFileStoreTable table =
+                new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst);
+
+        // write through the merged table — should go to mainTable (wrapped)
+        writeDataIntoTable(table, 1, rowData(3, 30));
+
+        // verify: main branch should now have partition 1 and 3
+        List<Integer> mainPartitions =
+                mainTable.newScan().listPartitions().stream()
+                        .map(row -> row.getInt(0))
+                        .collect(Collectors.toList());
+        assertThat(mainPartitions).containsExactlyInAnyOrder(1, 3);
+
+        // verify: branch should still only have partition 2
+        List<Integer> branchPartitions =
+                branchTable.newScan().listPartitions().stream()
+                        .map(row -> row.getInt(0))
+                        .collect(Collectors.toList());
+        assertThat(branchPartitions).containsExactlyInAnyOrder(2);
+
+        // verify: merged read should see all three partitions
+        List<Integer> mergedPartitions =
+                table.newScan().listPartitions().stream()
+                        .map(row -> row.getInt(0))
+                        .collect(Collectors.toList());
+        assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3);
+    }
+
     private void writeDataIntoTable(
             FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception {
         StreamTableWrite write = table.newWrite(commitUser);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index f50e9922f5..c4ff1ca51a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1218,9 +1218,8 @@ public abstract class SimpleTableTestBase {
                 .satisfies(
                         anyCauseMatches(
                                 IllegalArgumentException.class,
-                                "can not delete the fallback branch. "
-                                        + "branchName to be deleted is fallback. you have set 'scan.fallback-branch' = 'fallback'. "
-                                        + "you should reset 'scan.fallback-branch' before deleting this branch."));
+                                "Cannot delete branch 'fallback' because it is configured as"
+                                        + " 'scan.fallback-branch'. Unset 'scan.fallback-branch' first."));
 
         table.deleteBranch("fallback");
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 1b7d50c852..641efb7335 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -760,7 +760,7 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     @Test
     public void testCannotSetEmptyFallbackBranch() {
         String errMsg =
-                "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' isn't existed.";
+                "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' does not exist.";
         assertThatThrownBy(
                         () ->
                                 sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 06afcce3bb..94bd5a30e7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -966,7 +966,8 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         // branch1 is fallback branch, can not be deleted
         assertThatCode(() -> bEnv.executeSql("CALL sys.delete_branch('default.t', 'branch1')"))
                 .rootCause()
-                .hasMessageContaining("can not delete the fallback branch.");
+                .hasMessageContaining(
+                        "Cannot delete branch 'branch1' because it is configured as 'scan.fallback-branch'.");
 
         // reset scan.fallback-branch
         bEnv.executeSql("ALTER TABLE t RESET ('scan.fallback-branch')");

Reply via email to