This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 749aa28f832df88c8925d14a5725529f00258404 Author: tsreaper <[email protected]> AuthorDate: Tue Mar 31 15:27:00 2026 +0800 Introduce scan.primary-branch which is symmetrical to scan.fallback-branch (#7553) ### Purpose Some users may have accumulated large amounts of data in their main primary-key branch, and they want to organize the data into another non-primary-key branch to speed up querying. This PR introduces `scan.primary-branch` which is symmetrical to `scan.fallback-branch`. ### Tests * `FallbackReadFileStoreTableTest` --- .../shortcodes/generated/core_configuration.html | 18 ++- .../generated/flink_connector_configuration.html | 2 +- .../main/java/org/apache/paimon/CoreOptions.java | 9 ++ .../org/apache/paimon/schema/SchemaValidation.java | 22 +++- .../paimon/table/AbstractFileStoreTable.java | 17 ++- .../apache/paimon/table/ChainGroupReadTable.java | 38 ++---- .../paimon/table/FallbackReadFileStoreTable.java | 137 ++++++++++++--------- .../apache/paimon/table/FileStoreTableFactory.java | 66 ++++++---- .../paimon/table/system/ReadOptimizedTable.java | 9 +- .../org/apache/paimon/utils/ChainTableUtils.java | 3 +- .../table/FallbackReadFileStoreTableTest.java | 118 ++++++++++++------ .../apache/paimon/table/SimpleTableTestBase.java | 5 +- .../org/apache/paimon/flink/BranchSqlITCase.java | 2 +- .../flink/PrimaryKeyFileStoreTableITCase.java | 3 +- 14 files changed, 274 insertions(+), 175 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 6dd621fcac..0071c26437 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -128,6 +128,12 @@ under the License. <td>MemorySize</td> <td>Memory page size for caching.</td> </tr> + <tr> + <td><h5>chain-table.chain-partition-keys</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td> + </tr> <tr> <td><h5>chain-table.enabled</h5></td> <td style="word-wrap: break-word;">false</td> @@ -1187,12 +1193,6 @@ This config option does not affect the default filesystem metastore.</td> <td>String</td> <td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td> </tr> - <tr> - <td><h5>chain-table.chain-partition-keys</h5></td> - <td style="word-wrap: break-word;">(none)</td> - <td>String</td> - <td>Partition keys that participate in chain logic. Must be a contiguous suffix of the table's partition keys. Comma-separated. If not set, all partition keys participate in chain.</td> - </tr> <tr> <td><h5>scan.file-creation-time-millis</h5></td> <td style="word-wrap: break-word;">(none)</td> @@ -1235,6 +1235,12 @@ This config option does not affect the default filesystem metastore.</td> <td>Boolean</td> <td>Whether to sort plan files by partition fields, this allows you to read according to the partition order, even if your partition writes are out of order.<br />It is recommended that you use this for streaming read of the 'append-only' table. By default, streaming read will read the full snapshot first. In order to avoid the disorder reading for partitions, you can open this option.</td> </tr> + <tr> + <td><h5>scan.primary-branch</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>When a batch job queries from a table, if a partition exists in the primary branch, the reader will read this partition from the primary branch. Otherwise, the reader will read this partition from the current branch.</td> + </tr> <tr> <td><h5>scan.snapshot-id</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index d4c8b5a167..253652cb78 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -363,4 +363,4 @@ under the License. <td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td> </tr> </tbody> -</table> \ No newline at end of file +</table> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 7c054b8c8a..897bebcf68 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1961,6 +1961,15 @@ public class CoreOptions implements Serializable { "When a batch job queries from a table, if a partition does not exist in the current branch, " + "the reader will try to get this partition from this fallback branch."); + public static final ConfigOption<String> SCAN_PRIMARY_BRANCH = + key("scan.primary-branch") + .stringType() + .noDefaultValue() + .withDescription( + "When a batch job queries from a table, if a partition exists in the primary branch, " + + "the reader will read this partition from the primary branch. " + + "Otherwise, the reader will read this partition from the current branch."); + public static final ConfigOption<Boolean> ASYNC_FILE_WRITE = key("async-file-write") .booleanType() diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index b9b68fc647..2ff1080c4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -288,14 +288,34 @@ public class SchemaValidation { public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { String fallbackBranch = schema.options().get(CoreOptions.SCAN_FALLBACK_BRANCH.key()); + String primaryBranch = schema.options().get(CoreOptions.SCAN_PRIMARY_BRANCH.key()); + + if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch) + && !StringUtils.isNullOrWhitespaceOnly(primaryBranch)) { + throw new IllegalArgumentException( + String.format( + "Cannot set both '%s' and '%s' at the same time.", + CoreOptions.SCAN_FALLBACK_BRANCH.key(), + CoreOptions.SCAN_PRIMARY_BRANCH.key())); + } + if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { checkArgument( schemaManager.copyWithBranch(fallbackBranch).latest().isPresent(), - "Cannot set '%s' = '%s' because the branch '%s' isn't existed.", + "Cannot set '%s' = '%s' because the branch '%s' does not exist.", CoreOptions.SCAN_FALLBACK_BRANCH.key(), fallbackBranch, fallbackBranch); } + + if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)) { + checkArgument( + schemaManager.copyWithBranch(primaryBranch).latest().isPresent(), + "Cannot set '%s' = '%s' because the branch '%s' does not exist.", + CoreOptions.SCAN_PRIMARY_BRANCH.key(), + primaryBranch, + primaryBranch); + } } private static void validateOnlyContainPrimitiveType( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 3583d17c08..f8a6b1722f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -709,10 +709,19 @@ abstract class AbstractFileStoreTable implements FileStoreTable { && branchName.equals(fallbackBranch)) { throw new IllegalArgumentException( String.format( - "can not delete the fallback branch. " - + "branchName to be deleted is %s. you have set 'scan.fallback-branch' = '%s'. " - + "you should reset 'scan.fallback-branch' before deleting this branch.", - branchName, fallbackBranch)); + "Cannot delete branch '%s' because it is configured as" + + " 'scan.fallback-branch'. Unset 'scan.fallback-branch' first.", + branchName)); + } + + String primaryBranch = coreOptions().toConfiguration().get(CoreOptions.SCAN_PRIMARY_BRANCH); + if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch) + && branchName.equals(primaryBranch)) { + throw new IllegalArgumentException( + String.format( + "Cannot delete branch '%s' because it is configured as" + + " 'scan.primary-branch'. Unset 'scan.primary-branch' first.", + branchName)); } branchManager().dropBranch(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 78752acfc6..b178715a2c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -67,7 +67,7 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull; public class ChainGroupReadTable extends FallbackReadFileStoreTable { public ChainGroupReadTable(FileStoreTable snapshotStoreTable, FileStoreTable deltaStoreTable) { - super(snapshotStoreTable, deltaStoreTable); + super(snapshotStoreTable, deltaStoreTable, true); checkArgument(snapshotStoreTable instanceof PrimaryKeyFileStoreTable); checkArgument(deltaStoreTable instanceof PrimaryKeyFileStoreTable); } @@ -75,11 +75,7 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable { @Override public DataTableScan newScan() { super.validateSchema(); - return new ChainTableBatchScan( - wrapped.newScan(), - fallback().newScan(), - ((AbstractFileStoreTable) wrapped).tableSchema, - this); + return new ChainTableBatchScan(((AbstractFileStoreTable) wrapped).tableSchema, this); } private DataTableScan newSnapshotScan() { @@ -87,42 +83,38 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable { } private DataTableScan newDeltaScan() { - return fallback().newScan(); + return other().newScan(); } @Override public FileStoreTable copy(Map<String, String> dynamicOptions) { return new ChainGroupReadTable( - wrapped.copy(dynamicOptions), - fallback().copy(rewriteFallbackOptions(dynamicOptions))); + wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions))); } @Override public FileStoreTable copy(TableSchema newTableSchema) { return new ChainGroupReadTable( wrapped.copy(newTableSchema), - fallback() - .copy( - newTableSchema.copy( - rewriteFallbackOptions(newTableSchema.options())))); + other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options())))); } @Override public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) { return new ChainGroupReadTable( wrapped.copyWithoutTimeTravel(dynamicOptions), - fallback().copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions))); + other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions))); } @Override public FileStoreTable copyWithLatestSchema() { return new ChainGroupReadTable( - wrapped.copyWithLatestSchema(), fallback().copyWithLatestSchema()); + wrapped.copyWithLatestSchema(), other().copyWithLatestSchema()); } @Override public FileStoreTable switchToBranch(String branchName) { - return new ChainGroupReadTable(switchWrappedToBranch(branchName), fallback()); + return new ChainGroupReadTable(switchWrappedToBranch(branchName), other()); } /** Scan implementation for {@link ChainGroupReadTable}. */ @@ -138,16 +130,12 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable { private Filter<Integer> bucketFilter; public ChainTableBatchScan( - DataTableScan mainScan, - DataTableScan fallbackScan, - TableSchema tableSchema, - ChainGroupReadTable chainGroupReadTable) { + TableSchema tableSchema, ChainGroupReadTable chainGroupReadTable) { super( - mainScan, - fallbackScan, chainGroupReadTable.wrapped, - chainGroupReadTable.fallback(), - tableSchema); + chainGroupReadTable.other(), + tableSchema, + FileStoreTable::newScan); this.options = CoreOptions.fromMap(tableSchema.options()); this.chainGroupReadTable = chainGroupReadTable; this.partitionConverter = @@ -500,7 +488,7 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable { private Read() { this.mainRead = wrapped.newRead(); - this.fallbackRead = fallback().newRead(); + this.fallbackRead = other().newRead(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index aff2aa9a23..5c839a5581 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -66,73 +66,82 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** - * A {@link FileStoreTable} which mainly read from the current branch. However, if the current - * branch does not have a partition, it will read that partition from the fallback branch. + * A {@link FileStoreTable} which reads from two branches with partition-level merging. When {@code + * wrappedFirst} is true, the wrapped table has read priority (fallback mode). When {@code + * wrappedFirst} is false, the other table has read priority (primary mode). Write operations are + * always delegated to the wrapped table (current branch). */ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { private static final Logger LOG = LoggerFactory.getLogger(FallbackReadFileStoreTable.class); - private final FileStoreTable fallback; + private final FileStoreTable other; + private final boolean wrappedFirst; - public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable fallback) { + public FallbackReadFileStoreTable( + FileStoreTable wrapped, FileStoreTable other, boolean wrappedFirst) { super(wrapped); - this.fallback = fallback; + this.other = other; + this.wrappedFirst = wrappedFirst; Preconditions.checkArgument(!(wrapped instanceof FallbackReadFileStoreTable)); - if (fallback instanceof FallbackReadFileStoreTable) { + if (other instanceof FallbackReadFileStoreTable) { // ChainGroupReadTable need to be wrapped again - if (!(fallback instanceof ChainGroupReadTable)) { + if (!(other instanceof ChainGroupReadTable)) { throw new IllegalArgumentException( "This is a bug, perhaps there is a recursive call."); } } } - public FileStoreTable fallback() { - return fallback; + public FileStoreTable other() { + return other; } @Override public FileStoreTable copy(Map<String, String> dynamicOptions) { return new FallbackReadFileStoreTable( wrapped.copy(dynamicOptions), - fallback.copy(rewriteFallbackOptions(dynamicOptions))); + other.copy(rewriteOtherOptions(dynamicOptions)), + wrappedFirst); } @Override public FileStoreTable copy(TableSchema newTableSchema) { return new FallbackReadFileStoreTable( wrapped.copy(newTableSchema), - fallback.copy( - newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options())))); + other.copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))), + wrappedFirst); } @Override public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) { return new FallbackReadFileStoreTable( wrapped.copyWithoutTimeTravel(dynamicOptions), - fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions))); + other.copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)), + wrappedFirst); } @Override public FileStoreTable copyWithLatestSchema() { return new FallbackReadFileStoreTable( - wrapped.copyWithLatestSchema(), fallback.copyWithLatestSchema()); + wrapped.copyWithLatestSchema(), other.copyWithLatestSchema(), wrappedFirst); } @Override public FileStoreTable switchToBranch(String branchName) { - return new FallbackReadFileStoreTable(switchWrappedToBranch(branchName), fallback); + return new FallbackReadFileStoreTable( + switchWrappedToBranch(branchName), other, wrappedFirst); } @Override public void setManifestCache(SegmentsCache<Path> manifestCache) { super.setManifestCache(manifestCache); - fallback.setManifestCache(manifestCache); + other.setManifestCache(manifestCache); } protected FileStoreTable switchWrappedToBranch(String branchName) { @@ -153,35 +162,35 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { wrapped.catalogEnvironment()); } - protected Map<String, String> rewriteFallbackOptions(Map<String, String> options) { + protected Map<String, String> rewriteOtherOptions(Map<String, String> options) { Map<String, String> result = new HashMap<>(options); - // branch of fallback table should never change + // branch of other table should never change String branchKey = CoreOptions.BRANCH.key(); if (options.containsKey(branchKey)) { - result.put(branchKey, fallback.options().get(branchKey)); + result.put(branchKey, other.options().get(branchKey)); } - // snapshot ids may be different between the main branch and the fallback branch, + // snapshot ids may be different between the main branch and the other branch, // so we need to convert main branch snapshot id to millisecond, - // then convert millisecond to fallback branch snapshot id + // then convert millisecond to other branch snapshot id String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key(); String scanSnapshotId = options.get(scanSnapshotIdOptionKey); if (scanSnapshotId != null) { long id = Long.parseLong(scanSnapshotId); long millis = wrapped.snapshotManager().snapshot(id).timeMillis(); - Snapshot fallbackSnapshot = fallback.snapshotManager().earlierOrEqualTimeMills(millis); - long fallbackId; - if (fallbackSnapshot == null) { - fallbackId = Snapshot.FIRST_SNAPSHOT_ID; + Snapshot otherSnapshot = other.snapshotManager().earlierOrEqualTimeMills(millis); + long otherId; + if (otherSnapshot == null) { + otherId = Snapshot.FIRST_SNAPSHOT_ID; } else { - fallbackId = fallbackSnapshot.id(); + otherId = otherSnapshot.id(); } - result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId)); + result.put(scanSnapshotIdOptionKey, String.valueOf(otherId)); } - // bucket number of main branch and fallback branch are very likely different, - // so we remove bucket in options to use fallback branch's bucket number + // bucket number of main branch and other branch are very likely different, + // so we remove bucket in options to use other branch's bucket number result.remove(CoreOptions.BUCKET.key()); return result; @@ -189,61 +198,66 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { @Override public DataTableScan newScan() { + return newScan(FileStoreTable::newScan); + } + + public DataTableScan newScan(Function<FileStoreTable, DataTableScan> scanCreator) { validateSchema(); - return new FallbackReadScan( - wrapped.newScan(), fallback.newScan(), wrapped, fallback, wrapped.schema()); + FileStoreTable first = wrappedFirst ? wrapped : other; + FileStoreTable second = wrappedFirst ? other : wrapped; + return new FallbackReadScan(first, second, wrapped.schema(), scanCreator); } protected void validateSchema() { String mainBranch = wrapped.coreOptions().branch(); - String fallbackBranch = fallback.coreOptions().branch(); + String otherBranch = other.coreOptions().branch(); RowType mainRowType = wrapped.schema().logicalRowType(); - RowType fallbackRowType = fallback.schema().logicalRowType(); + RowType otherRowType = other.schema().logicalRowType(); Preconditions.checkArgument( - sameRowTypeIgnoreNullable(mainRowType, fallbackRowType), + sameRowTypeIgnoreNullable(mainRowType, otherRowType), "Branch %s and %s does not have the same row type.\n" + "Row type of branch %s is %s.\n" + "Row type of branch %s is %s.", mainBranch, - fallbackBranch, + otherBranch, mainBranch, mainRowType, - fallbackBranch, - fallbackRowType); + otherBranch, + otherRowType); List<String> mainPrimaryKeys = wrapped.schema().primaryKeys(); - List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys(); + List<String> otherPrimaryKeys = other.schema().primaryKeys(); if (!mainPrimaryKeys.isEmpty()) { - if (fallbackPrimaryKeys.isEmpty()) { + if (otherPrimaryKeys.isEmpty()) { throw new IllegalArgumentException( "Branch " + mainBranch - + " has primary keys while fallback branch " - + fallbackBranch + + " has primary keys while branch " + + otherBranch + " does not. This is not allowed."); } Preconditions.checkArgument( - mainPrimaryKeys.equals(fallbackPrimaryKeys), + mainPrimaryKeys.equals(otherPrimaryKeys), "Branch %s and %s both have primary keys but are not the same.\n" + "Primary keys of %s are %s.\n" + "Primary keys of %s are %s.", mainBranch, - fallbackBranch, + otherBranch, mainBranch, mainPrimaryKeys, - fallbackBranch, - fallbackPrimaryKeys); + otherBranch, + otherPrimaryKeys); } } - private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType fallbackRowType) { - if (mainRowType.getFieldCount() != fallbackRowType.getFieldCount()) { + private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType otherRowType) { + if (mainRowType.getFieldCount() != otherRowType.getFieldCount()) { return false; } for (int i = 0; i < mainRowType.getFieldCount(); i++) { DataType mainType = mainRowType.getFields().get(i).type(); - DataType fallbackType = fallbackRowType.getFields().get(i).type(); - if (!mainType.equalsIgnoreNullable(fallbackType)) { + DataType otherType = otherRowType.getFields().get(i).type(); + if (!mainType.equalsIgnoreNullable(otherType)) { return false; } } @@ -356,24 +370,25 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { /** Scan implementation for {@link FallbackReadFileStoreTable}. */ public static class FallbackReadScan implements DataTableScan { - protected final DataTableScan mainScan; - protected final DataTableScan fallbackScan; protected final FileStoreTable wrappedTable; protected final FileStoreTable fallbackTable; protected final TableSchema tableSchema; + protected final Function<FileStoreTable, DataTableScan> scanCreator; + protected final DataTableScan mainScan; + protected final DataTableScan fallbackScan; private PartitionPredicate partitionPredicate; public FallbackReadScan( - DataTableScan mainScan, - DataTableScan fallbackScan, FileStoreTable wrappedTable, FileStoreTable fallbackTable, - TableSchema tableSchema) { - this.mainScan = mainScan; - this.fallbackScan = fallbackScan; + TableSchema tableSchema, + Function<FileStoreTable, DataTableScan> scanCreator) { this.wrappedTable = wrappedTable; this.fallbackTable = fallbackTable; this.tableSchema = tableSchema; + this.scanCreator = scanCreator; + this.mainScan = scanCreator.apply(wrappedTable); + this.fallbackScan = scanCreator.apply(fallbackTable); } @Override @@ -561,7 +576,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { private DataTableScan newPartitionListingScan( boolean isMain, PartitionPredicate scanPartitionPredicate) { - DataTableScan scan = isMain ? wrappedTable.newScan() : fallbackTable.newScan(); + DataTableScan scan = scanCreator.apply(isMain ? wrappedTable : fallbackTable); if (scanPartitionPredicate != null) { scan.withPartitionFilter(scanPartitionPredicate); } @@ -580,8 +595,10 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { private final InnerTableRead fallbackRead; private Read() { - this.mainRead = wrapped.newRead(); - this.fallbackRead = fallback.newRead(); + FileStoreTable first = wrappedFirst ? wrapped : other; + FileStoreTable second = wrappedFirst ? other : wrapped; + this.mainRead = first.newRead(); + this.fallbackRead = second.newRead(); } @Override @@ -628,7 +645,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { return fallbackRead.createReader(fallbackSplit.wrapped()); } catch (Exception ignored) { LOG.error( - "Reading from fallback branch has problems: {}", + "Reading from supplemental branch has problems: {}", fallbackSplit.wrapped()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 73ae6d2dd1..e4a8656799 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -99,35 +99,27 @@ public class FileStoreTableFactory { Options options = new Options(table.options()); String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH); + String primaryBranch = options.get(CoreOptions.SCAN_PRIMARY_BRANCH); if (ChainTableUtils.isChainTable(options.toMap())) { table = createChainTable(table, fileIO, tablePath, dynamicOptions, catalogEnvironment); } else if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { - Options branchOptions = new Options(dynamicOptions.toMap()); - branchOptions.set(CoreOptions.BRANCH, fallbackBranch); - Optional<TableSchema> schema = - new SchemaManager(fileIO, tablePath, fallbackBranch).latest(); - if (schema.isPresent()) { - Identifier identifier = catalogEnvironment.identifier(); - CatalogEnvironment fallbackCatalogEnvironment = catalogEnvironment; - if (identifier != null) { - fallbackCatalogEnvironment = - catalogEnvironment.copy( - new Identifier( - identifier.getDatabaseName(), - identifier.getObjectName(), - fallbackBranch)); - } - FileStoreTable fallbackTable = - createWithoutFallbackBranch( - fileIO, - tablePath, - schema.get(), - branchOptions, - fallbackCatalogEnvironment); - table = new FallbackReadFileStoreTable(table, fallbackTable); + FileStoreTable otherTable = + createOtherBranchTable( + fileIO, tablePath, fallbackBranch, dynamicOptions, catalogEnvironment); + if (otherTable != null) { + table = new FallbackReadFileStoreTable(table, otherTable, true); } else { LOG.error("Fallback branch {} not found for table {}", fallbackBranch, tablePath); } + } else if (!StringUtils.isNullOrWhitespaceOnly(primaryBranch)) { + FileStoreTable otherTable = + createOtherBranchTable( + fileIO, tablePath, primaryBranch, dynamicOptions, catalogEnvironment); + if (otherTable != null) { + table = new FallbackReadFileStoreTable(table, otherTable, false); + } else { + LOG.error("Primary branch {} not found for table {}", primaryBranch, tablePath); + } } return table; @@ -189,7 +181,33 @@ public class FileStoreTableFactory { catalogEnvironment); FileStoreTable chainGroupFileStoreTable = new ChainGroupReadTable(snapshotTable, deltaTable); - return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable); + return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable, true); + } + + private static FileStoreTable createOtherBranchTable( + FileIO fileIO, + Path tablePath, + String branchName, + Options dynamicOptions, + CatalogEnvironment catalogEnvironment) { + Options branchOptions = new Options(dynamicOptions.toMap()); + branchOptions.set(CoreOptions.BRANCH, branchName); + Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath, branchName).latest(); + if (schema.isPresent()) { + Identifier identifier = catalogEnvironment.identifier(); + CatalogEnvironment branchCatalogEnvironment = catalogEnvironment; + if (identifier != null) { + branchCatalogEnvironment = + catalogEnvironment.copy( + new Identifier( + identifier.getDatabaseName(), + identifier.getObjectName(), + branchName)); + } + return createWithoutFallbackBranch( + fileIO, tablePath, schema.get(), branchOptions, branchCatalogEnvironment); + } + return null; } public static FileStoreTable createWithoutFallbackBranch( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 64ef8aa34e..0a96533a99 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FallbackReadFileStoreTable; -import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackReadScan; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; import org.apache.paimon.table.Table; @@ -139,13 +138,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable { @Override public DataTableScan newScan() { if (wrapped instanceof FallbackReadFileStoreTable) { - FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) wrapped; - return new FallbackReadScan( - newScan(table.wrapped()), - newScan(table.fallback()), - table.wrapped(), - table.fallback(), - table.wrapped().schema()); + return ((FallbackReadFileStoreTable) wrapped).newScan(this::newScan); } return newScan(wrapped); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java index 6ccbbbc8fc..99e25cc2ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java @@ -220,8 +220,7 @@ public class ChainTableUtils { public static FileStoreTable resolveChainPrimaryTable(FileStoreTable table) { if (table.coreOptions().isChainTable() && table instanceof FallbackReadFileStoreTable) { - return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback()) - .wrapped(); + return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).other()).wrapped(); } return table; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index 7163202a8e..6a1f782fd2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -42,8 +42,9 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.List; @@ -77,8 +78,9 @@ public class FallbackReadFileStoreTableTest { fileIO = FileIOFinder.find(tablePath); } - @Test - public void testListPartitions() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testListPartitions(boolean wrappedFirst) throws Exception { String branchName = "bc"; FileStoreTable mainTable = createTable(); @@ -93,25 +95,20 @@ public class FallbackReadFileStoreTableTest { // write data into partition for branch only writeDataIntoTable(branchTable, 0, rowData(3, 60)); - FallbackReadFileStoreTable fallbackTable = - new FallbackReadFileStoreTable(mainTable, branchTable); - - List<Integer> partitionsFromMainTable = - mainTable.newScan().listPartitions().stream() - .map(row -> row.getInt(0)) - .collect(Collectors.toList()); - assertThat(partitionsFromMainTable).containsExactlyInAnyOrder(1, 2); + FallbackReadFileStoreTable table = + new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst); - List<Integer> partitionsFromFallbackTable = - fallbackTable.newScan().listPartitions().stream() + List<Integer> partitions = + table.newScan().listPartitions().stream() .map(row -> row.getInt(0)) .collect(Collectors.toList()); // this should contain all partitions - assertThat(partitionsFromFallbackTable).containsExactlyInAnyOrder(1, 2, 3); + assertThat(partitions).containsExactlyInAnyOrder(1, 2, 3); } - @Test - public void testListPartitionEntries() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testListPartitionEntries(boolean wrappedFirst) throws Exception { String branchName = "bc"; FileStoreTable mainTable = createTable(); @@ -126,40 +123,38 @@ public class FallbackReadFileStoreTableTest { // write data into partition for branch only writeDataIntoTable(branchTable, 0, rowData(1, 50), rowData(3, 60), rowData(4, 70)); - FallbackReadFileStoreTable fallbackTable = - new FallbackReadFileStoreTable(mainTable, branchTable); - - List<PartitionEntry> partitionEntries = mainTable.newScan().listPartitionEntries(); - assertThat(partitionEntries) - .map(e -> e.partition().getInt(0)) - .containsExactlyInAnyOrder(1, 2); + FallbackReadFileStoreTable table = + new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst); - List<PartitionEntry> fallbackPartitionEntries = - fallbackTable.newScan().listPartitionEntries(); - assertThat(fallbackPartitionEntries) + List<PartitionEntry> entries = table.newScan().listPartitionEntries(); + // partition 1 exists in both: record count depends on which table has priority + // wrappedFirst=true → mainTable has priority (2 records), false → branchTable (1 record) + long expectedPt1Count = wrappedFirst ? 2L : 1L; + assertThat(entries) .map(e -> Pair.of(e.partition().getInt(0), e.recordCount())) .containsExactlyInAnyOrder( - Pair.of(1, 2L), Pair.of(2, 1L), Pair.of(3, 1L), Pair.of(4, 1L)); + Pair.of(1, expectedPt1Count), + Pair.of(2, 1L), + Pair.of(3, 1L), + Pair.of(4, 1L)); } /** * Test that FallbackReadScan.plan() determines partition ownership based on partition - * predicates only, not mixed with data filters. If a partition exists in the main branch, it - * should never be read from fallback, regardless of the data filter. - * - * <p>Without the fix, the old code built completePartitions from mainScan.plan() results which - * already had data filters applied. When the data filter excluded all files of a main partition - * via filterByStats, that partition was incorrectly treated as "not in main" and read from - * fallback. + * predicates only, not mixed with data filters. If a partition exists in the priority table, it + * should never be read from the supplemental table, regardless of the data filter. */ - @Test - public void testPlanWithDataFilter() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testPlanWithDataFilter(boolean wrappedFirst) throws Exception { String branchName = "bc"; + InternalRow[] firstValues = new InternalRow[] {rowData(1, 10), rowData(2, 20)}; + InternalRow[] secondValues = new InternalRow[] {rowData(1, 100), rowData(3, 30)}; FileStoreTable mainTable = createTable(); // Main branch: partition 1 (a=10), partition 2 (a=20) - writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20)); + writeDataIntoTable(mainTable, 0, wrappedFirst ? firstValues : secondValues); mainTable.createBranch(branchName); @@ -167,10 +162,10 @@ public class FallbackReadFileStoreTableTest { // Fallback branch: partition 1 already has a=10 (inherited), add a=100. // Also add partition 3 (a=30) which is fallback-only. - writeDataIntoTable(branchTable, 1, rowData(1, 100), rowData(3, 30)); + writeDataIntoTable(branchTable, 1, wrappedFirst ? secondValues : firstValues); FallbackReadFileStoreTable fallbackTable = - new FallbackReadFileStoreTable(mainTable, branchTable); + new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst); PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); // Case 1: WHERE pt = 1 AND a = 100 @@ -202,6 +197,51 @@ public class FallbackReadFileStoreTableTest { .isTrue(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testWriteGoesToWrapped(boolean wrappedFirst) throws Exception { + String branchName = "bc"; + + FileStoreTable mainTable = createTable(); + + // write data into partition 1 for main + writeDataIntoTable(mainTable, 0, rowData(1, 10)); + + mainTable.createBranch(branchName); + + FileStoreTable branchTable = createTableFromBranch(mainTable, branchName); + + // write data into partition 2 for branch + writeDataIntoTable(branchTable, 0, rowData(2, 20)); + + FallbackReadFileStoreTable table = + new FallbackReadFileStoreTable(mainTable, branchTable, wrappedFirst); + + // write through the merged table — should go to mainTable (wrapped) + writeDataIntoTable(table, 1, rowData(3, 30)); + + // verify: main branch should now have partition 1 and 3 + List<Integer> mainPartitions = + mainTable.newScan().listPartitions().stream() + .map(row -> row.getInt(0)) + .collect(Collectors.toList()); + assertThat(mainPartitions).containsExactlyInAnyOrder(1, 3); + + // verify: branch should still only have partition 2 + List<Integer> branchPartitions = + branchTable.newScan().listPartitions().stream() + .map(row -> row.getInt(0)) + .collect(Collectors.toList()); + assertThat(branchPartitions).containsExactlyInAnyOrder(2); + + // verify: merged read should see all three partitions + List<Integer> mergedPartitions = + table.newScan().listPartitions().stream() + .map(row -> row.getInt(0)) + .collect(Collectors.toList()); + assertThat(mergedPartitions).containsExactlyInAnyOrder(1, 2, 3); + } + private void writeDataIntoTable( FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception { StreamTableWrite write = table.newWrite(commitUser); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java index f50e9922f5..c4ff1ca51a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java @@ -1218,9 +1218,8 @@ public abstract class SimpleTableTestBase { .satisfies( anyCauseMatches( IllegalArgumentException.class, - "can not delete the fallback branch. " - + "branchName to be deleted is fallback. you have set 'scan.fallback-branch' = 'fallback'. " - + "you should reset 'scan.fallback-branch' before deleting this branch.")); + "Cannot delete branch 'fallback' because it is configured as" + + " 'scan.fallback-branch'. Unset 'scan.fallback-branch' first.")); table.deleteBranch("fallback"); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 1b7d50c852..641efb7335 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -760,7 +760,7 @@ public class BranchSqlITCase extends CatalogITCaseBase { @Test public void testCannotSetEmptyFallbackBranch() { String errMsg = - "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' isn't existed."; + "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' does not exist."; assertThatThrownBy( () -> sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index 06afcce3bb..94bd5a30e7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -966,7 +966,8 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase { // branch1 is fallback branch, can not be deleted assertThatCode(() -> bEnv.executeSql("CALL sys.delete_branch('default.t', 'branch1')")) .rootCause() - .hasMessageContaining("can not delete the fallback branch."); + .hasMessageContaining( + "Cannot delete branch 'branch1' because it is configured as 'scan.fallback-branch'."); // reset scan.fallback-branch bEnv.executeSql("ALTER TABLE t RESET ('scan.fallback-branch')");
