wuchong commented on a change in pull request #11950:
URL: https://github.com/apache/flink/pull/11950#discussion_r421966802
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -735,6 +740,49 @@ private TableResult executeOperation(Operation operation) {
 					alterTablePropertiesOp.getTableIdentifier().toObjectPath(),
 					alterTablePropertiesOp.getCatalogTable(),
 					false);
+			} else if (alterTableOperation instanceof AlterTableAddConstraintOperation){
+				AlterTableAddConstraintOperation addConstraintOP =
+						(AlterTableAddConstraintOperation) operation;
+				CatalogTable oriTable = (CatalogTable) catalogManager
+						.getTable(addConstraintOP.getTableIdentifier())
+						.get()
+						.getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+						.builderWithGivenSchema(oriTable.getSchema());
+				if (addConstraintOP.getConstraintName().isPresent()) {
+					builder.primaryKey(
+							addConstraintOP.getConstraintName().get(),
+							addConstraintOP.getColumnNames());
+				} else {
+					builder.primaryKey(addConstraintOP.getColumnNames());
+				}
+				CatalogTable newTable = new CatalogTableImpl(
+						builder.build(),
+						oriTable.getPartitionKeys(),
+						oriTable.getProperties(),
Review comment:
Please use `oriTable.getOptions()`.
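A minimal sketch of the suggested substitution; only the third constructor argument changes from the diff above:

```java
CatalogTable newTable = new CatalogTableImpl(
		builder.build(),
		oriTable.getPartitionKeys(),
		oriTable.getOptions(), // suggested: getOptions() instead of getProperties()
		oriTable.getComment());
```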
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -735,6 +740,49 @@ private TableResult executeOperation(Operation operation) {
 					alterTablePropertiesOp.getTableIdentifier().toObjectPath(),
 					alterTablePropertiesOp.getCatalogTable(),
 					false);
+			} else if (alterTableOperation instanceof AlterTableAddConstraintOperation){
+				AlterTableAddConstraintOperation addConstraintOP =
+						(AlterTableAddConstraintOperation) operation;
+				CatalogTable oriTable = (CatalogTable) catalogManager
+						.getTable(addConstraintOP.getTableIdentifier())
+						.get()
+						.getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+						.builderWithGivenSchema(oriTable.getSchema());
+				if (addConstraintOP.getConstraintName().isPresent()) {
+					builder.primaryKey(
+							addConstraintOP.getConstraintName().get(),
+							addConstraintOP.getColumnNames());
+				} else {
+					builder.primaryKey(addConstraintOP.getColumnNames());
+				}
+				CatalogTable newTable = new CatalogTableImpl(
+						builder.build(),
+						oriTable.getPartitionKeys(),
+						oriTable.getProperties(),
+						oriTable.getComment());
+				catalog.alterTable(
+						addConstraintOP.getTableIdentifier().toObjectPath(),
+						newTable,
+						false);
+			} else if (alterTableOperation instanceof AlterTableDropConstraintOperation){
+				AlterTableDropConstraintOperation dropConstraintOperation =
+						(AlterTableDropConstraintOperation) operation;
+				CatalogTable oriTable = (CatalogTable) catalogManager
+						.getTable(dropConstraintOperation.getTableIdentifier())
+						.get()
+						.getTable();
+				// The constraint name is not used because currently there is only one
+				// constraint in the schema.
+				CatalogTable newTable = new CatalogTableImpl(
+						TableSchemaUtils.dropConstraint(oriTable.getSchema()),
+						oriTable.getPartitionKeys(),
+						oriTable.getProperties(),
Review comment:
Please use `oriTable.getOptions()`.
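The same substitution, sketched for the drop-constraint branch:

```java
CatalogTable newTable = new CatalogTableImpl(
		TableSchemaUtils.dropConstraint(oriTable.getSchema()),
		oriTable.getPartitionKeys(),
		oriTable.getOptions(), // suggested: getOptions() instead of getProperties()
		oriTable.getComment());
```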
##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for TableSchemaUtils. */
+public class TableSchemaUtilsTest {
+	@Test
+	public void testBuilderWithGivenSchema() {
+		TableSchema oriSchema = TableSchema.builder()
+				.field("a", DataTypes.INT().notNull())
+				.field("b", DataTypes.STRING())
+				.field("c", DataTypes.INT(), "a + 1")
+				.field("t", DataTypes.TIMESTAMP(3))
+				.primaryKey("ct1", new String[] {"a"})
+				.watermark("t", "t", DataTypes.TIMESTAMP(3))
+				.build();
+		TableSchema newSchema = TableSchemaUtils.builderWithGivenSchema(oriSchema).build();
+		final String expected = "root\n" +
+				" |-- a: INT NOT NULL\n" +
+				" |-- b: STRING\n" +
+				" |-- c: INT AS a + 1\n" +
+				" |-- t: TIMESTAMP(3)\n" +
+				" |-- WATERMARK FOR t AS t\n" +
+				" |-- CONSTRAINT ct1 PRIMARY KEY (a)\n";
+		assertEquals(expected, newSchema.toString());
Review comment:
nit: A simpler verification for this is `assertEquals(oriSchema, newSchema)`.
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -735,6 +740,49 @@ private TableResult executeOperation(Operation operation) {
 					alterTablePropertiesOp.getTableIdentifier().toObjectPath(),
 					alterTablePropertiesOp.getCatalogTable(),
 					false);
+			} else if (alterTableOperation instanceof AlterTableAddConstraintOperation){
+				AlterTableAddConstraintOperation addConstraintOP =
+						(AlterTableAddConstraintOperation) operation;
+				CatalogTable oriTable = (CatalogTable) catalogManager
+						.getTable(addConstraintOP.getTableIdentifier())
+						.get()
+						.getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+						.builderWithGivenSchema(oriTable.getSchema());
+				if (addConstraintOP.getConstraintName().isPresent()) {
+					builder.primaryKey(
+							addConstraintOP.getConstraintName().get(),
+							addConstraintOP.getColumnNames());
+				} else {
+					builder.primaryKey(addConstraintOP.getColumnNames());
+				}
+				CatalogTable newTable = new CatalogTableImpl(
+						builder.build(),
+						oriTable.getPartitionKeys(),
+						oriTable.getProperties(),
+						oriTable.getComment());
+				catalog.alterTable(
+						addConstraintOP.getTableIdentifier().toObjectPath(),
+						newTable,
+						false);
+			} else if (alterTableOperation instanceof AlterTableDropConstraintOperation){
+				AlterTableDropConstraintOperation dropConstraintOperation =
+						(AlterTableDropConstraintOperation) operation;
+				CatalogTable oriTable = (CatalogTable) catalogManager
+						.getTable(dropConstraintOperation.getTableIdentifier())
+						.get()
+						.getTable();
+				// The constraint name is not used because currently there is only one
+				// constraint in the schema.
+				CatalogTable newTable = new CatalogTableImpl(
+						TableSchemaUtils.dropConstraint(oriTable.getSchema()),
Review comment:
Even though we only support the primary key constraint, I still think we shouldn't remove all constraints. We should remove the constraint by name, and throw an exception when it can't be found.
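A hedged sketch of what a name-aware drop could look like; the two-argument `dropConstraint` overload and the exact error message are illustrative, not part of this PR:

```java
import java.util.Optional;

import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;

// Would live in TableSchemaUtils alongside the existing single-argument helper.
public static TableSchema dropConstraint(TableSchema oriSchema, String constraintName) {
	// Fail fast when no constraint with the given name exists.
	Optional<UniqueConstraint> pk = oriSchema.getPrimaryKey();
	if (!pk.isPresent() || !pk.get().getName().equals(constraintName)) {
		throw new ValidationException(
				String.format("Constraint %s to drop does not exist.", constraintName));
	}
	// Rebuild the schema, copying columns and watermarks but not the primary key.
	TableSchema.Builder builder = TableSchema.builder();
	for (TableColumn column : oriSchema.getTableColumns()) {
		if (column.getExpr().isPresent()) {
			builder.field(column.getName(), column.getType(), column.getExpr().get());
		} else {
			builder.field(column.getName(), column.getType());
		}
	}
	oriSchema.getWatermarkSpecs().forEach(spec -> builder.watermark(
			spec.getRowtimeAttribute(),
			spec.getWatermarkExpr(),
			spec.getWatermarkExprOutputType()));
	return builder.build();
}
```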
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -279,8 +282,59 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
 				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
 						tableIdentifier.toString()));
 			}
+		} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
+			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
+					catalogManager.getTable(tableIdentifier);
+			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
+				SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+						.getConstraint();
+				validateTableConstraint(constraint);
+				String[] columnNames = constraint.getColumnNames();
Review comment:
The `columnNames` variable is unused here; please remove it.
##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
##########
@@ -634,6 +697,139 @@ public void testAlterTable() throws Exception {
 		assertEquals(properties, alterTablePropertiesOperation.getCatalogTable().getProperties());
 	}
+	@Test
+	public void testAlterTableAddPkConstraint() throws Exception {
+		Catalog catalog = new GenericInMemoryCatalog("default", "default");
+		catalogManager.registerCatalog("cat1", catalog);
+		catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
+		CatalogTable catalogTable = new CatalogTableImpl(
+				TableSchema.builder()
+						.field("a", DataTypes.STRING().notNull())
+						.field("b", DataTypes.BIGINT().notNull())
+						.field("c", DataTypes.BIGINT())
+						.build(),
+				new HashMap<>(),
+				"tb1");
+		catalogManager.setCurrentCatalog("cat1");
+		catalogManager.setCurrentDatabase("db1");
+		catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
+		// Test alter add table constraint.
+		Operation operation = parse("alter table tb1 add constraint ct1 primary key(a, b) not enforced",
+				SqlDialect.DEFAULT);
+		assert operation instanceof AlterTableAddConstraintOperation;
+		AlterTableAddConstraintOperation addConstraintOperation =
+				(AlterTableAddConstraintOperation) operation;
+		assertThat(addConstraintOperation.asSummaryString(),
+				is("ALTER TABLE ADD CONSTRAINT: (identifier: [`cat1`.`db1`.`tb1`], "
+						+ "constraintName: [ct1], columns: [a, b])"));
+		// Test alter table add pk on nullable column
+		exceptionRule.expect(ValidationException.class);
+		exceptionRule.expectMessage("Could not create a PRIMARY KEY 'ct1'. Column 'c' is nullable.");
+		parse("alter table tb1 add constraint ct1 primary key(c) not enforced",
+				SqlDialect.DEFAULT);
+	}
+
+	@Test
+	public void testAlterTableAddPkConstraintEnforced() throws Exception {
Review comment:
Is this duplicated with `testAlterTableAddConstraintEnforced`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]