[ https://issues.apache.org/jira/browse/FLINK-21634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-21634:
----------------------------
    Description:
We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only supports renaming a table and changing table options. One useful feature of the ALTER TABLE statement is modifying the schema. This is also heavily required for integration with data lakes (e.g. Iceberg).

Therefore, I propose to support the following ALTER TABLE statements:

{code:sql}
ALTER TABLE table_name {
    ADD { <schema_component> | (<schema_component> [, ...]) }
  | MODIFY { <schema_component> | (<schema_component> [, ...]) }
  | DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK}
  | RENAME old_column_name TO new_column_name
  | RENAME TO new_table_name
  | SET (key1=val1, ...)
  | RESET (key1, ...)
}

<schema_component>::
  { <column_component> | <constraint_component> | <watermark_component> }

<column_component>::
  column_name <column_definition> [FIRST | AFTER column_name]

<constraint_component>::
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<watermark_component>::
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<column_definition>::
  { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

<physical_column_definition>::
  column_type

<metadata_column_definition>::
  column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>::
  AS computed_column_expression
{code}

And some examples:

{code:sql}
-- add a new column
ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';

-- add columns, constraint, and watermark
ALTER TABLE mytable ADD (
    log_ts STRING COMMENT 'log timestamp string' FIRST,
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
);

-- modify a column type
ALTER TABLE prod.db.sample MODIFY measurement double COMMENT 'unit is bytes per second' AFTER `id`;

-- modify the definitions of columns log_ts and ts, the primary key, and the watermark;
-- they must already exist in the table schema
ALTER TABLE mytable MODIFY (
    log_ts STRING COMMENT 'log timestamp string' AFTER `id`,  -- reorder columns
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
);

-- drop an old column
ALTER TABLE prod.db.sample DROP measurement;

-- drop columns
ALTER TABLE prod.db.sample DROP (col1, col2, col3);

-- drop a watermark
ALTER TABLE prod.db.sample DROP WATERMARK;

-- rename a column
ALTER TABLE prod.db.sample RENAME `data` TO payload;

-- rename a table
ALTER TABLE mytable RENAME TO mytable2;

-- set options
ALTER TABLE kafka_table SET (
    'scan.startup.mode' = 'specific-offsets',
    'scan.startup.specific-offsets' = 'partition:0,offset:42'
);

-- reset options
ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
{code}

Note: we don't need to introduce new interfaces, because all the alter table operations will be forwarded to the catalog through {{Catalog#alterTable(tablePath,
newTable, ignoreIfNotExists)}}.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table
[2]: http://iceberg.apache.org/spark-ddl/#alter-table-alter-column
[3]: https://trino.io/docs/current/sql/alter-table.html
[4]: https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
[5]: https://www.postgresql.org/docs/9.1/sql-altertable.html

  was:
We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only supports renaming a table and changing table options. One useful feature of the ALTER TABLE statement is modifying the schema. This is also heavily required for integration with data lakes (e.g. Iceberg).

Therefore, I propose to support the following ALTER TABLE statements:

*Add Column*

{code:sql}
ALTER TABLE table_name
  ADD COLUMN column_name <column_definition> [FIRST | AFTER column_name]

<column_definition>::
  { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

<physical_column_definition>::
  column_type

<metadata_column_definition>::
  column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>::
  AS computed_column_expression
{code}

This follows SQL standard 2011 Section 11.10. Iceberg [2], Trino [3], and MySQL [4] use the same syntax.

*Add Columns*

In order to support adding multiple columns easily, we will introduce {{ADD COLUMNS}}:

{code:sql}
ALTER TABLE table_name
  ADD COLUMNS (column_name <column_definition> [, ...]) [FIRST | AFTER column_name]
{code}

*Drop Column*

{code:sql}
ALTER TABLE <table_name> DROP COLUMN <column_name>
{code}

This follows SQL standard 2011 Section 11.10. Iceberg [2], Trino [3], and MySQL [4] use the same syntax.

*Modify Column*

{code:sql}
ALTER TABLE table_name
  MODIFY COLUMN column_name <column_definition> [FIRST | AFTER column_name]
{code}

This is not in SQL standard 2011 Section 11.10, but it is supported by MySQL and Oracle. {{MODIFY COLUMN}} can change a column into an arbitrary definition, which works better with Flink. So we don't introduce the {{ALTER COLUMN}} syntax, which is not even able to change every aspect of the column.

*Rename Column*

{code:sql}
ALTER TABLE <table_name> RENAME COLUMN <old_name> TO <new_name>
{code}

This is not listed in the SQL standard, but it is very useful. It follows the syntax of Iceberg [2], Trino [3], and MySQL [4].

*Reset Options*

{code:sql}
ALTER TABLE <table_name> RESET (key1, key2, ...)
{code}

This is outside the SQL standard, but it is useful and has been discussed in FLINK-17845. We use {{RESET}} to stay aligned with {{SET key=value}} and {{RESET key}} proposed in FLIP-163. PostgreSQL [5] also uses the {{RESET}} keyword.

For example:

{code:sql}
-- add a new column
ALTER TABLE mytable ADD COLUMN new_column STRING COMMENT 'new_column docs';

-- drop an old column
ALTER TABLE prod.db.sample DROP COLUMN legacy_name;

-- rename a column
ALTER TABLE prod.db.sample RENAME COLUMN `data` TO payload;

-- modify a column type
ALTER TABLE prod.db.sample MODIFY COLUMN measurement double COMMENT 'unit is bytes per second' AFTER `id`;

-- reset options
ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
{code}

Note: we don't need to introduce new interfaces, because all the alter table operations will be forwarded to the catalog through {{Catalog#alterTable(tablePath, newTable, ignoreIfNotExists)}}.
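
As a rough illustration of the note above, the sketch below shows how the planner side could compute a new column list from ADD (with FIRST | AFTER), DROP, and RENAME, and then hand the result to the catalog in a single {{alterTable}} call. Everything here is an assumption made for illustration: {{AlterTableSketch}}, {{SimpleCatalog}}, and {{InMemoryCatalog}} are hypothetical stand-ins, not Flink classes, and a table is reduced to a list of column names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model for illustration; these are NOT the real Flink classes.
class AlterTableSketch {

    /** Stand-in for the catalog call named in the note; the real method uses Flink types. */
    interface SimpleCatalog {
        void alterTable(String tablePath, List<String> newColumns, boolean ignoreIfNotExists);
    }

    /** In-memory catalog that only remembers the column names of each table. */
    static class InMemoryCatalog implements SimpleCatalog {
        final Map<String, List<String>> tables = new HashMap<>();

        @Override
        public void alterTable(String tablePath, List<String> newColumns, boolean ignoreIfNotExists) {
            if (!tables.containsKey(tablePath)) {
                if (ignoreIfNotExists) {
                    return; // silently skip a missing table
                }
                throw new IllegalArgumentException("Table does not exist: " + tablePath);
            }
            tables.put(tablePath, new ArrayList<>(newColumns));
        }
    }

    /** ADD name [FIRST | AFTER after]; with neither given, the column is appended at the end. */
    static List<String> addColumn(List<String> columns, String name, boolean first, String after) {
        if (columns.contains(name)) {
            throw new IllegalArgumentException("Column already exists: " + name);
        }
        List<String> result = new ArrayList<>(columns);
        if (first) {
            result.add(0, name);
        } else if (after != null) {
            int idx = result.indexOf(after);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown column in AFTER: " + after);
            }
            result.add(idx + 1, name);
        } else {
            result.add(name);
        }
        return result;
    }

    /** DROP name; fails if the column is not part of the schema. */
    static List<String> dropColumn(List<String> columns, String name) {
        List<String> result = new ArrayList<>(columns);
        if (!result.remove(name)) {
            throw new IllegalArgumentException("Unknown column: " + name);
        }
        return result;
    }

    /** RENAME oldName TO newName; the column keeps its position. */
    static List<String> renameColumn(List<String> columns, String oldName, String newName) {
        int idx = columns.indexOf(oldName);
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown column: " + oldName);
        }
        if (columns.contains(newName)) {
            throw new IllegalArgumentException("Column already exists: " + newName);
        }
        List<String> result = new ArrayList<>(columns);
        result.set(idx, newName);
        return result;
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.tables.put("mytable", new ArrayList<>(Arrays.asList("id", "data")));

        // Build the new schema step by step, then forward it in one catalog call.
        List<String> schema = catalog.tables.get("mytable");
        schema = addColumn(schema, "log_ts", true, null);     // ADD log_ts ... FIRST
        schema = addColumn(schema, "ts", false, "log_ts");    // ADD ts ... AFTER log_ts
        schema = renameColumn(schema, "data", "payload");     // RENAME data TO payload
        catalog.alterTable("mytable", schema, false);

        System.out.println(catalog.tables.get("mytable"));    // [log_ts, ts, id, payload]
    }
}
```

The point of the sketch is only that each statement reduces to "compute the new table, then replace the old one", which is why no additional catalog interface is needed; types, constraints, watermarks, and options would travel in the new table object in the same way.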

[1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table
[2]: http://iceberg.apache.org/spark-ddl/#alter-table-alter-column
[3]: https://trino.io/docs/current/sql/alter-table.html
[4]: https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
[5]: https://www.postgresql.org/docs/9.1/sql-altertable.html

> ALTER TABLE statement enhancement
> ---------------------------------
>
>                 Key: FLINK-21634
>                 URL: https://issues.apache.org/jira/browse/FLINK-21634
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Client
>            Reporter: Jark Wu
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)