Hi Lincoln,

Thank you for this proposal and for discussing the motivation for this change.
I think this makes a lot of sense (as you said, we discussed this before).

I'd like to highlight the breaking change (among several non-breaking
changes) proposed here:

We propose to deprecate TableEnvironment.sql(sql: String): Table and
replace it by TableEnvironment.sqlQuery(sql: String): Table.

The reasons for this change are:
- We need a sqlUpdate() method that does not return a Table. For now the
use case is "INSERT INTO x SELECT ..." but there are other DML statements
as well.
- In order to better distinguish query and update functionality, we would
like to rename the sql() method to sqlQuery().
- We want to name the SQL methods similarly to their JDBC counterparts. In
JDBC the methods are executeQuery(): ResultSet and executeUpdate(): int.
Since the Table API is not only SQL, we think that sqlQuery() and
sqlUpdate() are good method names for the functionality.
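
To make the split concrete, here is a minimal sketch of the two method
shapes. SketchTableEnv, its Table case class, and the pending helper are
hypothetical stand-ins, not Flink classes. The keyword check is only a
placeholder for real SQL validation, and the QueryConfig parameter from the
proposal is left out for brevity.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a Table: it only remembers the query text.
final case class Table(query: String)

// Hypothetical stand-in for TableEnvironment, modelling only the method shapes.
class SketchTableEnv {
  private val pendingUpdates = ListBuffer.empty[String]

  // Queries return a Table that can be transformed further.
  def sqlQuery(sql: String): Table = {
    // Placeholder check (assumption): real validation would parse the statement.
    require(sql.trim.toUpperCase.startsWith("SELECT"), "sqlQuery expects a query")
    Table(sql)
  }

  // DML statements return Unit: the statement is only recorded here,
  // nothing is executed at call time.
  def sqlUpdate(sql: String): Unit = {
    require(sql.trim.toUpperCase.startsWith("INSERT"), "sqlUpdate expects DML")
    pendingUpdates += sql
  }

  // Illustration-only helper exposing the recorded statements.
  def pending: List[String] = pendingUpdates.toList
}

object SketchDemo {
  def main(args: Array[String]): Unit = {
    val tEnv = new SketchTableEnv
    val result: Table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    println(result.query)
    println(tEnv.pending.size)
  }
}
```

The point of the sketch is only the return types: sqlQuery() hands back
something to build on, while sqlUpdate() has nothing meaningful to return
because no execution happens when it is called.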

What do others think?

Fabian



2017-07-31 6:50 GMT+02:00 Lin Li <lincoln.8...@gmail.com>:

> Hi everybody,
>   I'd like to propose and discuss some API changes to support DML
> operations such as the ‘insert into’ clause in the Table API & SQL.
>  This was originally discussed with Fabian in the PR conversation (see
> https://github.com/apache/flink/pull/3829). Since it involves several
> API changes, I am starting this mailing list thread to discuss it.
> # Motivation
>
> Currently the Table API only has registration methods for source tables.
> When we write a streaming job in SQL, we have to add extra code for
> the sink, as the Table API does:
>
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
>
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
>
> // another way: convert to a DataStream first and then invoke addSink
> val stream = tEnv.sql(sqlQuery).toDataStream[Row]
> stream.addSink(new StreamITCase.StringSink)
>
> From the API we can see that the sink table is always a derived table because
> its 'schema' is inferred from the result type of the upstream query.
>
> Compared to a traditional RDBMS that supports DML syntax, a query with a
> target output could be written like this:
>
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
>
> The equivalent form of the example above is as follows:
>
>    tEnv.registerTableSink("targetTable", new YourSink)
>    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>    val result = tEnv.sql(sql)
>
> It is supported by Calcite’s grammar:
>
> insert:
>   ( INSERT | UPSERT ) INTO tablePrimary
>   [ '(' column [, column ]* ')' ]
>   query
>
> I'd like to extend the Flink Table API to support this feature.
> # Proposed changes
>
> 1. Support registering a sink table (similar to source table registration,
> with validation against the registered table)
>
> /**
>  * Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog.
>  * Registered sink tables can be referenced in SQL DML clauses.
>  *
>  * @param name The name under which the [[TableSink]] is registered.
>  * @param tableSink The [[TableSink]] to register.
>  */
> def registerTableSink(name: String, tableSink: TableSink[_]): Unit
>
>
> 2. Add two new methods to table.scala
>
>    - def insertInto[T](tableSink: String): Unit
>    - def insertInto[T](tableSink: String, conf: QueryConfig): Unit
>
> I propose to retain the current writeToSink method so that this is not a
> breaking change of the API. In a sense, it is similar to the SQL ‘CREATE
> TABLE AS’ statement in an RDBMS (which creates a table from an existing
> table by copying the existing table's columns).
>
> 3. Deprecate the current sql method and add two new methods to
> TableEnvironment
>
>    - @deprecated def sql(sql: String): Table
>    - def sqlQuery(sql: String): Table
>    - def sqlUpdate(sql: String, config: QueryConfig): Unit
>
> I think the sqlUpdate method here differs from JDBC's[1] executeUpdate,
> which returns an int value, because sqlUpdate does not trigger an execution
> immediately. Keeping the return type as Unit therefore sounds reasonable and
> does not break the consistency of the Scala and Java APIs.
>
> Note that:
>
> A registered source table cannot be updated unless it is registered as a sink
> table as well. So we need to add validation in both the Table API and SQL to
> prevent querying a sink table or inserting into a source table.
>
> Partial column insertion into a target table is not supported, because there
> is no nullable property definition for now.
>
> Ref:
> [1]
> https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html
>
>
> doc link: https://goo.gl/n3phK5
>
> What do you think?
>
> Best, Lincoln
>
