This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit f2c41f14d75844588a52d64b6fcceb4ac64673d3
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 1 10:41:43 2026 +0800

    [flink] Support dropping primary keys for empty tables in Flink (#7566)
    
    Paimon has supported adding primary keys for empty tables before. So in
    this PR, we support the symmetrical operation: dropping primary keys for
    empty tables.
---
 .../org/apache/paimon/schema/SchemaChange.java     | 27 +++++++++++++
 .../org/apache/paimon/schema/SchemaManager.java    | 11 +++++-
 .../apache/paimon/schema/SchemaManagerTest.java    | 45 ++++++++++++++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 38 ++++++++++++++----
 .../apache/paimon/flink/SchemaChangeITCase.java    | 24 ++++++++++++
 5 files changed, 136 insertions(+), 9 deletions(-)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java 
b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 4b68ad105a..9b381a3490 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -78,6 +78,9 @@ import java.util.Objects;
     @JsonSubTypes.Type(
             value = SchemaChange.UpdateColumnPosition.class,
             name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION),
+    @JsonSubTypes.Type(
+            value = SchemaChange.DropPrimaryKey.class,
+            name = SchemaChange.Actions.DROP_PRIMARY_KEY_ACTION),
 })
 public interface SchemaChange extends Serializable {
 
@@ -164,6 +167,10 @@ public interface SchemaChange extends Serializable {
         return new UpdateColumnPosition(move);
     }
 
+    static SchemaChange dropPrimaryKey() {
+        return new DropPrimaryKey();
+    }
+
     /** A SchemaChange to set a table option. */
     final class SetOption implements SchemaChange {
 
@@ -810,6 +817,25 @@ public interface SchemaChange extends Serializable {
         }
     }
 
+    /** A SchemaChange to drop primary key. */
+    final class DropPrimaryKey implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            return o != null && getClass() == o.getClass();
+        }
+
+        @Override
+        public int hashCode() {
+            return getClass().hashCode();
+        }
+    }
+
     /** Actions for schema changes: identify for schema change. */
     class Actions {
         public static final String FIELD_ACTION = "action";
@@ -824,6 +850,7 @@ public interface SchemaChange extends Serializable {
         public static final String UPDATE_COLUMN_COMMENT_ACTION = 
"updateColumnComment";
         public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION = 
"updateColumnDefaultValue";
         public static final String UPDATE_COLUMN_POSITION_ACTION = 
"updateColumnPosition";
+        public static final String DROP_PRIMARY_KEY_ACTION = "dropPrimaryKey";
 
         private Actions() {}
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 13da6677f7..8667f2271d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -68,6 +68,7 @@ import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -311,6 +312,7 @@ public class SchemaManager implements Serializable {
         List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
         AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
         String newComment = oldTableSchema.comment();
+        List<String> newPrimaryKeys = oldTableSchema.primaryKeys();
         for (SchemaChange change : changes) {
             if (change instanceof SetOption) {
                 SetOption setOption = (SetOption) change;
@@ -549,6 +551,12 @@ public class SchemaManager implements Serializable {
                                     update.newDefaultValue());
                         },
                         lazyIdentifier);
+            } else if (change instanceof SchemaChange.DropPrimaryKey) {
+                if (hasSnapshots.get()) {
+                    throw new UnsupportedOperationException(
+                            "Cannot drop primary keys on a non-empty table.");
+                }
+                newPrimaryKeys = Collections.emptyList();
             } else {
                 throw new UnsupportedOperationException("Unsupported change: " 
+ change.getClass());
             }
@@ -561,8 +569,7 @@ public class SchemaManager implements Serializable {
                         newFields,
                         oldTableSchema.partitionKeys(),
                         applyNotNestedColumnRename(
-                                oldTableSchema.primaryKeys(),
-                                Iterables.filter(changes, RenameColumn.class)),
+                                newPrimaryKeys, Iterables.filter(changes, 
RenameColumn.class)),
                         applyRenameColumnsToOptions(newOptions, changes),
                         newComment);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 31496f8bb8..9a15cab361 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -460,6 +460,51 @@ public class SchemaManagerTest {
                 .hasMessage("Change 'merge-engine' is not supported yet.");
     }
 
+    @Test
+    public void testDropPrimaryKeyOnEmptyTable() throws Exception {
+        Path tableRoot = new Path(tempDir.toString(), "table");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), 
tableRoot);
+        manager.createTable(schema);
+
+        // drop primary keys on empty table should succeed
+        manager.commitChanges(SchemaChange.dropPrimaryKey());
+
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+        assertThat(table.schema().primaryKeys()).isEmpty();
+    }
+
+    @Test
+    public void testDropPrimaryKeyOnNonEmptyTable() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>(options);
+        tableOptions.put("bucket", "1");
+        Schema pkSchema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.emptyList(),
+                        primaryKeys,
+                        tableOptions,
+                        "");
+        Path tableRoot = new Path(tempDir.toString(), "table");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), 
tableRoot);
+        manager.createTable(pkSchema);
+
+        // write data to create a snapshot
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // drop primary keys on non-empty table should fail
+        assertThatThrownBy(() -> 
manager.commitChanges(SchemaChange.dropPrimaryKey()))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Cannot drop primary keys on a non-empty table.");
+    }
+
     @Test
     public void testAddAndDropNestedColumns() throws Exception {
         RowType innerType =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e05106eef6..5f59063668 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -510,7 +510,9 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     private List<SchemaChange> toSchemaChange(
-            TableChange change, Map<String, Integer> 
oldTableNonPhysicalColumnIndex) {
+            TableChange change,
+            Map<String, Integer> oldTableNonPhysicalColumnIndex,
+            @Nullable String primaryKeyConstraintName) {
         List<SchemaChange> schemaChanges = new ArrayList<>();
         if (change instanceof AddColumn) {
             if (((AddColumn) change).getColumn().isPhysical()) {
@@ -627,6 +629,15 @@ public class FlinkCatalog extends AbstractCatalog {
         } else if (change instanceof MaterializedTableChange
                 && handleMaterializedTableChange(change, schemaChanges)) {
             return schemaChanges;
+        } else if (change instanceof TableChange.DropConstraint) {
+            TableChange.DropConstraint dropConstraint = 
(TableChange.DropConstraint) change;
+            if (primaryKeyConstraintName == null
+                    || 
!primaryKeyConstraintName.equals(dropConstraint.getConstraintName())) {
+                throw new UnsupportedOperationException(
+                        "Only dropping primary key constraint is supported.");
+            }
+            schemaChanges.add(SchemaChange.dropPrimaryKey());
+            return schemaChanges;
         }
         throw new UnsupportedOperationException("Change is not supported: " + 
change.getClass());
     }
@@ -740,10 +751,17 @@ public class FlinkCatalog extends AbstractCatalog {
         checkArgument(
                 table instanceof FileStoreTable,
                 "Only support alter data table, but is: " + table.getClass());
-        validateAlterTable(toCatalogTable(table), newTable);
+        CatalogBaseTable oldCatalogTable = toCatalogTable(table);
+        validateAlterTable(oldCatalogTable, newTable);
         Map<String, Integer> oldTableNonPhysicalColumnIndex =
                 FlinkCatalogPropertiesUtil.nonPhysicalColumns(
                         table.options(), table.rowType().getFieldNames());
+        String primaryKeyConstraintName =
+                oldCatalogTable
+                        .getUnresolvedSchema()
+                        .getPrimaryKey()
+                        .map(pk -> pk.getConstraintName())
+                        .orElse(null);
 
         List<SchemaChange> changes = new ArrayList<>();
 
@@ -773,7 +791,9 @@ public class FlinkCatalog extends AbstractCatalog {
                             .flatMap(
                                     tableChange ->
                                             toSchemaChange(
-                                                    tableChange, 
oldTableNonPhysicalColumnIndex)
+                                                    tableChange,
+                                                    
oldTableNonPhysicalColumnIndex,
+                                                    primaryKeyConstraintName)
                                                     .stream())
                             .collect(Collectors.toList());
             changes.addAll(schemaChanges);
@@ -848,10 +868,10 @@ public class FlinkCatalog extends AbstractCatalog {
         if (!table1IsMaterialized) {
             org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
             org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
-            boolean pkEquality = false;
+            boolean allowAlterPk = false;
 
             if (ts1.getPrimaryKey().isPresent() && 
ts2.getPrimaryKey().isPresent()) {
-                pkEquality =
+                allowAlterPk =
                         Objects.equals(
                                         
ts1.getPrimaryKey().get().getConstraintName(),
                                         
ts2.getPrimaryKey().get().getConstraintName())
@@ -859,10 +879,14 @@ public class FlinkCatalog extends AbstractCatalog {
                                         
ts1.getPrimaryKey().get().getColumnNames(),
                                         
ts2.getPrimaryKey().get().getColumnNames());
             } else if (!ts1.getPrimaryKey().isPresent() && 
!ts2.getPrimaryKey().isPresent()) {
-                pkEquality = true;
+                allowAlterPk = true;
+            } else if (ts1.getPrimaryKey().isPresent() && 
!ts2.getPrimaryKey().isPresent()) {
+                // dropping primary key is allowed on empty tables,
+                // SchemaManager will validate this
+                allowAlterPk = true;
             }
 
-            if (!pkEquality) {
+            if (!allowAlterPk) {
                 throw new UnsupportedOperationException(
                         "Altering primary key is not supported yet.");
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 840dce7942..cd7805b9b3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1776,4 +1776,28 @@ public class SchemaChangeITCase extends 
CatalogITCaseBase {
                         Row.of(1L, 100L, "buy", null, null, null, 
"2024-01-01", "10"),
                         Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", 
"11"));
     }
+
+    @Test
+    public void testDropPrimaryKeyOnEmptyTable() {
+        sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)");
+
+        // drop primary key on empty table should succeed
+        sql("ALTER TABLE T DROP PRIMARY KEY");
+
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.get(0).toString()).doesNotContain("PRIMARY KEY");
+    }
+
+    @Test
+    public void testDropPrimaryKeyOnNonEmptyTable() {
+        sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)");
+        sql("INSERT INTO T VALUES (1, 2, 'hello')");
+
+        // drop primary key on non-empty table should fail
+        assertThatThrownBy(() -> sql("ALTER TABLE T DROP PRIMARY KEY"))
+                .satisfies(
+                        anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot drop primary keys on a non-empty 
table."));
+    }
 }

Reply via email to