[ 
https://issues.apache.org/jira/browse/FLINK-21634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-21634:
----------------------------
    Description: 
We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only 
supports renaming a table and changing table options. One useful feature of the 
ALTER TABLE statement is modifying the schema. This is also heavily required 
for integration with data lakes (e.g. Iceberg).

Therefore, I propose to support the following ALTER TABLE statements:


{code:sql}
ALTER TABLE table_name {
    ADD { <schema_component> | (<schema_component> [, ...]) }
  | MODIFY { <schema_component> | (<schema_component> [, ...]) }
  | DROP { column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK }
  | RENAME old_column_name TO new_column_name
  | RENAME TO new_table_name
  | SET (key1=val1, ...)
  | RESET (key1, ...)
}

<schema_component>::
  { <column_component> | <constraint_component> | <watermark_component> }

<column_component>::
  column_name <column_definition> [FIRST | AFTER column_name]

<constraint_component>::
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<watermark_component>::
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<column_definition>::
  { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

<physical_column_definition>::
  column_type

<metadata_column_definition>::
  column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>::
  AS computed_column_expression
{code}

And some examples:

{code:sql}
-- add a new column 
ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';

-- add columns, constraint, and watermark
ALTER TABLE mytable ADD (
    log_ts STRING COMMENT 'log timestamp string' FIRST,
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
);

-- modify a column type
ALTER TABLE prod.db.sample MODIFY measurement DOUBLE COMMENT 'unit is bytes per second' AFTER `id`;

-- modify the definition of columns log_ts and ts, the primary key, and the
-- watermark; they must already exist in the table schema
ALTER TABLE mytable MODIFY (
    log_ts STRING COMMENT 'log timestamp string' AFTER `id`,  -- reorder columns
    ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
);

-- drop an old column
ALTER TABLE prod.db.sample DROP measurement;

-- drop columns
ALTER TABLE prod.db.sample DROP (col1, col2, col3);

-- drop a watermark
ALTER TABLE prod.db.sample DROP WATERMARK;

-- rename column name
ALTER TABLE prod.db.sample RENAME `data` TO payload;

-- rename table name
ALTER TABLE mytable RENAME TO mytable2;

-- set options
ALTER TABLE kafka_table SET (
    'scan.startup.mode' = 'specific-offsets', 
    'scan.startup.specific-offsets' = 'partition:0,offset:42'
);

-- reset options
ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
{code}

Note: we don't need to introduce new interfaces, because all ALTER TABLE 
operations will be forwarded to the catalog through 
{{Catalog#alterTable(tablePath, newTable, ignoreIfNotExists)}}.
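
To make this concrete, here is a rough sketch of how such an operation could be applied on the catalog side. This is not the actual implementation: the helper method and the use of the {{Schema}} builder and {{CatalogTable.of}} factory are only assumptions for illustration; the only interface relied on is the existing {{Catalog#alterTable}}.

{code:java}
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

public class AlterTableSchemaSketch {

    /** Hypothetical handling of "ALTER TABLE mytable ADD new_column STRING". */
    public static void addColumn(Catalog catalog, ObjectPath tablePath) throws Exception {
        // Read the current table definition from the catalog.
        CatalogTable oldTable = (CatalogTable) catalog.getTable(tablePath);

        // Build a new schema: the old columns plus the added one.
        Schema newSchema =
                Schema.newBuilder()
                        .fromSchema(oldTable.getUnresolvedSchema())
                        .column("new_column", DataTypes.STRING())
                        .build();

        // Keep comment, partition keys, and options unchanged.
        CatalogTable newTable =
                CatalogTable.of(
                        newSchema,
                        oldTable.getComment(),
                        oldTable.getPartitionKeys(),
                        oldTable.getOptions());

        // No new interface is needed: the updated table is forwarded through
        // the existing Catalog#alterTable(tablePath, newTable, ignoreIfNotExists).
        catalog.alterTable(tablePath, newTable, false);
    }
}
{code}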

[1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table
[2]: http://iceberg.apache.org/spark-ddl/#alter-table-alter-column
[3]: https://trino.io/docs/current/sql/alter-table.html
[4]: https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
[5]: https://www.postgresql.org/docs/9.1/sql-altertable.html

  was:
We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only 
supports renaming a table and changing table options. One useful feature of the 
ALTER TABLE statement is modifying the schema. This is also heavily required 
for integration with data lakes (e.g. Iceberg).

Therefore, I propose to support the following ALTER TABLE statements:

*Add Column*

{code:sql}
ALTER TABLE table_name
  ADD COLUMN column_name <column_definition> [FIRST | AFTER column_name]

<column_definition>::
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]

<physical_column_definition>::
column_type

<metadata_column_definition>::
column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>::
AS computed_column_expression
{code}

This follows SQL standard 2011 Section 11.10, and Iceberg [2], Trino [3], and 
MySQL [4] use the same syntax.

*Add Columns*

In order to support adding multiple columns easily, we will introduce {{ADD COLUMNS}}:

{code:sql}
ALTER TABLE table_name
  ADD COLUMNS (column_name <column_definition> [, ...]) [FIRST | AFTER column_name]
{code}
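
For example, the following (hypothetical, not yet supported) statement adds two columns in one go; it is shown here the way a user would submit it through the existing {{TableEnvironment#executeSql}} API, with table and column names chosen only for illustration:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AddColumnsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // A table to alter; the datagen connector only keeps the example self-contained.
        tEnv.executeSql(
                "CREATE TABLE mytable (id BIGINT, log_ts STRING) WITH ('connector' = 'datagen')");

        // Proposed multi-column variant: several columns are added in a single statement.
        // This would fail today because the syntax is not yet supported.
        tEnv.executeSql(
                "ALTER TABLE mytable ADD COLUMNS ("
                        + "  ts AS TO_TIMESTAMP(log_ts),"
                        + "  payload STRING COMMENT 'raw payload'"
                        + ")");
    }
}
{code}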

*Drop Column*

{code:sql}
ALTER TABLE <table_name> DROP COLUMN <column_name>
{code}

This follows SQL standard 2011 Section 11.10, and Iceberg [2], Trino [3], and 
MySQL [4] use the same syntax.

*Modify Column*

{code:sql}
ALTER TABLE table_name
MODIFY COLUMN column_name <column_definition> [FIRST | AFTER column_name]
{code}

This is not in SQL standard 2011 Section 11.10, but it is supported by MySQL 
and Oracle. MODIFY COLUMN can change a column into an arbitrary new definition, 
which works better for Flink. Therefore we don't introduce the {{ALTER COLUMN}} 
syntax, which cannot change every aspect of a column.

*Rename Column*
{code:sql}
ALTER TABLE <table_name> RENAME COLUMN <old_name> TO <new_name>
{code}

This is not listed in the SQL standard, but it is also very useful. It follows 
the syntax of Iceberg [2], Trino [3], and MySQL [4].

*Reset Options*
{code:sql}
ALTER TABLE <table_name> RESET (key1, key2, ...)
{code}

This is outside the SQL standard, but it is useful and has already been 
discussed in FLINK-17845. We use {{RESET}} to keep it aligned with the 
{{SET key=value}} and {{RESET key}} statements proposed in FLIP-163. 
PostgreSQL [5] also uses the {{RESET}} keyword.

For example:

{code:sql}
-- add a new column 
ALTER TABLE mytable ADD COLUMN new_column STRING COMMENT 'new_column docs';

-- drop an old column
ALTER TABLE prod.db.sample DROP COLUMN legacy_name;

-- rename column name
ALTER TABLE prod.db.sample RENAME COLUMN `data` TO payload;

-- modify a column type
ALTER TABLE prod.db.sample MODIFY COLUMN measurement DOUBLE COMMENT 'unit is bytes per second' AFTER `id`;

-- reset options
ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
{code}

Note: we don't need to introduce new interfaces, because all ALTER TABLE 
operations will be forwarded to the catalog through 
{{Catalog#alterTable(tablePath, newTable, ignoreIfNotExists)}}.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table
[2]: http://iceberg.apache.org/spark-ddl/#alter-table-alter-column
[3]: https://trino.io/docs/current/sql/alter-table.html
[4]: https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
[5]: https://www.postgresql.org/docs/9.1/sql-altertable.html


> ALTER TABLE statement enhancement
> ---------------------------------
>
>                 Key: FLINK-21634
>                 URL: https://issues.apache.org/jira/browse/FLINK-21634
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Client
>            Reporter: Jark Wu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
