Hi, dear Flink community ~

Data integration is one of the most popular use cases for Flink. From the SQL point of view, we aim to design an API for SQL users doing data ingestion. We have surveyed several RDBMSs, which share a classic CTAS syntax for copying table data [1][2][3][4].
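For readers less familiar with it, the classic CTAS shape shared by those systems looks roughly like this (an illustrative sketch only; the table and column names are made up):

```sql
-- Classic CTAS, as in PostgreSQL/MySQL: the target table's schema is
-- derived from the SELECT list, and the rows are copied in one statement.
CREATE TABLE orders_2021 AS
SELECT order_id, amount, ts
FROM orders
WHERE ts >= '2021-01-01';
```

The proposal below adapts this shape to Flink's continuous-ingestion setting.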
We decided to adopt the CTAS syntax for Flink data ingestion use cases as well.

*The Syntax*

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
  [ AS query ]

<query>:
  TABLE table_name
  | select_statement

Explanation:
- AS TABLE: copies the source table data into the target; schema evolution is supported.
- AS select_statement: copies the source table data into the target; schema evolution is disabled, i.e. the schema is fixed at compile time.
- Every expression in the SELECT statement must have an explicit alias; otherwise an error is thrown (auto-generated names are hard to use and confusing).
- AS and LIKE must not both be present, because their functionality conflicts.
- By default, the WITH options are not copied, because most of the time the options differ between source and sink.

We disable schema evolution for AS SELECT to support non-evolution use cases and to avoid unclear semantics for SQL transformations, for example a SELECT * in a join query.

*The Use Case*

Here is an example:

create table dlf_catalog.default.hudi_sink
PARTITIONED BY (ts)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
)
AS TABLE mysql_cdc_catalog.default.mysql_source;

This statement creates a Hudi sink table with the same schema as the MySQL source table; the partition field is "ts".

Here is a use case:

CREATE CATALOG mysql_cdc WITH (type=mysql, instance=instance1, db=db1 ...);
CREATE CATALOG dlf_datalake WITH (type = dlf, endpoint='http://xxx', accessKey='abc');

USE CATALOG dlf_datalake;
CREATE TABLE ods_hudi_table
PARTITIONED BY (ts)
WITH (connector='hudi', ...)
AS TABLE mysql_table;
-- the sync pipeline is pulled up from here

*About the Atomicity*

The statement has two actions:
1. create the table
2.
start the data sync pipeline

The Flink catalog does not guarantee atomicity, so we do not keep atomicity here either:
- If the table already exists at creation time and the [IF NOT EXISTS] keywords are not specified, an error is thrown.
- If the table is created but the write fails, the table is not dropped; the user should drop it manually.

*Q & A*

1. What if a catalog does not support storing custom WITH options?
   The catalog should throw directly, so the statement execution fails.

2. How to write data into an existing table with history data?
   Declare the [IF NOT EXISTS] keywords: the table creation is skipped, but the pipeline still starts up.

3. How to match sub-databases and sub-tables?
   Use a regex-style source table name, such as:

create table dlf_catalog.default.hudi_sink
PARTITIONED BY (ts)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
)
AS TABLE `instant1.db1.*.user_*`;

   This statement syncs all the databases whose name has "db1" as prefix and whose table names have "user_" as prefix into the target table.

4. What if the user defines a new field with nullability as false and no default value specified, but wants schema evolution for that field?
   An error is thrown, because we do not know how to fill the default for a field whose nullability is false.

Waiting for your nice ideas, thanks ~

[1] PostgreSQL 13: https://www.postgresql.org/docs/13/sql-createtableas.html
[2] Spark 3.0: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-ddl-create-table-datasource.html
[3] Hive 3.0: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableCreate/Drop/TruncateTable
[4] MySQL 5.6: https://dev.mysql.com/doc/refman/5.6/en/create-table-select.html

Best,
Danny Chan