+1 to support this change, as it makes the sql API more accurate and
elegant.
I hope this will not introduce too much troubles in the release upgrading
for the existing flink SQL users.


On Mon, Jul 31, 2017 at 3:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Lincoln,
>
> thank you for this proposal and discussing the motivation for this change.
> I think this makes a lot of sense (as you said, we discussed this before).
>
> I'd like to highlight the breaking change (among a several non-breaking
> changes) proposed here:
>
> We propose to deprecate TableEnvironment.sql(sql: String): Table and
> replace it by TableEnvironment.sqlQuery(sql: String): Table.
>
> The reasons for this change are:
> - We need a sqlUpdate() method that does not return a Table. For now the
> use case is "INSERT INTO x SELECT ..." but there are other DML statements
> as well.
> - In order to better distinguish query and updated functionality, we would
> like to rename the sql() method to sqlQuery().
> - We want to call the SQL method similar to their JDBC counterparts. In
> JDBC the methods are executeQuery(): ResultSet and executeUpdate(): int.
> Since the Table API is not only SQL, we think that sqlQuery() and
> sqlUpdate() are good method names for the functionality.
>
> What do others think?
>
> Fabian
>
>
>
> 2017-07-31 6:50 GMT+02:00 Lin Li <lincoln.8...@gmail.com>:
>
> > Hi everybody,
> >   I'd like to propose and discuss some api changes to support DML
> > operations like ‘insert into’ clause in TableAPI&SQL.
> >  Originally this was discussed with Fabian in the PR conversations(see
> > https://github.com/apache/flink/pull/3829),  considering it makes
> several
> > api changes, so starting this mailing list to discuss it.
> > # Motivation
> >
> > Currently in TableAPI  there’s only registration method for source table,
> >  when we use SQL writing a streaming job, we should add additional code
> for
> > the sink, like TableAPI does:
> >
> > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> >
> > val t = StreamTestData.getSmall3TupleDataStream(env)
> >
> > tEnv.registerDataStream("MyTable", t)
> >
> > // one way: invoke tableAPI’s writeToSink method directly
> >
> > val result = tEnv.sql(sqlQuery)
> >
> > result.writeToSink(new YourStreamSink)
> >
> > // another way: convert to datastream first and then invoke addSink
> >
> > val result = tEnv.sql(sqlQuery).toDataStream[Row]
> >
> > result.addSink(new StreamITCase.StringSink)
> >
> > From the api we can see the sink table always be a derived table because
> > its 'schema' is inferred from the result type of upstream query.
> >
> > Compare to traditional RDBMS which support DML syntax, a query with a
> > target output could be written like this:
> >
> > insert into table target_table_name
> >
> > [(column_name [ ,...n ])]
> >
> > query
> >
> > The equivalent form of the example above is as follows:
> >
> >    tEnv.registerTableSink("targetTable", new YourSink)
> >
> >    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> >
> >    val result = tEnv.sql(sql)
> >
> > It is supported by Calcite’s grammar:
> >
> > insert:( INSERT | UPSERT ) INTO tablePrimary
> >
> > [ '(' column [, column ]* ')' ]
> >
> > query
> >
> > I'd like to extend Flink TableAPI to support such feature.
> > # Proposed changes
> >
> > 1. support registering a sink table (like source table registration, and
> > will do validation according to the registered table)
> >
> > /**
> >
> >  * Registers an external [[TableSink]] in this [[TableEnvironment]]'s
> > catalog.
> >
> >  * Registered sink tables can be referenced in SQL DML clause.
> >
> >  *
> >
> >  * @param name The name under which the [[TableSink]] is registered.
> >
> >  * @param tableSink The [[TableSink]] to register.
> >
> >  */
> >
> > def registerTableSink(name: String, tableSink: TableSink[_]): Unit
> >
> >
> > 2. add two new methods to table.scala
> >
> >    -
> >
> >    def insertInto[T](tableSink: String): Unit
> >    -
> >
> >    def insertInto[T](tableSink: String, conf: QueryConfig): Unit
> >
> > I propose to retain the current writeToSink method so that will not do a
> > breaking change of the API. And in a sense, it is similar with ‘SQL
> CREATE
> > TABLE AS statement’ usage in RDBMS(which creates a table from an existing
> > table by copying the existing table's columns).
> >
> > 3. deprecate the current sql method and add two new methods to
> > TableEnvironment
> >
> >    -
> >
> >    @deprecated def sql(sql: String): Table
> >    -
> >
> >    def sqlQuery(sql: String): Table
> >    -
> >
> >    def sqlUpdate(sql: String, config: QueryConfig): Unit
> >
> > I think the method sqlUpdate here is different from Jdbc's[1]
> executeUpdate
> > which returns a int value, because sqlUpdate will not trigger an
> execution
> > immediately, so keep the return value as Unit sounds reasonable and
> doesn't
> > break down the consistency of Scala and Java APIs.
> >
> > Note that:
> >
> > A registered source table can not be update unless it registered as a
> sink
> > table as well. So we need to add validation both in TableAPI and SQL for
> > preventing query on sink table or insert into a source table.
> >
> > Do not support partial column insertion to a target table due to it
> hadn’t
> > nullable property definition for now.
> >
> > Ref:
> > [1]
> > https://docs.oracle.com/javase/7/docs/api/java/sql/
> PreparedStatement.html
> >
> >
> > doc link: https://goo.gl/n3phK5
> >
> > What do you think?
> >
> > Best, Lincoln
> >
>

Reply via email to