twalthr commented on code in PR #24886:
URL: https://github.com/apache/flink/pull/24886#discussion_r1627848065


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java:
##########
@@ -40,7 +41,8 @@ public static void unparseTableSchema(
             int rightPrec,
             SqlNodeList columnList,
             List<SqlTableConstraint> constraints,
-            @Nullable SqlWatermark watermark) {
+            @Nullable SqlWatermark watermark,
+            @Nullable SqlDistribution distribution) {

Review Comment:
   This doesn't seem right. A distribution is not part of the table schema. A 
table schema is enclosed in `( )` like in `CREATE TABLE <name> (<table 
schema>)`. The distribution comes after the schema.



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -611,6 +611,70 @@ void testAlterTableAddWatermark() {
                 .fails("Multiple WATERMARK statements is not supported yet.");
     }
 
+    @Test
+    void testAlterTableAddDistribution() {
+        sql("alter table t1 add (DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n)")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table t1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")

Review Comment:
   remove? same as above?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -339,6 +364,10 @@ private void populatePrimaryKeyFromSourceTable(Schema 
oldSchema) {
             }
         }
 
+        private void populateDistribution(ResolvedCatalogTable oldTable) {

Review Comment:
   follow naming pattern and call it `populateDistributionFromSourceTable`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -916,15 +987,29 @@ private Operation buildAlterTableChangeOperation(
             List<TableChange> tableChanges,
             Schema newSchema,
             ResolvedCatalogTable oldTable) {
+        CatalogTable.Builder builder =
+                CatalogTable.newBuilder()
+                        .schema(newSchema)
+                        .comment(oldTable.getComment())
+                        .partitionKeys(oldTable.getPartitionKeys())
+                        .options(oldTable.getOptions());
+
+        if (alterTable instanceof SqlAlterTableSchema) {

Review Comment:
   why can't we simply call `oldTable.getDistribution`?



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -611,6 +611,70 @@ void testAlterTableAddWatermark() {
                 .fails("Multiple WATERMARK statements is not supported yet.");
     }
 
+    @Test
+    void testAlterTableAddDistribution() {
+        sql("alter table t1 add (DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n)")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table t1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY HASH(`A`, 
`H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY RANGE(a, h) INTO 6 
BUCKETS\n")
+                .ok(
+                        "ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY 
RANGE(`A`, `H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY ^RANDOM^(a, h) INTO 6 
BUCKETS\n")
+                .fails("(?s).*Encountered \"RANDOM\" at line 1, column 38.*");
+
+        sql("alter table tbl1 add DISTRIBUTION BY (a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY (`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY RANGE(a, h)\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY RANGE(`A`, 
`H`)\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY (a, h)\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY (`A`, 
`H`)\n)");
+    }
+
+    @Test
+    void testAlterTableModifyDistribution() {
+        sql("alter table t1 modify (DISTRIBUTION BY HASH(a, h) INTO 6 
BUCKETS\n)")
+                .ok(
+                        "ALTER TABLE `T1` MODIFY (\n  DISTRIBUTION BY 
HASH(`A`, `H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table t1 modify DISTRIBUTION BY HASH(a, h) INTO 6 
BUCKETS\n")
+                .ok(
+                        "ALTER TABLE `T1` MODIFY (\n  DISTRIBUTION BY 
HASH(`A`, `H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 modify DISTRIBUTION BY HASH(a, h) INTO 6 
BUCKETS\n")

Review Comment:
   remove? same as above?



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -611,6 +611,70 @@ void testAlterTableAddWatermark() {
                 .fails("Multiple WATERMARK statements is not supported yet.");
     }
 
+    @Test
+    void testAlterTableAddDistribution() {
+        sql("alter table t1 add (DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n)")

Review Comment:
   Parenthesis should not be allowed. Take a look at the grammar in the Flink 
docs:
   ```
   ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] 
<partition_component> [<partition_component> ...]}
   ```
   
   Distribution is not a schema component. It deserves its own branch.  Do we 
support parenthesis for the partition component?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -206,6 +210,24 @@ public Operation convertAlterSchema(
                 dropColumn, tableChanges, schemaBuilder.build(), oldTable);
     }
 
+    /** Convert ALTER TABLE DROP DISTRIBUTION to generate an updated Schema. */
+    public Operation convertAlterSchema(

Review Comment:
   the name is misleading as distribution is not part of the "schema". 
`convertAlterDistribution`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -110,6 +113,7 @@ public Operation convertAlterSchema(
         SchemaConverter converter = createSchemaConverter(alterTableSchema, 
oldTable);
         
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
         alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
+        
alterTableSchema.getDistribution().ifPresent(converter::updateDistribution);

Review Comment:
   I thought the main change in this class is only to update the distribution 
in case of a column name change similar to  `buildUpdatedPrimaryKey`. 



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -611,6 +611,70 @@ void testAlterTableAddWatermark() {
                 .fails("Multiple WATERMARK statements is not supported yet.");
     }
 
+    @Test
+    void testAlterTableAddDistribution() {
+        sql("alter table t1 add (DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n)")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table t1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `T1` ADD (\n  DISTRIBUTION BY HASH(`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY HASH(a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY HASH(`A`, 
`H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY RANGE(a, h) INTO 6 
BUCKETS\n")
+                .ok(
+                        "ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY 
RANGE(`A`, `H`) INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY ^RANDOM^(a, h) INTO 6 
BUCKETS\n")
+                .fails("(?s).*Encountered \"RANDOM\" at line 1, column 38.*");
+
+        sql("alter table tbl1 add DISTRIBUTION BY (a, h) INTO 6 BUCKETS\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY (`A`, `H`) 
INTO 6 BUCKETS\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY RANGE(a, h)\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY RANGE(`A`, 
`H`)\n)");
+
+        sql("alter table tbl1 add DISTRIBUTION BY (a, h)\n")
+                .ok("ALTER TABLE `TBL1` ADD (\n  DISTRIBUTION BY (`A`, 
`H`)\n)");
+    }
+
+    @Test
+    void testAlterTableModifyDistribution() {
+        sql("alter table t1 modify (DISTRIBUTION BY HASH(a, h) INTO 6 
BUCKETS\n)")

Review Comment:
   Same comment as above. Parenthesis should not be allowed. Distribution is 
not a schema component. It deserves its own branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to