Hi Lincoln, thank you for this proposal and for discussing the motivation behind this change. I think it makes a lot of sense (as you said, we discussed this before).
I'd like to highlight the breaking change (among several non-breaking changes) proposed here: we propose to deprecate TableEnvironment.sql(sql: String): Table and replace it with TableEnvironment.sqlQuery(sql: String): Table. The reasons for this change are:

- We need a sqlUpdate() method that does not return a Table. For now the use case is "INSERT INTO x SELECT ...", but there are other DML statements as well.
- In order to better distinguish query and update functionality, we would like to rename the sql() method to sqlQuery().
- We want to name the SQL methods similarly to their JDBC counterparts. In JDBC the methods are executeQuery(): ResultSet and executeUpdate(): int. Since the Table API is not only about SQL, we think that sqlQuery() and sqlUpdate() are good method names for this functionality.

What do others think?

Fabian

2017-07-31 6:50 GMT+02:00 Lin Li <lincoln.8...@gmail.com>:
> Hi everybody,
> I'd like to propose and discuss some API changes to support DML
> operations like the 'INSERT INTO' clause in the Table API & SQL.
> Originally this was discussed with Fabian in the PR conversation (see
> https://github.com/apache/flink/pull/3829). Since it involves several
> API changes, I'm starting this mailing list thread to discuss it.
>
> # Motivation
>
> Currently the Table API only has a registration method for source
> tables. When we write a streaming job in SQL, we have to add additional
> code for the sink, as the Table API does:
>
>   val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
>   val t = StreamTestData.getSmall3TupleDataStream(env)
>   tEnv.registerDataStream("MyTable", t)
>
>   // one way: invoke the Table API's writeToSink method directly
>   val result = tEnv.sql(sqlQuery)
>   result.writeToSink(new YourStreamSink)
>
>   // another way: convert to a DataStream first and then invoke addSink
>   val result = tEnv.sql(sqlQuery).toDataStream[Row]
>   result.addSink(new StreamITCase.StringSink)
>
> As we can see from the API, the sink table is always a derived table,
> because its 'schema' is inferred from the result type of the upstream
> query. In contrast, in a traditional RDBMS that supports DML syntax, a
> query with a target output can be written like this:
>
>   INSERT INTO target_table_name
>   [(column_name [ ,...n ])]
>   query
>
> The equivalent form of the example above is as follows:
>
>   tEnv.registerTableSink("targetTable", new YourSink)
>   val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>   val result = tEnv.sql(sql)
>
> This is supported by Calcite's grammar:
>
>   insert: ( INSERT | UPSERT ) INTO tablePrimary
>   [ '(' column [, column ]* ')' ]
>   query
>
> I'd like to extend the Flink Table API to support this feature.
>
> # Proposed changes
>
> 1. Support registering a sink table (like source table registration;
> validation will be done against the registered table):
>
>   /**
>    * Registers an external [[TableSink]] in this [[TableEnvironment]]'s
>    * catalog. Registered sink tables can be referenced in SQL DML
>    * clauses.
>    *
>    * @param name The name under which the [[TableSink]] is registered.
>    * @param tableSink The [[TableSink]] to register.
>    */
>   def registerTableSink(name: String, tableSink: TableSink[_]): Unit
>
> 2. Add two new methods to table.scala:
>
>   def insertInto[T](tableSink: String): Unit
>   def insertInto[T](tableSink: String, conf: QueryConfig): Unit
>
> I propose to retain the current writeToSink method so that this is not a
> breaking change to the API. In a sense, it is similar to the 'CREATE
> TABLE AS' statement in an RDBMS (which creates a table from an existing
> table by copying the existing table's columns).
>
> 3. Deprecate the current sql method and add two new methods to
> TableEnvironment:
>
>   @deprecated def sql(sql: String): Table
>   def sqlQuery(sql: String): Table
>   def sqlUpdate(sql: String, config: QueryConfig): Unit
>
> I think the sqlUpdate method here differs from JDBC's [1] executeUpdate,
> which returns an int, because sqlUpdate will not trigger an execution
> immediately. So keeping the return type as Unit sounds reasonable and
> does not break the consistency of the Scala and Java APIs.
>
> Note that:
>
> A registered source table cannot be updated unless it is registered as a
> sink table as well. So we need to add validation in both the Table API
> and SQL to prevent querying a sink table or inserting into a source
> table.
>
> Partial column insertion into a target table is not supported, because
> columns do not have a nullable property definition for now.
>
> Ref:
> [1]
> https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html
>
> doc link: https://goo.gl/n3phK5
>
> What do you think?
>
> Best,
> Lincoln
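To make the discussion concrete, here is a rough sketch of how a streaming job could look if both Lincoln's registerTableSink/insertInto proposal and the sqlQuery/sqlUpdate renaming were adopted. This is only an illustration of the *proposed* API, so it is not runnable against any released Flink version; YourStreamSource and YourSink are placeholder classes taken from the examples in this thread, and the use of StreamQueryConfig with sqlUpdate is my assumption based on the proposed signature.

```scala
// Sketch of the proposed API (not runnable today; sqlQuery, sqlUpdate,
// registerTableSink, and insertInto do not exist yet in this form).
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Register a source table and, with the proposed method, a sink table.
tEnv.registerDataStream("sourceTable", env.addSource(new YourStreamSource))
tEnv.registerTableSink("targetTable", new YourSink) // proposed

// Query: returns a Table, like the deprecated sql() method did.
val t: Table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable WHERE a > 0")

// Update: returns Unit, because nothing is executed until env.execute().
tEnv.sqlUpdate(
  "INSERT INTO targetTable SELECT a, b, c FROM sourceTable",
  new StreamQueryConfig) // QueryConfig argument as in the proposed signature

// Table API equivalent of the INSERT INTO statement (proposed insertInto).
t.insertInto("targetTable")

env.execute()
```

Note how the Unit return type of sqlUpdate matches the deferred-execution model: unlike JDBC's executeUpdate, there is no row count to return at statement-submission time.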