+1 to support this change, as it makes the SQL API more accurate and elegant. I hope it will not cause too much trouble for existing Flink SQL users when they upgrade.
On Mon, Jul 31, 2017 at 3:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Lincoln,
>
> thank you for this proposal and for discussing the motivation for this
> change. I think it makes a lot of sense (as you said, we discussed this
> before).
>
> I'd like to highlight the breaking change (among several non-breaking
> changes) proposed here:
>
> We propose to deprecate TableEnvironment.sql(sql: String): Table and
> replace it with TableEnvironment.sqlQuery(sql: String): Table.
>
> The reasons for this change are:
> - We need a sqlUpdate() method that does not return a Table. For now the
>   use case is "INSERT INTO x SELECT ..." but there are other DML
>   statements as well.
> - In order to better distinguish query and update functionality, we would
>   like to rename the sql() method to sqlQuery().
> - We want to name the SQL methods similarly to their JDBC counterparts.
>   In JDBC the methods are executeQuery(): ResultSet and
>   executeUpdate(): int. Since the Table API is not only SQL, we think
>   that sqlQuery() and sqlUpdate() are good method names for this
>   functionality.
>
> What do others think?
>
> Fabian
>
>
> 2017-07-31 6:50 GMT+02:00 Lin Li <lincoln.8...@gmail.com>:
>
> > Hi everybody,
> > I'd like to propose and discuss some API changes to support DML
> > operations like the 'INSERT INTO' clause in the Table API & SQL.
> > Originally this was discussed with Fabian in the PR conversation (see
> > https://github.com/apache/flink/pull/3829). Since it involves several
> > API changes, I'm starting this mailing list thread to discuss it.
> > # Motivation
> >
> > Currently the Table API only has registration methods for source
> > tables. When we write a streaming job in SQL, we have to add extra
> > code for the sink, as the Table API does:
> >
> >   val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> >   val t = StreamTestData.getSmall3TupleDataStream(env)
> >   tEnv.registerDataStream("MyTable", t)
> >
> >   // one way: invoke the Table API's writeToSink method directly
> >   val result = tEnv.sql(sqlQuery)
> >   result.writeToSink(new YourStreamSink)
> >
> >   // another way: convert to a DataStream first and then invoke addSink
> >   val result = tEnv.sql(sqlQuery).toDataStream[Row]
> >   result.addSink(new StreamITCase.StringSink)
> >
> > From this API we can see that the sink table is always a derived table,
> > because its schema is inferred from the result type of the upstream
> > query.
> >
> > Compared to this, a traditional RDBMS supports DML syntax, so a query
> > with a target output can be written like this:
> >
> >   INSERT INTO table target_table_name
> >   [(column_name [, ...n])]
> >   query
> >
> > The equivalent form of the example above is as follows:
> >
> >   tEnv.registerTableSink("targetTable", new YourSink)
> >   val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> >   val result = tEnv.sql(sql)
> >
> > This is supported by Calcite's grammar:
> >
> >   insert:
> >     ( INSERT | UPSERT ) INTO tablePrimary
> >     [ '(' column [, column ]* ')' ]
> >     query
> >
> > I'd like to extend the Flink Table API to support this feature.
> >
> > # Proposed changes
> >
> > 1. Support registering a sink table (like source table registration;
> >    validation will be done against the registered table):
> >
> >   /**
> >    * Registers an external [[TableSink]] in this [[TableEnvironment]]'s
> >    * catalog. Registered sink tables can be referenced in SQL DML
> >    * clauses.
> >    *
> >    * @param name The name under which the [[TableSink]] is registered.
> >    * @param tableSink The [[TableSink]] to register.
> >    */
> >   def registerTableSink(name: String, tableSink: TableSink[_]): Unit
> >
> > 2. Add two new methods to table.scala:
> >
> >   def insertInto[T](tableSink: String): Unit
> >   def insertInto[T](tableSink: String, conf: QueryConfig): Unit
> >
> > I propose to retain the current writeToSink method so that this is not
> > a breaking change of the API. In a sense, writeToSink is similar to
> > the SQL 'CREATE TABLE AS' statement in an RDBMS (which creates a table
> > from an existing table by copying the existing table's columns).
> >
> > 3. Deprecate the current sql method and add two new methods to
> > TableEnvironment:
> >
> >   @deprecated def sql(sql: String): Table
> >   def sqlQuery(sql: String): Table
> >   def sqlUpdate(sql: String, config: QueryConfig): Unit
> >
> > I think the sqlUpdate method here is different from JDBC's [1]
> > executeUpdate, which returns an int value: sqlUpdate will not trigger
> > an execution immediately, so keeping the return type as Unit sounds
> > reasonable and does not break the consistency of the Scala and Java
> > APIs.
> >
> > Note that:
> >
> > A registered source table cannot be updated unless it is also
> > registered as a sink table. So we need to add validation in both the
> > Table API and SQL to prevent querying a sink table or inserting into
> > a source table.
> >
> > Partial column insertion into a target table is not supported, because
> > columns have no nullable property definition for now.
> >
> > Ref:
> > [1] https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html
> >
> > doc link: https://goo.gl/n3phK5
> >
> > What do you think?
> >
> > Best, Lincoln
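The proposed split between sqlQuery() and sqlUpdate(), together with the source/sink validation Lincoln notes, can be sketched as a small self-contained toy. This is illustrative only, not Flink's implementation: ToyTableEnvironment, registerSource, and the regex-based table-name checks are hypothetical stand-ins.

```scala
// Toy model of the proposal: sqlQuery() returns a Table handle, while
// sqlUpdate() returns Unit and only buffers the statement. Tables registered
// only as sources cannot be DML targets; sink-only tables cannot be queried.
final case class Table(query: String)

class ToyTableEnvironment {
  private val sources  = scala.collection.mutable.Set.empty[String]
  private val sinks    = scala.collection.mutable.Set.empty[String]
  private val buffered = scala.collection.mutable.ListBuffer.empty[String]

  def registerSource(name: String): Unit    = sources += name
  def registerTableSink(name: String): Unit = sinks += name

  // sqlQuery(): returns a Table; the scanned table must be a registered source.
  def sqlQuery(sql: String): Table = {
    val From = """(?is)\s*SELECT\b.*?\bFROM\s+(\w+).*""".r
    sql match {
      case From(t) if sources(t) => Table(sql)
      case From(t) =>
        throw new IllegalArgumentException(s"'$t' is not a registered source table")
      case _ =>
        throw new IllegalArgumentException(s"not a query: $sql")
    }
  }

  // sqlUpdate(): returns Unit; the statement is only buffered, so execution is
  // deferred (unlike JDBC's executeUpdate, which runs eagerly and returns int).
  def sqlUpdate(sql: String): Unit = {
    val Insert = """(?is)\s*INSERT\s+INTO\s+(\w+)\b.*""".r
    sql match {
      case Insert(t) if sinks(t) => buffered += sql
      case Insert(t) =>
        throw new IllegalArgumentException(s"'$t' is not a registered sink table")
      case _ =>
        throw new IllegalArgumentException(s"not a DML statement: $sql")
    }
  }

  def pendingUpdates: List[String] = buffered.toList
}
```

Returning Unit from sqlUpdate, as argued in the thread, matches this deferred-execution model and keeps the Scala and Java method signatures consistent.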
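The Calcite insert grammar quoted in the motivation can be illustrated with a rough regex-based parser. This is a sketch under the assumptions of single-word table names and a SELECT-only query body; Calcite's actual parser is generated from a full grammar and handles far more.

```scala
// A rough illustration of:
//   insert: ( INSERT | UPSERT ) INTO tablePrimary
//           [ '(' column [, column ]* ')' ] query
final case class InsertStmt(
    keyword: String, table: String, columns: List[String], query: String)

object InsertGrammar {
  private val Pattern =
    """(?is)\s*(INSERT|UPSERT)\s+INTO\s+(\w+)\s*(?:\(([^)]*)\))?\s*(SELECT\b.*)""".r

  // Returns Some(parsed statement) for strings matching the grammar, None otherwise.
  def parse(sql: String): Option[InsertStmt] = sql match {
    case Pattern(kw, table, cols, query) =>
      // An unmatched optional column group comes back as null from Scala regex matching.
      val columns = Option(cols)
        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toList)
        .getOrElse(Nil)
      Some(InsertStmt(kw.toUpperCase, table, columns, query.trim))
    case _ => None
  }
}
```

Note that the optional column list is exactly the partial-insertion case the proposal excludes for now, since columns have no nullable property definition yet.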