This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 3e0b96b6e972c28004d3e2d2928e08c170412585
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Mar 26 20:02:58 2026 +0800

    [spark] Support merge schema through catalog for REST catalog (#7539)
    
    When using the REST catalog, schema merge during write bypasses the
    catalog and commits directly via SchemaManager, which does not work
    for REST catalogs. This PR introduces SchemaModification to route
    schema changes through Catalog.alterTable(), and adds
    diffSchemaChanges() to convert the merged schema into a list of
    SchemaChange objects.
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../org/apache/paimon/schema/SchemaManager.java    | 19 ++++--
 .../apache/paimon/schema/SchemaMergingUtils.java   | 54 ++++++++++++++++
 .../apache/paimon/table/CatalogEnvironment.java    |  9 +++
 .../apache/paimon/table/SchemaModification.java    | 46 ++++++++++++++
 .../sql/WriteMergeSchemaWithRestCatalogTest.scala  | 74 ++++++++++++++++++++++
 6 files changed, 199 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 388b7eb9de..9aad0259cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -267,7 +267,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
 
     @Override
     public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
-        return schemaManager.mergeSchema(rowType, allowExplicitCast);
+        return schemaManager.mergeSchema(
+                rowType, allowExplicitCast, 
catalogEnvironment.schemaModification());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index dbc605b96c..13da6677f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -37,6 +37,7 @@ import 
org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
 import org.apache.paimon.schema.SchemaChange.UpdateComment;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.SchemaModification;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -706,7 +707,10 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+    public boolean mergeSchema(
+            RowType rowType,
+            boolean allowExplicitCast,
+            @Nullable SchemaModification schemaModification) {
         TableSchema current =
                 latest().orElseThrow(
                                 () ->
@@ -715,12 +719,17 @@ public class SchemaManager implements Serializable {
         TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType, 
allowExplicitCast);
         if (current.equals(update)) {
             return false;
-        } else {
-            try {
+        }
+        try {
+            if (schemaModification != null) {
+                List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(current, update);
+                schemaModification.alterSchema(changes);
+                return true;
+            } else {
                 return commit(update);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to commit the schema.", e);
             }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to commit the schema.", e);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 82c61adf6c..1ed9bad7e6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -233,4 +234,57 @@ public class SchemaMergingUtils {
                 field.description(),
                 field.defaultValue());
     }
+
+    /**
+     * Generate a list of {@link SchemaChange} by comparing the old and new 
{@link TableSchema}.
+     * This supports detecting added columns and type changes (including 
nested structs).
+     */
+    public static List<SchemaChange> diffSchemaChanges(
+            TableSchema oldSchema, TableSchema newSchema) {
+        List<SchemaChange> changes = new ArrayList<>();
+        diffFields(
+                oldSchema.logicalRowType().getFields(),
+                newSchema.logicalRowType().getFields(),
+                new String[0],
+                changes);
+        return changes;
+    }
+
+    private static void diffFields(
+            List<DataField> oldFields,
+            List<DataField> newFields,
+            String[] parentNames,
+            List<SchemaChange> changes) {
+        Map<String, DataField> oldFieldMap =
+                oldFields.stream().collect(Collectors.toMap(DataField::name, 
Function.identity()));
+
+        for (DataField newField : newFields) {
+            String[] fieldNames = appendFieldName(parentNames, 
newField.name());
+            DataField oldField = oldFieldMap.get(newField.name());
+            if (oldField == null) {
+                // new column added
+                changes.add(
+                        SchemaChange.addColumn(
+                                fieldNames, newField.type(), 
newField.description(), null));
+            } else if (!oldField.type().equals(newField.type())) {
+                // type changed — check if it's a nested struct change
+                if (oldField.type() instanceof RowType && newField.type() 
instanceof RowType) {
+                    diffFields(
+                            ((RowType) oldField.type()).getFields(),
+                            ((RowType) newField.type()).getFields(),
+                            fieldNames,
+                            changes);
+                } else {
+                    changes.add(SchemaChange.updateColumnType(fieldNames, 
newField.type(), true));
+                }
+            }
+        }
+    }
+
+    private static String[] appendFieldName(String[] parentNames, String 
fieldName) {
+        String[] result = new String[parentNames.length + 1];
+        System.arraycopy(parentNames, 0, result, 0, parentNames.length);
+        result[parentNames.length] = fieldName;
+        return result;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index dac63fc178..cb40c4447e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -113,6 +113,15 @@ public class CatalogEnvironment implements Serializable {
         return supportsVersionManagement;
     }
 
+    @Nullable
+    public SchemaModification schemaModification() {
+        if (catalogLoader == null) {
+            return null;
+        }
+        Catalog catalog = catalogLoader.load();
+        return SchemaModification.create(catalog, identifier);
+    }
+
     @Nullable
     public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
         SnapshotCommit snapshotCommit;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java 
b/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java
new file mode 100644
index 0000000000..2f024811b5
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/SchemaModification.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.List;
+
+/** Handler to alter table schema via catalog. */
+public interface SchemaModification extends AutoCloseable {
+
+    void alterSchema(List<SchemaChange> changes) throws Exception;
+
+    static SchemaModification create(Catalog catalog, Identifier identifier) {
+        return new SchemaModification() {
+
+            @Override
+            public void alterSchema(List<SchemaChange> changes) throws 
Exception {
+                catalog.alterTable(identifier, changes, false);
+            }
+
+            @Override
+            public void close() throws Exception {
+                catalog.close();
+            }
+        };
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala
new file mode 100644
index 0000000000..caaea22766
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaWithRestCatalogTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class WriteMergeSchemaWithRestCatalogTest extends 
PaimonSparkTestWithRestCatalogBase {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
+  test("Write merge schema with REST catalog: dataframe write") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, b STRING)")
+      Seq((1, "1"), (2, "2"))
+        .toDF("a", "b")
+        .write
+        .format("paimon")
+        .mode("append")
+        .saveAsTable("t")
+
+      // new columns
+      Seq((3, "3", 3))
+        .toDF("a", "b", "c")
+        .write
+        .format("paimon")
+        .mode("append")
+        .option("write.merge-schema", "true")
+        .saveAsTable("t")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY a"),
+        Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3))
+      )
+    }
+  }
+
+  test("Write merge schema with REST catalog: sql write") {
+    withTable("t") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("CREATE TABLE t (a INT, b STRING)")
+        sql("INSERT INTO t VALUES (1, '1'), (2, '2')")
+
+        // new columns
+        sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
+        checkAnswer(
+          sql("SELECT * FROM t ORDER BY a"),
+          Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3))
+        )
+      }
+    }
+  }
+}

Reply via email to