This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 3e0b96b6e972c28004d3e2d2928e08c170412585 Author: Zouxxyy <[email protected]> AuthorDate: Thu Mar 26 20:02:58 2026 +0800 [spark] Support merge schema through catalog for REST catalog (#7539) When using a REST catalog, schema merging during writes bypasses the catalog and commits directly through SchemaManager, which does not work for a REST catalog. This PR introduces SchemaModification to route schema changes through Catalog.alterTable(), and adds diffSchemaChanges() to convert a merged schema into a list of SchemaChange objects. --- .../java/org/apache/paimon/AbstractFileStore.java | 3 +- .../org/apache/paimon/schema/SchemaManager.java | 19 ++++-- .../apache/paimon/schema/SchemaMergingUtils.java | 54 ++++++++++++++++ .../apache/paimon/table/CatalogEnvironment.java | 9 +++ .../apache/paimon/table/SchemaModification.java | 46 ++++++++++++++ .../sql/WriteMergeSchemaWithRestCatalogTest.scala | 74 ++++++++++++++++++++++ 6 files changed, 199 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 388b7eb9de..9aad0259cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -267,7 +267,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> { @Override public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { - return schemaManager.mergeSchema(rowType, allowExplicitCast); + return schemaManager.mergeSchema( + rowType, allowExplicitCast, catalogEnvironment.schemaModification()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index dbc605b96c..13da6677f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -37,6 +37,7 
@@ import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; import org.apache.paimon.schema.SchemaChange.UpdateComment; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.SchemaModification; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -706,7 +707,10 @@ public class SchemaManager implements Serializable { } } - public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { + public boolean mergeSchema( + RowType rowType, + boolean allowExplicitCast, + @Nullable SchemaModification schemaModification) { TableSchema current = latest().orElseThrow( () -> @@ -715,12 +719,17 @@ public class SchemaManager implements Serializable { TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType, allowExplicitCast); if (current.equals(update)) { return false; - } else { - try { + } + try { + if (schemaModification != null) { + List<SchemaChange> changes = SchemaMergingUtils.diffSchemaChanges(current, update); + schemaModification.alterSchema(changes); + return true; + } else { return commit(update); - } catch (Exception e) { - throw new RuntimeException("Failed to commit the schema.", e); } + } catch (Exception e) { + throw new RuntimeException("Failed to commit the schema.", e); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java index 82c61adf6c..1ed9bad7e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java @@ -29,6 +29,7 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; +import java.util.ArrayList; import java.util.List; import java.util.Map; import 
java.util.concurrent.atomic.AtomicInteger; @@ -233,4 +234,57 @@ public class SchemaMergingUtils { field.description(), field.defaultValue()); } + + /** + * Generate a list of {@link SchemaChange} by comparing the old and new {@link TableSchema}. + * This supports detecting added columns and type changes (including nested structs). + */ + public static List<SchemaChange> diffSchemaChanges( + TableSchema oldSchema, TableSchema newSchema) { + List<SchemaChange> changes = new ArrayList<>(); + diffFields( + oldSchema.logicalRowType().getFields(), + newSchema.logicalRowType().getFields(), + new String[0], + changes); + return changes; + } + + private static void diffFields( + List<DataField> oldFields, + List<DataField> newFields, + String[] parentNames, + List<SchemaChange> changes) { + Map<String, DataField> oldFieldMap = + oldFields.stream().collect(Collectors.toMap(DataField::name, Function.identity())); + + for (DataField newField : newFields) { + String[] fieldNames = appendFieldName(parentNames, newField.name()); + DataField oldField = oldFieldMap.get(newField.name()); + if (oldField == null) { + // new column added + changes.add( + SchemaChange.addColumn( + fieldNames, newField.type(), newField.description(), null)); + } else if (!oldField.type().equals(newField.type())) { + // type changed — check if it's a nested struct change + if (oldField.type() instanceof RowType && newField.type() instanceof RowType) { + diffFields( + ((RowType) oldField.type()).getFields(), + ((RowType) newField.type()).getFields(), + fieldNames, + changes); + } else { + changes.add(SchemaChange.updateColumnType(fieldNames, newField.type(), true)); + } + } + } + } + + private static String[] appendFieldName(String[] parentNames, String fieldName) { + String[] result = new String[parentNames.length + 1]; + System.arraycopy(parentNames, 0, result, 0, parentNames.length); + result[parentNames.length] = fieldName; + return result; + } } diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index dac63fc178..cb40c4447e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -113,6 +113,15 @@ public class CatalogEnvironment implements Serializable { return supportsVersionManagement; } + @Nullable + public SchemaModification schemaModification() { + if (catalogLoader == null) { + return null; + } + Catalog catalog = catalogLoader.load(); + return SchemaModification.create(catalog, identifier); + } + @Nullable public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) { SnapshotCommit snapshotCommit; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java b/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java new file mode 100644 index 0000000000..2f024811b5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.SchemaChange; + +import java.util.List; + +/** Handler to alter table schema via catalog. */ +public interface SchemaModification extends AutoCloseable { + + void alterSchema(List<SchemaChange> changes) throws Exception; + + static SchemaModification create(Catalog catalog, Identifier identifier) { + return new SchemaModification() { + + @Override + public void alterSchema(List<SchemaChange> changes) throws Exception { + catalog.alterTable(identifier, changes, false); + } + + @Override + public void close() throws Exception { + catalog.close(); + } + }; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala new file mode 100644 index 0000000000..caaea22766 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row + +class WriteMergeSchemaWithRestCatalogTest extends PaimonSparkTestWithRestCatalogBase { + + import testImplicits._ + + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + + test("Write merge schema with REST catalog: dataframe write") { + withTable("t") { + sql("CREATE TABLE t (a INT, b STRING)") + Seq((1, "1"), (2, "2")) + .toDF("a", "b") + .write + .format("paimon") + .mode("append") + .saveAsTable("t") + + // new columns + Seq((3, "3", 3)) + .toDF("a", "b", "c") + .write + .format("paimon") + .mode("append") + .option("write.merge-schema", "true") + .saveAsTable("t") + checkAnswer( + sql("SELECT * FROM t ORDER BY a"), + Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3)) + ) + } + } + + test("Write merge schema with REST catalog: sql write") { + withTable("t") { + withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") { + sql("CREATE TABLE t (a INT, b STRING)") + sql("INSERT INTO t VALUES (1, '1'), (2, '2')") + + // new columns + sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c") + checkAnswer( + sql("SELECT * FROM t ORDER BY a"), + Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3)) + ) + } + } + } +}
