This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fe9b975c86 [core] When scan.primary-branch is set, allow primary
branch to be an append only table (#7614)
fe9b975c86 is described below
commit fe9b975c868cd1f81a81c2d68041348a31670d56
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 10 13:06:47 2026 +0800
[core] When scan.primary-branch is set, allow primary branch to be an
append only table (#7614)
With `scan.fallback-branch`, we allow the main branch to be append-only
and the fallback branch to have primary keys. Likewise, with
`scan.primary-branch`, we should allow the main branch to have primary
keys and the primary branch to be append-only.
---
.../paimon/table/FallbackReadFileStoreTable.java | 24 -------------
.../org/apache/paimon/flink/BranchSqlITCase.java | 42 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 5c839a5581..1d1c6214ce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -224,30 +224,6 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
mainRowType,
otherBranch,
otherRowType);
-
- List<String> mainPrimaryKeys = wrapped.schema().primaryKeys();
- List<String> otherPrimaryKeys = other.schema().primaryKeys();
- if (!mainPrimaryKeys.isEmpty()) {
- if (otherPrimaryKeys.isEmpty()) {
- throw new IllegalArgumentException(
- "Branch "
- + mainBranch
- + " has primary keys while branch "
- + otherBranch
- + " does not. This is not allowed.");
- }
- Preconditions.checkArgument(
- mainPrimaryKeys.equals(otherPrimaryKeys),
- "Branch %s and %s both have primary keys but are not the same.\n"
- + "Primary keys of %s are %s.\n"
- + "Primary keys of %s are %s.",
- mainBranch,
- otherBranch,
- mainBranch,
- mainPrimaryKeys,
- otherBranch,
- otherPrimaryKeys);
- }
}
private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 641efb7335..7ecfbf77b2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -923,6 +923,48 @@ public class BranchSqlITCase extends CatalogITCaseBase {
.hasMessageContaining("Branch");
}
+ @Test
+ public void testPrimaryBranchBatchRead() throws Exception {
+ // Create non-PK table, then create branch, then ALTER main to add PKs.
+ // This results in main = PK table, branch = non-PK table.
+ sql(
+ "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) "
+ + "PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+
+ sql("CALL sys.create_branch('default.t', 'nb')");
+ sql("ALTER TABLE t SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+ sql("ALTER TABLE t SET ( 'scan.primary-branch' = 'nb' )");
+
+ // Insert into non-PK branch (primary, has priority)
+ sql("INSERT INTO `t$branch_nb` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
+ // Insert overlapping partition into PK main (fallback)
+ sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+ // pt=1 exists in primary branch → read from branch
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_nb`"))
+ .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+
+ // Insert pt=2 into primary branch, pt=3 only into main
+ sql("INSERT INTO `t$branch_nb` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')");
+ sql("INSERT INTO t VALUES (3, 10, 'horse')");
+
+ // pt=1,2 from primary branch; pt=3 from main (fallback)
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[cat, 20]",
+ "+I[dog, 30]",
+ "+I[tiger, 10]",
+ "+I[wolf, 20]",
+ "+I[horse, 10]");
+
+ // Unset scan.primary-branch, main table should show its own data
+ sql("ALTER TABLE t RESET ( 'scan.primary-branch' )");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]", "+I[horse, 10]");
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {