This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fe9b975c86 [core] When scan.primary-branch is set, allow primary 
branch to be an append only table (#7614)
fe9b975c86 is described below

commit fe9b975c868cd1f81a81c2d68041348a31670d56
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 10 13:06:47 2026 +0800

    [core] When scan.primary-branch is set, allow primary branch to be an 
append only table (#7614)
    
    In `scan.fallback-branch`, we allow main branch to be append only, and
    fallback branch to have primary key. So in `scan.primary-branch`, we
    should allow main branch to have primary key, and primary branch to be
    append only.
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 24 -------------
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 42 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 24 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 5c839a5581..1d1c6214ce 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -224,30 +224,6 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
                 mainRowType,
                 otherBranch,
                 otherRowType);
-
-        List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
-        List<String> otherPrimaryKeys = other.schema().primaryKeys();
-        if (!mainPrimaryKeys.isEmpty()) {
-            if (otherPrimaryKeys.isEmpty()) {
-                throw new IllegalArgumentException(
-                        "Branch "
-                                + mainBranch
-                                + " has primary keys while branch "
-                                + otherBranch
-                                + " does not. This is not allowed.");
-            }
-            Preconditions.checkArgument(
-                    mainPrimaryKeys.equals(otherPrimaryKeys),
-                    "Branch %s and %s both have primary keys but are not the 
same.\n"
-                            + "Primary keys of %s are %s.\n"
-                            + "Primary keys of %s are %s.",
-                    mainBranch,
-                    otherBranch,
-                    mainBranch,
-                    mainPrimaryKeys,
-                    otherBranch,
-                    otherPrimaryKeys);
-        }
     }
 
     private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType 
otherRowType) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 641efb7335..7ecfbf77b2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -923,6 +923,48 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .hasMessageContaining("Branch");
     }
 
+    @Test
+    public void testPrimaryBranchBatchRead() throws Exception {
+        // Create non-PK table, then create branch, then ALTER main to add PKs.
+        // This results in main = PK table, branch = non-PK table.
+        sql(
+                "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) "
+                        + "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+
+        sql("CALL sys.create_branch('default.t', 'nb')");
+        sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+        sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )");
+
+        // Insert into non-PK branch (primary, has priority)
+        sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
+        // Insert overlapping partition into PK main (fallback)
+        sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+        // pt=1 exists in primary branch → read from branch
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_nb`"))
+                .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+
+        // Insert pt=2 into primary branch, pt=3 only into main
+        sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 
'wolf')");
+        sql("INSERT INTO t VALUES (3, 10, 'horse')");
+
+        // pt=1,2 from primary branch; pt=3 from main (fallback)
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[cat, 20]",
+                        "+I[dog, 30]",
+                        "+I[tiger, 10]",
+                        "+I[wolf, 20]",
+                        "+I[horse, 10]");
+
+        // Unset scan.primary-branch, main table should show its own data
+        sql("ALTER TABLE t RESET ( 'scan.primary-branch' )");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", 
"+I[horse, 10]");
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {

Reply via email to