This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f2c41f14d75844588a52d64b6fcceb4ac64673d3 Author: tsreaper <[email protected]> AuthorDate: Wed Apr 1 10:41:43 2026 +0800 [flink] Support dropping primary keys for empty tables in Flink (#7566) Paimon has supported adding primary keys for empty tables before. So in this PR, we support the symmetrical operation: dropping primary keys for empty tables. --- .../org/apache/paimon/schema/SchemaChange.java | 27 +++++++++++++ .../org/apache/paimon/schema/SchemaManager.java | 11 +++++- .../apache/paimon/schema/SchemaManagerTest.java | 45 ++++++++++++++++++++++ .../java/org/apache/paimon/flink/FlinkCatalog.java | 38 ++++++++++++++---- .../apache/paimon/flink/SchemaChangeITCase.java | 24 ++++++++++++ 5 files changed, 136 insertions(+), 9 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java index 4b68ad105a..9b381a3490 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -78,6 +78,9 @@ import java.util.Objects; @JsonSubTypes.Type( value = SchemaChange.UpdateColumnPosition.class, name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION), + @JsonSubTypes.Type( + value = SchemaChange.DropPrimaryKey.class, + name = SchemaChange.Actions.DROP_PRIMARY_KEY_ACTION), }) public interface SchemaChange extends Serializable { @@ -164,6 +167,10 @@ public interface SchemaChange extends Serializable { return new UpdateColumnPosition(move); } + static SchemaChange dropPrimaryKey() { + return new DropPrimaryKey(); + } + /** A SchemaChange to set a table option. */ final class SetOption implements SchemaChange { @@ -810,6 +817,25 @@ public interface SchemaChange extends Serializable { } } + /** A SchemaChange to drop primary key. 
*/ + final class DropPrimaryKey implements SchemaChange { + + private static final long serialVersionUID = 1L; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } + /** Actions for schema changes: identify for schema change. */ class Actions { public static final String FIELD_ACTION = "action"; @@ -824,6 +850,7 @@ public interface SchemaChange extends Serializable { public static final String UPDATE_COLUMN_COMMENT_ACTION = "updateColumnComment"; public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION = "updateColumnDefaultValue"; public static final String UPDATE_COLUMN_POSITION_ACTION = "updateColumnPosition"; + public static final String DROP_PRIMARY_KEY_ACTION = "dropPrimaryKey"; private Actions() {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 13da6677f7..8667f2271d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -68,6 +68,7 @@ import java.io.Serializable; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -311,6 +312,7 @@ public class SchemaManager implements Serializable { List<DataField> newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); + List<String> newPrimaryKeys = oldTableSchema.primaryKeys(); for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; @@ -549,6 +551,12 @@ public class SchemaManager implements Serializable { 
update.newDefaultValue()); }, lazyIdentifier); + } else if (change instanceof SchemaChange.DropPrimaryKey) { + if (hasSnapshots.get()) { + throw new UnsupportedOperationException( + "Cannot drop primary keys on a non-empty table."); + } + newPrimaryKeys = Collections.emptyList(); } else { throw new UnsupportedOperationException("Unsupported change: " + change.getClass()); } @@ -561,8 +569,7 @@ public class SchemaManager implements Serializable { newFields, oldTableSchema.partitionKeys(), applyNotNestedColumnRename( - oldTableSchema.primaryKeys(), - Iterables.filter(changes, RenameColumn.class)), + newPrimaryKeys, Iterables.filter(changes, RenameColumn.class)), applyRenameColumnsToOptions(newOptions, changes), newComment); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 31496f8bb8..9a15cab361 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -460,6 +460,51 @@ public class SchemaManagerTest { .hasMessage("Change 'merge-engine' is not supported yet."); } + @Test + public void testDropPrimaryKeyOnEmptyTable() throws Exception { + Path tableRoot = new Path(tempDir.toString(), "table"); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot); + manager.createTable(schema); + + // drop primary keys on empty table should succeed + manager.commitChanges(SchemaChange.dropPrimaryKey()); + + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + assertThat(table.schema().primaryKeys()).isEmpty(); + } + + @Test + public void testDropPrimaryKeyOnNonEmptyTable() throws Exception { + Map<String, String> tableOptions = new HashMap<>(options); + tableOptions.put("bucket", "1"); + Schema pkSchema = + new Schema( + rowType.getFields(), + Collections.emptyList(), + primaryKeys, + tableOptions, + 
""); + Path tableRoot = new Path(tempDir.toString(), "table"); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot); + manager.createTable(pkSchema); + + // write data to create a snapshot + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl<?> write = + table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io")); + TableCommitImpl commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple"))); + commit.commit(1, write.prepareCommit(false, 1)); + write.close(); + commit.close(); + + // drop primary keys on non-empty table should fail + assertThatThrownBy(() -> manager.commitChanges(SchemaChange.dropPrimaryKey())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot drop primary keys on a non-empty table."); + } + @Test public void testAddAndDropNestedColumns() throws Exception { RowType innerType = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index e05106eef6..5f59063668 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -510,7 +510,9 @@ public class FlinkCatalog extends AbstractCatalog { } private List<SchemaChange> toSchemaChange( - TableChange change, Map<String, Integer> oldTableNonPhysicalColumnIndex) { + TableChange change, + Map<String, Integer> oldTableNonPhysicalColumnIndex, + @Nullable String primaryKeyConstraintName) { List<SchemaChange> schemaChanges = new ArrayList<>(); if (change instanceof AddColumn) { if (((AddColumn) change).getColumn().isPhysical()) { @@ -627,6 +629,15 @@ public class FlinkCatalog extends AbstractCatalog { } else if (change 
instanceof MaterializedTableChange && handleMaterializedTableChange(change, schemaChanges)) { return schemaChanges; + } else if (change instanceof TableChange.DropConstraint) { + TableChange.DropConstraint dropConstraint = (TableChange.DropConstraint) change; + if (primaryKeyConstraintName == null + || !primaryKeyConstraintName.equals(dropConstraint.getConstraintName())) { + throw new UnsupportedOperationException( + "Only dropping primary key constraint is supported."); + } + schemaChanges.add(SchemaChange.dropPrimaryKey()); + return schemaChanges; } throw new UnsupportedOperationException("Change is not supported: " + change.getClass()); } @@ -740,10 +751,17 @@ public class FlinkCatalog extends AbstractCatalog { checkArgument( table instanceof FileStoreTable, "Only support alter data table, but is: " + table.getClass()); - validateAlterTable(toCatalogTable(table), newTable); + CatalogBaseTable oldCatalogTable = toCatalogTable(table); + validateAlterTable(oldCatalogTable, newTable); Map<String, Integer> oldTableNonPhysicalColumnIndex = FlinkCatalogPropertiesUtil.nonPhysicalColumns( table.options(), table.rowType().getFieldNames()); + String primaryKeyConstraintName = + oldCatalogTable + .getUnresolvedSchema() + .getPrimaryKey() + .map(pk -> pk.getConstraintName()) + .orElse(null); List<SchemaChange> changes = new ArrayList<>(); @@ -773,7 +791,9 @@ public class FlinkCatalog extends AbstractCatalog { .flatMap( tableChange -> toSchemaChange( - tableChange, oldTableNonPhysicalColumnIndex) + tableChange, + oldTableNonPhysicalColumnIndex, + primaryKeyConstraintName) .stream()) .collect(Collectors.toList()); changes.addAll(schemaChanges); @@ -848,10 +868,10 @@ public class FlinkCatalog extends AbstractCatalog { if (!table1IsMaterialized) { org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema(); org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema(); - boolean pkEquality = false; + boolean allowAlterPk = false; if (ts1.getPrimaryKey().isPresent() && 
ts2.getPrimaryKey().isPresent()) { - pkEquality = + allowAlterPk = Objects.equals( ts1.getPrimaryKey().get().getConstraintName(), ts2.getPrimaryKey().get().getConstraintName()) @@ -859,10 +879,14 @@ public class FlinkCatalog extends AbstractCatalog { ts1.getPrimaryKey().get().getColumnNames(), ts2.getPrimaryKey().get().getColumnNames()); } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { - pkEquality = true; + allowAlterPk = true; + } else if (ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { + // dropping primary key is allowed on empty tables, + // SchemaManager will validate this + allowAlterPk = true; } - if (!pkEquality) { + if (!allowAlterPk) { throw new UnsupportedOperationException( "Altering primary key is not supported yet."); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 840dce7942..cd7805b9b3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1776,4 +1776,28 @@ public class SchemaChangeITCase extends CatalogITCaseBase { Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); } + + @Test + public void testDropPrimaryKeyOnEmptyTable() { + sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"); + + // drop primary key on empty table should succeed + sql("ALTER TABLE T DROP PRIMARY KEY"); + + List<Row> result = sql("SHOW CREATE TABLE T"); + assertThat(result.get(0).toString()).doesNotContain("PRIMARY KEY"); + } + + @Test + public void testDropPrimaryKeyOnNonEmptyTable() { + sql("CREATE TABLE T (a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED)"); + sql("INSERT INTO T VALUES (1, 2, 
'hello')"); + + // drop primary key on non-empty table should fail + assertThatThrownBy(() -> sql("ALTER TABLE T DROP PRIMARY KEY")) + .satisfies( + anyCauseMatches( + UnsupportedOperationException.class, + "Cannot drop primary keys on a non-empty table.")); + } }
