+1

Best,
Jingsong Lee
On Sun, May 17, 2020 at 3:42 PM Kurt Young <ykt...@gmail.com> wrote:
> +1 from my side.
>
> Best,
> Kurt
>
> On Sun, May 17, 2020 at 12:41 PM godfrey he <godfre...@gmail.com> wrote:
> > hi everyone,
> >
> > I would like to bring up another topic about the return value of the
> > TableResult#collect() method.
> > Currently, the return type is `Iterator<Row>`; we met some problems when
> > implementing FLINK-14807 [1].
> >
> > In the current design, the sink operator has a buffer pool which buffers
> > the data from upstream and waits for the client to consume it. The client
> > pulls the data when `Iterator<Row>#next()` is called.
> > If the client submits a select job, consumes part of the data and then
> > exits, the job will not finish, which causes a resource leak.
> > We can't require the client to consume all data; for an unbounded
> > streaming job, that is also impossible.
> > Currently, users can also cancel the job via the
> > `TableResult.getJobClient().get().cancel()` method,
> > but this approach is neither intuitive nor convenient.
> >
> > So, I want to change the return type from `Iterator<Row>` to
> > `CloseableRowIterator`; the new method looks like:
> >
> > public interface TableResult {
> >     CloseableRowIterator collect();
> > }
> >
> > public interface CloseableRowIterator extends Iterator<Row>, AutoCloseable {
> > }
> >
> > Prefixing the name with "Closeable" is intended to remind users that this
> > iterator should be closed; users can conveniently use a try-with-resources
> > statement to close the resource.
> > The resource leak problem is still there if users neither close the
> > iterator nor cancel the job through the job client;
> > we just provide an easier way for users to avoid it.
> >
> > I also noticed that there is a `CloseableIterator` interface in the
> > `org.apache.flink.util` package,
> > but I still tend to introduce `CloseableRowIterator`.
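The proposed contract can be sketched as a minimal, self-contained example. Everything here except the `CloseableRowIterator` signature is an illustrative stand-in (`SimpleRow`, `fakeResult`, and the stub `TableResult` are not Flink's actual classes); it only demonstrates how try-with-resources guarantees the close call:

```java
import java.util.Arrays;
import java.util.Iterator;

public class CollectSketch {
    // Illustrative stand-ins for Flink's types; not the real API.
    interface Row {}

    static final class SimpleRow implements Row {
        final Object[] fields;
        SimpleRow(Object... fields) { this.fields = fields; }
        @Override public String toString() { return Arrays.toString(fields); }
    }

    // The interface exactly as proposed: an Iterator<Row> that is also AutoCloseable.
    interface CloseableRowIterator extends Iterator<Row>, AutoCloseable {}

    // A stub TableResult whose collect() returns the closeable iterator.
    interface TableResult { CloseableRowIterator collect(); }

    static TableResult fakeResult() {
        Iterator<Row> data =
            Arrays.<Row>asList(new SimpleRow(1L, "a"), new SimpleRow(2L, "b")).iterator();
        return () -> new CloseableRowIterator() {
            @Override public boolean hasNext() { return data.hasNext(); }
            @Override public Row next() { return data.next(); }
            // The real implementation would cancel the job / release the sink buffer here.
            @Override public void close() { System.out.println("result closed"); }
        };
    }

    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees close() even if only part of the data is read.
        try (CloseableRowIterator it = fakeResult().collect()) {
            System.out.println(it.next());
        }
    }
}
```

Even though only one of two rows is consumed, leaving the try block closes the iterator, which is exactly the leak-avoidance behavior the proposal describes.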
My point of view is:
> > 1) `CloseableIterator` is in a util package; it is not a public interface.
> > 2) `CloseableRowIterator` is more convenient: users do not need to spell
> > out the generic type `<Row>`.
> >
> > What do you think?
> >
> > Best,
> > Godfrey
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-14807
> >
> > Fabian Hueske <fhue...@gmail.com> 于2020年5月7日周四 下午3:59写道:
> > > Thanks for the update Godfrey!
> > >
> > > +1 to this approach.
> > >
> > > Since there can be only one primary key, I'd also be fine to just use
> > > `PRI` even if it is composite, but `PRI(f0, f5)` might be more
> > > convenient for users.
> > >
> > > Thanks, Fabian
> > >
> > > Am Do., 7. Mai 2020 um 09:31 Uhr schrieb godfrey he <godfre...@gmail.com>:
> > >> Hi Fabian,
> > >> Thanks for your suggestions.
> > >>
> > >> Agree with you that `UNQ(f2, f3)` is clearer.
> > >>
> > >> A table can have only ONE primary key, and this primary key can
> > >> consist of a single column or multiple columns. [1]
> > >> If the primary key consists of a single column, we can simply use
> > >> `PRI` (or `PRI(xx)`) to represent it.
> > >> If the primary key has multiple columns, we should use
> > >> `PRI(xx, yy, ...)` to represent it.
> > >>
> > >> A table may have multiple unique keys, and each unique key can
> > >> consist of a single column or multiple columns. [2]
> > >> If there is only one unique key and it has a single column, we can
> > >> simply use `UNQ` (or `UNQ(xx)`) to represent it.
> > >> Otherwise, we should use `UNQ(xx, yy, ...)` to represent it.
> > >> (A corner case: two unique keys with the same columns, like
> > >> `UNQ(f2, f3)` and `UNQ(f2, f3)`; we can forbid this case or add a
> > >> unique name for each key in the future.)
> > >>
> > >> Primary key and unique key with multiple columns example:
> > >> create table MyTable (
> > >>   f0 BIGINT NOT NULL,
> > >>   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
> > >>   f2 VARCHAR<256>,
> > >>   f3 AS f0 + 1,
> > >>   f4 TIMESTAMP(3) NOT NULL,
> > >>   f5 BIGINT NOT NULL,
> > >>   PRIMARY KEY (f0, f5),
> > >>   UNIQUE (f3, f2),
> > >>   WATERMARK f4 AS f4 - INTERVAL '3' SECOND
> > >> ) with (...)
> > >>
> > >> +------+---------------------------------+-------+-------------+----------------+--------------------------+
> > >> | name | type                            | null  | key         | compute column | watermark                |
> > >> +------+---------------------------------+-------+-------------+----------------+--------------------------+
> > >> | f0   | BIGINT                          | false | PRI(f0, f5) | (NULL)         | (NULL)                   |
> > >> | f1   | ROW<q1 STRING, q2 TIMESTAMP(3)> | true  | (NULL)      | (NULL)         | (NULL)                   |
> > >> | f2   | VARCHAR<256>                    | true  | UNQ(f2, f3) | (NULL)         | (NULL)                   |
> > >> | f3   | BIGINT                          | false | UNQ(f2, f3) | f0 + 1         | (NULL)                   |
> > >> | f4   | TIMESTAMP(3)                    | false | (NULL)      | (NULL)         | f4 - INTERVAL '3' SECOND |
> > >> | f5   | BIGINT                          | false | PRI(f0, f5) | (NULL)         | (NULL)                   |
> > >> +------+---------------------------------+-------+-------------+----------------+--------------------------+
> > >>
> > >> "Regarding the watermark on nested columns": that's a good approach,
> > >> which can both support watermarks on nested columns in the future and
> > >> keep the current table form.
> > >>
> > >> [1] https://www.w3schools.com/sql/sql_primarykey.asp
> > >> [2] https://www.w3schools.com/sql/sql_unique.ASP
> > >>
> > >> Best,
> > >> Godfrey
> > >>
> > >> Fabian Hueske <fhue...@gmail.com> 于2020年5月7日周四 上午12:03写道:
> > >>> Hi Godfrey,
> > >>>
> > >>> This looks good to me.
> > >>>
> > >>> One side note: indicating unique constraints with "UNQ" is probably
> > >>> not enough.
> > >>> There might be multiple unique constraints, and users would like to
> > >>> know which field combinations are unique.
> > >>> So in your example above, "UNQ(f2, f3)" might be a better marker.
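The `PRI(...)`/`UNQ(...)` rendering discussed here can be sketched as a small helper. This is hypothetical illustration code (`keyCell` is not a Flink method); it just shows how one "key" cell would be derived from the constraint metadata:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KeyColumnSketch {
    // Hypothetical helper: renders the "key" cell of the DESCRIBE output for one
    // field, following the PRI(...)/UNQ(...) convention from the thread.
    static String keyCell(String field, List<String> primaryKey, List<List<String>> uniqueKeys) {
        if (primaryKey.contains(field)) {
            // The full column list is printed so users see the composite key.
            return "PRI(" + String.join(", ", primaryKey) + ")";
        }
        for (List<String> unique : uniqueKeys) {
            if (unique.contains(field)) {
                return "UNQ(" + String.join(", ", unique) + ")";
            }
        }
        return "(NULL)";
    }

    public static void main(String[] args) {
        List<String> pk = Arrays.asList("f0", "f5");
        List<List<String>> uniques = Collections.singletonList(Arrays.asList("f2", "f3"));
        System.out.println(keyCell("f0", pk, uniques)); // PRI(f0, f5)
        System.out.println(keyCell("f3", pk, uniques)); // UNQ(f2, f3)
        System.out.println(keyCell("f1", pk, uniques)); // (NULL)
    }
}
```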
> > >>> Just as a thought: if we later add support for watermarks on nested
> > >>> columns, we could add a row just for the nested field (in addition
> > >>> to the top-level field), like this:
> > >>>
> > >>> +-------------------+--------------+-------+--------+--------+-----------------------------------------+
> > >>> | f4.nested.rowtime | TIMESTAMP(3) | false | (NULL) | (NULL) | f4.nested.rowtime - INTERVAL '3' SECOND |
> > >>> +-------------------+--------------+-------+--------+--------+-----------------------------------------+
> > >>>
> > >>> Thanks,
> > >>> Fabian
> > >>>
> > >>> Am Mi., 6. Mai 2020 um 17:51 Uhr schrieb godfrey he <godfre...@gmail.com>:
> > >>>> Hi @fhue...@gmail.com @Timo Walther <twal...@apache.org> @Dawid
> > >>>> Wysakowicz <dwysakow...@apache.org>
> > >>>> What do you think about requiring that a watermark must be defined
> > >>>> on a top-level column?
> > >>>>
> > >>>> If we do that, we can add an expression column to represent the
> > >>>> watermark, just like the compute column.
> > >>>> An example covering all cases:
> > >>>> create table MyTable (
> > >>>>   f0 BIGINT NOT NULL,
> > >>>>   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
> > >>>>   f2 VARCHAR<256>,
> > >>>>   f3 AS f0 + 1,
> > >>>>   f4 TIMESTAMP(3) NOT NULL,
> > >>>>   PRIMARY KEY (f0),
> > >>>>   UNIQUE (f3, f2),
> > >>>>   WATERMARK f4 AS f4 - INTERVAL '3' SECOND
> > >>>> ) with (...)
> > >>>>
> > >>>> +------+---------------------------------+-------+--------+----------------+--------------------------+
> > >>>> | name | type                            | null  | key    | compute column | watermark                |
> > >>>> +------+---------------------------------+-------+--------+----------------+--------------------------+
> > >>>> | f0   | BIGINT                          | false | PRI    | (NULL)         | (NULL)                   |
> > >>>> | f1   | ROW<q1 STRING, q2 TIMESTAMP(3)> | true  | (NULL) | (NULL)         | (NULL)                   |
> > >>>> | f2   | VARCHAR<256>                    | true  | UNQ    | (NULL)         | (NULL)                   |
> > >>>> | f3   | BIGINT                          | false | UNQ    | f0 + 1         | (NULL)                   |
> > >>>> | f4   | TIMESTAMP(3)                    | false | (NULL) | (NULL)         | f4 - INTERVAL '3' SECOND |
> > >>>> +------+---------------------------------+-------+--------+----------------+--------------------------+
> > >>>>
> > >>>> WDYT?
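The restriction under discussion, rejecting watermarks on nested columns, could look roughly like the following. This is a sketch only; the method name and error message are illustrative, not Flink's actual validation code:

```java
public class TopLevelWatermarkCheck {
    // Sketch of the proposed restriction: a watermark may only reference a
    // top-level column; nested references (containing '.') are rejected.
    static void validateWatermarkColumn(String rowtimeAttribute) {
        if (rowtimeAttribute.contains(".")) {
            throw new IllegalArgumentException(
                "Watermark can only be defined on a top-level column, but got: "
                + rowtimeAttribute
                + ". Declare a computed column for the nested field and define the watermark on it.");
        }
    }

    public static void main(String[] args) {
        validateWatermarkColumn("f4"); // accepted: top-level column
        try {
            validateWatermarkColumn("f1.q2"); // rejected under the proposed rule
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The error message also reflects Jark's suggested workaround: lift the nested field into a computed column and define the watermark there.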
> > >>>>
> > >>>> Best,
> > >>>> Godfrey
> > >>>>
> > >>>> godfrey he <godfre...@gmail.com> 于2020年4月30日周四 下午11:57写道:
> > >>>>> Hi Fabian,
> > >>>>>
> > >>>>> the broken example is:
> > >>>>>
> > >>>>> create table MyTable (
> > >>>>>   f0 BIGINT NOT NULL,
> > >>>>>   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
> > >>>>>   f2 VARCHAR<256>,
> > >>>>>   f3 AS f0 + 1,
> > >>>>>   PRIMARY KEY (f0),
> > >>>>>   UNIQUE (f3, f2),
> > >>>>>   WATERMARK f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND)
> > >>>>> ) with (...)
> > >>>>>
> > >>>>> +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>> | name | type                                | key    | compute column | watermark                                |
> > >>>>> +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>> | f0   | BIGINT NOT NULL                     | PRI    | (NULL)         | (NULL)                                   |
> > >>>>> | f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | UNQ    | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
> > >>>>> | f2   | VARCHAR<256>                        | (NULL) | NULL           | (NULL)                                   |
> > >>>>> | f3   | BIGINT NOT NULL                     | UNQ    | f0 + 1         | (NULL)                                   |
> > >>>>> +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>>
> > >>>>> or we add a column to represent nullability.
> > >>>>>
> > >>>>> +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>> | name | type                                | null  | key    | compute column | watermark                                |
> > >>>>> +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>> | f0   | BIGINT                              | false | PRI    | (NULL)         | (NULL)                                   |
> > >>>>> | f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | true  | UNQ    | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
> > >>>>> | f2   | VARCHAR<256>                        | true  | (NULL) | NULL           | (NULL)                                   |
> > >>>>> | f3   | BIGINT                              | false | UNQ    | f0 + 1         | (NULL)                                   |
> > >>>>> +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>>
> > >>>>> Hi Jark,
> > >>>>> If we can limit watermarks to top-level columns, this becomes much
> > >>>>> simpler.
> > >>>>>
> > >>>>> Best,
> > >>>>> Godfrey
> > >>>>>
> > >>>>> Jark Wu <imj...@gmail.com> 于2020年4月30日周四 下午11:38写道:
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> I'm in favor of Fabian's proposal.
> > >>>>>> First, a watermark is not a column but metadata, just like a
> > >>>>>> primary key, so it shouldn't stand with the columns.
> > >>>>>> Second, AFAIK, a primary key can only be defined on top-level
> > >>>>>> columns.
> > >>>>>> Third, I think watermarks can follow primary keys and also be
> > >>>>>> allowed only on top-level columns.
> > >>>>>>
> > >>>>>> I have to admit that in FLIP-66, a watermark can be defined on
> > >>>>>> nested fields.
> > >>>>>> However, during implementation, I found that it's too complicated
> > >>>>>> to do that. We would have to refactor the time-based physical
> > >>>>>> nodes, use code generation to access event time, and refactor
> > >>>>>> FlinkTypeFactory to support a complex nested rowtime.
> > >>>>>> There is not much value in this feature, but it introduces a lot
> > >>>>>> of complexity in the code base.
> > >>>>>> So I think we can force watermarks to be defined on top-level
> > >>>>>> columns. If users want to define one on a nested column, they can
> > >>>>>> use a computed column to lift it to a top-level column.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>> On Thu, 30 Apr 2020 at 17:55, Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>> > Hi Godfrey,
> > >>>>>> >
> > >>>>>> > The formatting of your example seems to be broken.
> > >>>>>> > Could you send it again please?
> > >>>>>> >
> > >>>>>> > Regarding your points
> > >>>>>> > > because the watermark expression can reference a sub-column,
> > >>>>>> > > just like `f1.q2` in the example I gave.
> > >>>>>> >
> > >>>>>> > I would put the watermark information in the row of the
> > >>>>>> > top-level field and indicate to which nested field the watermark
> > >>>>>> > refers.
> > >>>>>> > Don't we have to solve the same issue for primary keys that are
> > >>>>>> > defined on a nested field?
> > >>>>>> >
> > >>>>>> > > A boolean flag can't represent such info, and I don't know
> > >>>>>> > > whether we will support complex watermark expressions
> > >>>>>> > > involving multiple columns in the future, such as:
> > >>>>>> > > "WATERMARK FOR ts as ts + f1 + interval '1' second"
> > >>>>>> >
> > >>>>>> > You are right, a simple binary flag is definitely not sufficient
> > >>>>>> > to display the watermark information.
> > >>>>>> > I would put the expression string into the field, i.e.,
> > >>>>>> > "ts + f1 + interval '1' second"
> > >>>>>> >
> > >>>>>> > For me the most important reason not to show the watermark as a
> > >>>>>> > row in the table is that it is not a field that can be queried
> > >>>>>> > but meta information on an existing field.
> > >>>>>> > For the user it is important to know that a certain field has a
> > >>>>>> > watermark; otherwise, certain queries cannot be correctly
> > >>>>>> > specified.
> > >>>>>> > Also, there might be support for multiple watermarks defined on
> > >>>>>> > different fields at some point. Would those be printed in
> > >>>>>> > multiple rows?
> > >>>>>> >
> > >>>>>> > Best,
> > >>>>>> > Fabian
> > >>>>>> >
> > >>>>>> > Am Do., 30. Apr. 2020 um 11:25 Uhr schrieb godfrey he <godfre...@gmail.com>:
> > >>>>>> > > Hi Fabian, Aljoscha,
> > >>>>>> > >
> > >>>>>> > > Thanks for the feedback.
> > >>>>>> > >
> > >>>>>> > > Agree with you that we can deal with the primary key as you
> > >>>>>> > > mentioned.
> > >>>>>> > > Right now, the type column contains the nullability attribute,
> > >>>>>> > > e.g. BIGINT NOT NULL.
> > >>>>>> > > (I'm also ok with using two columns to represent the type,
> > >>>>>> > > just like MySQL.)
> > >>>>>> > >
> > >>>>>> > > > Why do I treat `watermark` as a special row?
> > >>>>>> > > Because the watermark expression can reference a sub-column,
> > >>>>>> > > just like `f1.q2` in the example I gave.
> > >>>>>> > > A boolean flag can't represent such info, and I don't know
> > >>>>>> > > whether we will support complex watermark expressions
> > >>>>>> > > involving multiple columns in the future, such as:
> > >>>>>> > > "WATERMARK FOR ts as ts + f1 + interval '1' second"
> > >>>>>> > >
> > >>>>>> > > If we do not support complex watermark expressions, we can add
> > >>>>>> > > a watermark column.
> > >>>>>> > > for example:
> > >>>>>> > >
> > >>>>>> > > create table MyTable (
> > >>>>>> > >   f0 BIGINT NOT NULL,
> > >>>>>> > >   f1 ROW<q1 STRING, q2 TIMESTAMP(3)>,
> > >>>>>> > >   f2 VARCHAR<256>,
> > >>>>>> > >   f3 AS f0 + 1,
> > >>>>>> > >   PRIMARY KEY (f0),
> > >>>>>> > >   UNIQUE (f3, f2),
> > >>>>>> > >   WATERMARK f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND)
> > >>>>>> > > ) with (...)
> > >>>>>> > >
> > >>>>>> > > +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>>> > > | name | type                                | key    | compute column | watermark                                |
> > >>>>>> > > +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>>> > > | f0   | BIGINT NOT NULL                     | PRI    | (NULL)         | (NULL)                                   |
> > >>>>>> > > | f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | UNQ    | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
> > >>>>>> > > | f2   | VARCHAR<256>                        | (NULL) | NULL           | (NULL)                                   |
> > >>>>>> > > | f3   | BIGINT NOT NULL                     | UNQ    | f0 + 1         | (NULL)                                   |
> > >>>>>> > > +------+-------------------------------------+--------+----------------+------------------------------------------+
> > >>>>>> > >
> > >>>>>> > > or we add a column to represent nullability.
> > >>>>>> > >
> > >>>>>> > > +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>>> > > | name | type                                | null  | key    | compute column | watermark                                |
> > >>>>>> > > +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>>> > > | f0   | BIGINT                              | false | PRI    | (NULL)         | (NULL)                                   |
> > >>>>>> > > | f1   | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | true  | UNQ    | (NULL)         | f1.q2 AS (`f1.q2` - INTERVAL '3' SECOND) |
> > >>>>>> > > | f2   | VARCHAR<256>                        | true  | (NULL) | NULL           | (NULL)                                   |
> > >>>>>> > > | f3   | BIGINT                              | false | UNQ    | f0 + 1         | (NULL)                                   |
> > >>>>>> > > +------+-------------------------------------+-------+--------+----------------+------------------------------------------+
> > >>>>>> > >
> > >>>>>> > > Personally, I like the second one. (We need to make some
> > >>>>>> > > changes to LogicalType to get the type name without
> > >>>>>> > > nullability.)
> > >>>>>> > >
> > >>>>>> > > Best,
> > >>>>>> > > Godfrey
> > >>>>>> > >
> > >>>>>> > > Aljoscha Krettek <aljos...@apache.org> 于2020年4月29日周三 下午5:47写道:
> > >>>>>> > > > +1 I like the general idea of printing the results as a
> > >>>>>> > > > table.
> > >>>>>> > > >
> > >>>>>> > > > On the specifics I don't know enough, but Fabian's
> > >>>>>> > > > suggestions seem to make sense to me.
> > >>>>>> > > >
> > >>>>>> > > > Aljoscha
> > >>>>>> > > >
> > >>>>>> > > > On 29.04.20 10:56, Fabian Hueske wrote:
> > >>>>>> > > > > Hi Godfrey,
> > >>>>>> > > > >
> > >>>>>> > > > > Thanks for starting this discussion!
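The LogicalType change mentioned here amounts to splitting the nullability flag out of the serialized type name. A rough sketch of the intended split, working on the type string only (this is not Flink's actual LogicalType API; the helper is hypothetical):

```java
public class TypeNameSketch {
    // Hypothetical: derive the "type" and "null" cells from a serialized type
    // string. The real change would live in Flink's LogicalType; this only
    // illustrates the intended two-column output.
    static String[] splitNullability(String typeString) {
        boolean notNull = typeString.endsWith(" NOT NULL");
        String name = notNull
            ? typeString.substring(0, typeString.length() - " NOT NULL".length())
            : typeString;
        // Second cell is the "null" column: true means nullable.
        return new String[] { name, String.valueOf(!notNull) };
    }

    public static void main(String[] args) {
        String[] cells = splitNullability("BIGINT NOT NULL");
        System.out.println(cells[0] + " | " + cells[1]); // BIGINT | false
        cells = splitNullability("VARCHAR(256)");
        System.out.println(cells[0] + " | " + cells[1]); // VARCHAR(256) | true
    }
}
```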
> > >>>>>> > > > >
> > >>>>>> > > > > In my mind, WATERMARK is a property (or constraint) of a
> > >>>>>> > > > > field, just like PRIMARY KEY.
> > >>>>>> > > > > Take this example from MySQL:
> > >>>>>> > > > >
> > >>>>>> > > > > mysql> CREATE TABLE people (id INT NOT NULL,
> > >>>>>> > > > >     name VARCHAR(128) NOT NULL, age INT, PRIMARY KEY (id));
> > >>>>>> > > > > Query OK, 0 rows affected (0.06 sec)
> > >>>>>> > > > >
> > >>>>>> > > > > mysql> describe people;
> > >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> > >>>>>> > > > > | Field | Type         | Null | Key | Default | Extra |
> > >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> > >>>>>> > > > > | id    | int          | NO   | PRI | NULL    |       |
> > >>>>>> > > > > | name  | varchar(128) | NO   |     | NULL    |       |
> > >>>>>> > > > > | age   | int          | YES  |     | NULL    |       |
> > >>>>>> > > > > +-------+--------------+------+-----+---------+-------+
> > >>>>>> > > > > 3 rows in set (0.01 sec)
> > >>>>>> > > > >
> > >>>>>> > > > > Here, PRIMARY KEY is marked in the Key column of the id
> > >>>>>> > > > > field.
> > >>>>>> > > > > We could do the same for watermarks by adding a Watermark
> > >>>>>> > > > > column.
> > >>>>>> > > > >
> > >>>>>> > > > > Best, Fabian
> > >>>>>> > > > >
> > >>>>>> > > > > Am Mi., 29. Apr. 2020 um 10:43 Uhr schrieb godfrey he <godfre...@gmail.com>:
> > >>>>>> > > > >> Hi everyone,
> > >>>>>> > > > >>
> > >>>>>> > > > >> I would like to bring up a discussion about the result
> > >>>>>> > > > >> type of the describe statement, which is introduced in
> > >>>>>> > > > >> FLIP-84 [1].
> > >>>>>> > > > >> In the previous version, we defined the result type of
> > >>>>>> > > > >> the `describe` statement as a single column, as follows:
> > >>>>>> > > > >>
> > >>>>>> > > > >> +-------------+-----------------------------+---------------------------+----------------------+---------------------+
> > >>>>>> > > > >> | Statement   | Result Schema               | Result Value              | Result Kind          | Examples            |
> > >>>>>> > > > >> +-------------+-----------------------------+---------------------------+----------------------+---------------------+
> > >>>>>> > > > >> | DESCRIBE xx | field name: result          | describe the detail of an | SUCCESS_WITH_CONTENT | DESCRIBE table_name |
> > >>>>>> > > > >> |             | field type: VARCHAR(n)      | object (single row)       |                      |                     |
> > >>>>>> > > > >> |             | (n is the max value length) |                           |                      |                     |
> > >>>>>> > > > >> +-------------+-----------------------------+---------------------------+----------------------+---------------------+
> > >>>>>> > > > >>
> > >>>>>> > > > >> For "DESCRIBE table_name", the result value is the
> > >>>>>> > > > >> `toString` value of `TableSchema`, which is unstructured
> > >>>>>> > > > >> data.
> > >>>>>> > > > >> It's hard for users to use this info.
> > >>>>>> > > > >> for example:
> > >>>>>> > > > >>
> > >>>>>> > > > >> TableSchema schema = TableSchema.builder()
> > >>>>>> > > > >>     .field("f0", DataTypes.BIGINT())
> > >>>>>> > > > >>     .field("f1", DataTypes.ROW(
> > >>>>>> > > > >>         DataTypes.FIELD("q1", DataTypes.STRING()),
> > >>>>>> > > > >>         DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3))))
> > >>>>>> > > > >>     .field("f2", DataTypes.STRING())
> > >>>>>> > > > >>     .field("f3", DataTypes.BIGINT(), "f0 + 1")
> > >>>>>> > > > >>     .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
> > >>>>>> > > > >>     .build();
> > >>>>>> > > > >>
> > >>>>>> > > > >> its `toString` value is:
> > >>>>>> > > > >> root
> > >>>>>> > > > >>  |-- f0: BIGINT
> > >>>>>> > > > >>  |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
> > >>>>>> > > > >>  |-- f2: STRING
> > >>>>>> > > > >>  |-- f3: BIGINT AS f0 + 1
> > >>>>>> > > > >>  |-- WATERMARK FOR f1.q2 AS now()
> > >>>>>> > > > >>
> > >>>>>> > > > >> For Hive, MySQL, etc., the describe result is in table
> > >>>>>> > > > >> form, including field names and field types, which is
> > >>>>>> > > > >> more familiar to users.
> > >>>>>> > > > >> TableSchema [2] has watermark expressions and compute
> > >>>>>> > > > >> columns; we should also put them into the table:
> > >>>>>> > > > >> for a compute column, which is column-level, we add a new
> > >>>>>> > > > >> column named `expr`;
> > >>>>>> > > > >> for a watermark expression, which is table-level, we add
> > >>>>>> > > > >> a special row named `WATERMARK` to represent it.
> > >>>>>> > > > >>
> > >>>>>> > > > >> For the above example, the result will look like:
> > >>>>>> > > > >>
> > >>>>>> > > > >> +-----------+-------------------------------------+----------------+
> > >>>>>> > > > >> | name      | type                                | expr           |
> > >>>>>> > > > >> +-----------+-------------------------------------+----------------+
> > >>>>>> > > > >> | f0        | BIGINT                              | (NULL)         |
> > >>>>>> > > > >> | f1        | ROW<`q1` STRING, `q2` TIMESTAMP(3)> | (NULL)         |
> > >>>>>> > > > >> | f2        | STRING                              | NULL           |
> > >>>>>> > > > >> | f3        | BIGINT                              | f0 + 1         |
> > >>>>>> > > > >> | WATERMARK | (NULL)                              | f1.q2 AS now() |
> > >>>>>> > > > >> +-----------+-------------------------------------+----------------+
> > >>>>>> > > > >>
> > >>>>>> > > > >> There is now a PR, FLINK-17112 [3], to implement the
> > >>>>>> > > > >> DESCRIBE statement.
> > >>>>>> > > > >>
> > >>>>>> > > > >> What do you think about this update?
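A minimal renderer for this name/type/expr layout could look like the following. This is hypothetical illustration code, not the FLINK-17112 implementation; it only shows the column-alignment part, with the `WATERMARK` row passed in as ordinary data:

```java
import java.util.Arrays;
import java.util.List;

public class DescribeRenderSketch {
    // Hypothetical renderer for the proposed DESCRIBE layout (name | type | expr),
    // with the watermark shown as a special WATERMARK row, as suggested above.
    static String render(List<String[]> rows) {
        // Compute the width of each column from the widest cell in it.
        int[] widths = new int[rows.get(0).length];
        for (String[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                widths[i] = Math.max(widths[i], row[i].length());
            }
        }
        StringBuilder sb = new StringBuilder();
        for (String[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                sb.append("| ").append(String.format("%-" + widths[i] + "s", row[i])).append(' ');
            }
            sb.append("|\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"name", "type", "expr"},
            new String[] {"f0", "BIGINT", "(NULL)"},
            new String[] {"f3", "BIGINT", "f0 + 1"},
            new String[] {"WATERMARK", "(NULL)", "f1.q2 AS now()"});
        System.out.print(render(rows));
    }
}
```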
> > >>>>>> > > > >> Any feedback is welcome~
> > >>>>>> > > > >>
> > >>>>>> > > > >> Best,
> > >>>>>> > > > >> Godfrey
> > >>>>>> > > > >>
> > >>>>>> > > > >> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> > >>>>>> > > > >> [2] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
> > >>>>>> > > > >> [3] https://github.com/apache/flink/pull/11892
> > >>>>>> > > > >>
> > >>>>>> > > > >> godfrey he <godfre...@gmail.com> 于2020年4月6日周一 下午10:38写道:
> > >>>>>> > > > >>> Hi Timo,
> > >>>>>> > > > >>>
> > >>>>>> > > > >>> Sorry for the late reply, and thanks for your correction.
> > >>>>>> > > > >>> I missed DQL for the job submission scenario.
> > >>>>>> > > > >>> I'll fix the document right away.
> > >>>>>> > > > >>>
> > >>>>>> > > > >>> Best,
> > >>>>>> > > > >>> Godfrey
> > >>>>>> > > > >>>
> > >>>>>> > > > >>> Timo Walther <twal...@apache.org> 于2020年4月3日周五 下午9:53写道:
> > >>>>>> > > > >>>> Hi Godfrey,
> > >>>>>> > > > >>>>
> > >>>>>> > > > >>>> I'm sorry to jump in again, but I still need to clarify
> > >>>>>> > > > >>>> some things around TableResult.
> > >>>>>> > > > >>>>
> > >>>>>> > > > >>>> The FLIP says:
> > >>>>>> > > > >>>> "For DML, this method returns TableResult until the job
> > >>>>>> > > > >>>> is submitted. For other statements, TableResult is
> > >>>>>> > > > >>>> returned until the execution is finished."
> > >>>>>> > > > >>>>
> > >>>>>> > > > >>>> I thought we agreed on making every execution async?
> This > > >>>>>> also > > >>>>>> > means > > >>>>>> > > > >>>> returning a TableResult for DQLs even though the > > execution > > >>>>>> is not > > >>>>>> > > done > > >>>>>> > > > >>>> yet. People need access to the JobClient also for batch > > >>>>>> jobs in > > >>>>>> > > order > > >>>>>> > > > to > > >>>>>> > > > >>>> cancel long lasting queries. If people want to wait for > > the > > >>>>>> > > completion > > >>>>>> > > > >>>> they can hook into JobClient or collect(). > > >>>>>> > > > >>>> > > >>>>>> > > > >>>> Can we rephrase this part to: > > >>>>>> > > > >>>> > > >>>>>> > > > >>>> The FLIP says: > > >>>>>> > > > >>>> "For DML and DQL, this method returns TableResult once > > the > > >>>>>> job has > > >>>>>> > > > been > > >>>>>> > > > >>>> submitted. For DDL and DCL statements, TableResult is > > >>>>>> returned > > >>>>>> > once > > >>>>>> > > > the > > >>>>>> > > > >>>> operation has finished." > > >>>>>> > > > >>>> > > >>>>>> > > > >>>> Regards, > > >>>>>> > > > >>>> Timo > > >>>>>> > > > >>>> > > >>>>>> > > > >>>> > > >>>>>> > > > >>>> On 02.04.20 05:27, godfrey he wrote: > > >>>>>> > > > >>>>> Hi Aljoscha, Dawid, Timo, > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>> Thanks so much for the detailed explanation. > > >>>>>> > > > >>>>> Agree with you that the multiline story is not > completed > > >>>>>> now, and > > >>>>>> > > we > > >>>>>> > > > >> can > > >>>>>> > > > >>>>> keep discussion. > > >>>>>> > > > >>>>> I will add current discussions and conclusions to the > > >>>>>> FLIP. > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>> Best, > > >>>>>> > > > >>>>> Godfrey > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>> Timo Walther <twal...@apache.org> 于2020年4月1日周三 > > 下午11:27写道: > > >>>>>> > > > >>>>> > > >>>>>> > > > >>>>>> Hi Godfrey, > > >>>>>> > > > >>>>>> > > >>>>>> > > > >>>>>> first of all, I agree with Dawid. 
The multiline story is not completed by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed method if they know what the statements are"

This is a good point that also Fabian raised in the linked Google doc. I could also imagine returning a more complicated POJO when calling `executeMultiSql()`.

The POJO would include some `getSqlProperties()` such that a platform gets insights into the query before executing. We could also trigger the execution more explicitly instead of hiding it behind an iterator.

2. "there are some special commands introduced in SQL client"

For platform and SQL Client specific commands, we could offer a hook to the parser, or a fallback parser in case the regular table environment parser cannot deal with the statement.

However, all of that is future work and can be discussed in a separate FLIP.

3. +1 for the `Iterator` instead of `Iterable`.

4. "we should convert the checked exception to unchecked exception"

Yes, I meant using a runtime exception instead of a checked exception. There was no consensus on putting the exception into the `TableResult`.

Regards,
Timo

On 01.04.20 15:35, Dawid Wysakowicz wrote:

When considering the multi-line support, I think it is helpful to start with a use case in mind. In my opinion, consumers of this method will be:

1. sql-client
2. third-party SQL-based platforms

@Godfrey As for the quit/source/... commands: I think those belong to the responsibility of the aforementioned tools and should not be understandable by the TableEnvironment. What would quit on a TableEnvironment do? Moreover, such commands should be prefixed appropriately. It's a common practice to prefix those with e.g. ! or : to say they are meta commands of the tool rather than a query.

I also don't necessarily understand why platform users need to know the kind of the query to use the proposed method. They should get the type from TableResult#ResultKind. If the ResultKind is SUCCESS, it was a DCL/DDL. If it is SUCCESS_WITH_CONTENT, it was a DML/DQL. If that's not enough, we can enrich the TableResult with a more explicit kind of query, but so far I don't see such a need.

@Kurt In those cases I would assume the developers want to present the results of the queries anyway. Moreover, I think it is safe to assume they can adhere to a contract that the results must be iterated.

For direct users of TableEnvironment/Table API this method does not make much sense anyway, in my opinion. I think we can rather safely assume that in this scenario they do not want to submit multiple queries at a single time.

Best,
Dawid

On 01/04/2020 15:07, Kurt Young wrote:

One comment to `executeMultilineSql`: I'm afraid users might sometimes forget to iterate the returned iterators, e.g. a user submits a bunch of DDLs and expects the framework to execute them one by one. But it didn't.

Best,
Kurt

On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:

Agreed to what Dawid and Timo said.

To answer your question about multi-line SQL: no, we don't think we need this in Flink 1.11; we only wanted to make sure that the interfaces that we now put in place will potentially allow this in the future.

Best,
Aljoscha

On 01.04.20 09:31, godfrey he wrote:

Hi, Timo & Dawid,

Thanks so much for the effort on multiline statement support. I have a few questions about this method:

1. Users can well control the execution logic through the proposed method if they know what the statements are (a statement is a DDL, a DML or others).
But if a statement comes from a file, that means users do not know what the statements are, and the execution behavior is unclear. As a platform user, I think this method is hard to use, unless the platform defines a set of rules about the statement order, such as: no SELECT in the middle, DML must be at the tail of the SQL file (which may be the most common case in production environments). Otherwise the platform must parse the SQL first to know what the statements are. If it does that, the platform can handle all cases through `executeSql` and `StatementSet`.

2. The SQL Client also can't use `executeMultilineSql` to support multiline statements, because there are some special commands introduced in the SQL Client, such as `quit`, `source`, `load jar` (the latter does not exist now, but maybe we need this command to support dynamic table sources and UDFs). Does TableEnvironment also support those commands?

3. Btw, must we have this feature in release-1.11? I find there are a few use cases in the feedback document whose behavior is unclear now.

Regarding "change the return value from `Iterable<Row>` to `Iterator<Row>`": I couldn't agree more with this change. Just as Dawid mentioned, "The contract of the Iterable#iterator is that it returns a new iterator each time, which effectively means we can iterate the results multiple times." We do not provide iterating the results multiple times. If we wanted to do that, the client would have to buffer all results, which is impossible for streaming jobs.

Best,
Godfrey

On Wed, Apr 1, 2020, 3:14 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Thank you Timo for the great summary! It covers (almost) all the topics.
Even though in the end we are not suggesting many changes to the current state of the FLIP, I think it is important to lay out all possible use cases so that we do not change the execution model every release.

There is one additional thing we discussed: could we change the result type of TableResult#collect to Iterator<Row>? Even though those interfaces do not differ much, I think Iterator better describes that the results might not be materialized on the client side, but can be retrieved on a per-record basis. The contract of Iterable#iterator is that it returns a new iterator each time, which effectively means we can iterate the results multiple times. Iterating the results multiple times is not possible when we don't retrieve all the results from the cluster at once. I think we should also use Iterator for TableEnvironment#executeMultilineSql(String statements): Iterator<TableResult>.
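The lazy-submission contract this implies (each `next()` call hands the caller exactly one more result, so nothing runs before the caller asks for it) can be sketched with a toy stand-in. `MultiStatementRunner` and its string "results" are hypothetical illustrations, not Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: a hypothetical stand-in for the proposed
// TableEnvironment#executeMultilineSql, whose Iterator#next()
// would trigger submission of the next statement.
public class MultiStatementRunner {

    // Records which statements have been "submitted" so far.
    public static final List<String> submitted = new ArrayList<>();

    /** Returns an iterator whose next() "submits" one statement lazily. */
    public static Iterator<String> executeMultilineSql(String statements) {
        Iterator<String> stmts = Arrays.asList(statements.split(";")).iterator();
        return new Iterator<String>() {
            @Override public boolean hasNext() { return stmts.hasNext(); }
            @Override public String next() {
                String stmt = stmts.next().trim();
                submitted.add(stmt);           // submission happens here, not upfront
                return "RESULT(" + stmt + ")"; // placeholder for a TableResult
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> results =
            executeMultilineSql("CREATE TABLE t1;INSERT INTO t1 SELECT 1");
        // Nothing is submitted until the caller advances the iterator:
        System.out.println(submitted.size()); // 0
        results.next();
        System.out.println(submitted.size()); // 1
        results.next();
        System.out.println(submitted.size()); // 2
    }
}
```

This also makes Kurt's concern concrete: a caller who never advances the iterator never executes anything.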
Best,
Dawid

On 31/03/2020 19:27, Timo Walther wrote:

Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In particular, we discussed how the current status of the FLIP and the future requirements around multiline statements, async/sync, and collect() fit together.

We also updated the FLIP-84 Feedback Summary document [1] with some use cases.

We believe that we found a good solution that also fits to what is in the current FLIP. So no bigger changes necessary, which is great!

Our findings were:

1. Async vs sync submission of Flink jobs:

Having a blocking `execute()` in DataStream API was rather a mistake. Instead all submissions should be async because this allows supporting both modes if necessary. Thus, submitting all queries async sounds good to us. If users want to run a job sync, they can use the JobClient and wait for completion (or collect() in case of batch jobs).

2. Multi-statement execution:

For the multi-statement execution, we don't see a contradiction with the async execution behavior. We imagine a method like:

TableEnvironment#executeMultilineSql(String statements): Iterable<TableResult>

where the `Iterator#next()` method would trigger the next statement submission. This allows a caller to decide synchronously when to submit statements async to the cluster. Thus, a service such as the SQL Client can handle the result of each statement individually and process statement by statement sequentially.

3. The role of TableResult and result retrieval in general:

`TableResult` is similar to `JobClient`. Instead of returning a `CompletableFuture` of something, it is a concrete util class where some methods have the behavior of a completable future (e.g. collect(), print()) and some are already completed (getTableSchema(), getResultKind()).

`StatementSet#execute()` returns a single `TableResult` because the order is undefined in a set and all statements have the same schema. Its `collect()` will return a row for each executed `INSERT INTO` in the order of statement definition.

For a simple `SELECT * FROM ...`, the query execution might block until `collect()` is called to pull buffered rows from the job (via socket/REST API, whatever we will use in the future). We can say that a statement finished successfully when `collect#Iterator#hasNext` has returned false.

I hope this summarizes our discussion, @Dawid/Aljoscha/Klou?
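The completion convention in point 3 (a statement has finished successfully once `collect()`'s iterator is exhausted) can be illustrated with a minimal sketch. `DrainExample` and its fake row iterator are assumptions for illustration, not Flink classes:

```java
import java.util.Arrays;
import java.util.Iterator;

// Sketch: draining a result iterator to detect successful completion,
// mirroring the convention that a statement has finished when
// collect()'s Iterator#hasNext() returns false.
public class DrainExample {

    /** Stand-in for TableResult#collect(): three buffered changelog rows. */
    public static Iterator<String> collect() {
        return Arrays.asList("+I[1]", "+I[2]", "+I[3]").iterator();
    }

    /** Consumes all rows; returning means the statement has "finished". */
    public static int drain(Iterator<String> rows) {
        int count = 0;
        while (rows.hasNext()) { // hasNext() == false <=> statement finished
            rows.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(drain(collect())); // 3
    }
}
```

For an unbounded streaming query, `hasNext()` would simply never return false, which is why draining is only a completion signal for bounded results.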
It would be great if we can add these findings to the FLIP before we start voting.

One minor thing: some `execute()` methods still throw a checked exception; can we remove that from the FLIP? Also the above mentioned `Iterator#next()` would trigger an execution without throwing a checked exception.

Thanks,
Timo

[1] https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#

On 31.03.20 06:28, godfrey he wrote:

Hi, Timo & Jark,

Thanks for your explanation. Agree with you that execution should always be async, and the sync execution scenario can be covered by async execution. It helps provide a unified entry point for batch and streaming. I think we can also use sync execution for some testing.

So, I agree with you that we provide the `executeSql` method and it's an async method. If we want a sync method in the future, we can add a method named `executeSqlSync`.

I think we've reached an agreement. I will update the document and start the voting process.

Best,
Godfrey

On Tue, Mar 31, 2020, 12:46 AM, Jark Wu <imj...@gmail.com> wrote:

Hi,

I didn't follow the full discussion, but I share the same concern with Timo that streaming queries should always be async. Otherwise, I can imagine it will cause a lot of confusion and problems if users don't deeply keep the "sync" in mind (e.g. the client hangs). Besides, the streaming mode is still the majority of Flink and Flink SQL use cases. We should put usability at a high priority.
Best,
Jark

On Mon, 30 Mar 2020 at 23:27, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

maybe I wasn't expressing my biggest concern enough in my last mail. Even in a singleline and sync execution, I think that streaming queries should not block the execution. Otherwise it is not possible to call collect() or print() on them afterwards.

"there are too many things need to discuss for multiline":

True, I don't want to solve all of them right now. But what I know is that our newly introduced methods should fit into a multiline execution. There is no big difference between calling `executeSql(A), executeSql(B)` and processing a multiline file `A;\nB;`.

I think the example that you mentioned can simply be undefined for now. Currently, no catalog is modifying data but just metadata. This is a separate discussion.

"result of the second statement is indeterministic":

Sure this is indeterministic. But this is the implementer's fault and we cannot forbid such pipelines.

How about we always execute streaming queries async? It would unblock executeSql() and multiline statements.

Having an `executeSqlAsync()` is useful for batch. However, I don't want `sync/async` to be the new batch/stream flag. The execution behavior should come from the query itself.

Regards,
Timo

On 30.03.20 11:12, godfrey he wrote:

Hi Timo,

Agree with you that streaming queries are our top priority, but I think there are too many things we need to discuss for multiline statements, e.g.:

1. What's the behavior of DDL and DML mixing for async execution?

create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; // t1 may be a MySQL table, the data will also be deleted.

Here t1 is dropped while the "insert" job is still running.

2. What's the behavior of the unified scenario for async execution (as you mentioned)?

INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is indeterministic, because the first statement may still be running.
I think we need to put a lot of effort into defining the behavior of logically related queries.

In this FLIP, I suggest we only handle single statements, and we also introduce an async execute method, which is more important and more often used by users.

For the sync methods (like `TableEnvironment.executeSql` and `StatementSet.execute`), the result will be returned once the job is finished. The following methods will be introduced in this FLIP:

/**
 * Asynchronously execute the given single statement
 */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
 * Asynchronously execute the dml statements as a batch
 */
StatementSet.executeAsync(): TableResult

public interface TableResult {
    /**
     * return JobClient for DQL and DML in async mode, else return Optional.empty
     */
    Optional<JobClient> getJobClient();
}

What do you think?

Best,
Godfrey

On Thu, Mar 26, 2020, 9:15 PM, Timo Walther <twal...@apache.org> wrote:

Hi Godfrey,

executing streaming queries must be our top priority because this is what distinguishes Flink from competitors. If we change the execution behavior, we should think about the other cases as well to not break the API a third time.

I fear that just having an async execute method will not be enough, because users should be able to mix streaming and batch queries in a unified scenario.
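A minimal sketch of the proposed `Optional<JobClient> getJobClient()` contract: present for async DQL/DML results, empty otherwise. `SimpleResult` and `JobHandle` are hypothetical stand-ins for illustration, not Flink's actual `TableResult`/`JobClient`:

```java
import java.util.Optional;

// Sketch of the proposed contract: getJobClient() is present only for
// results that are backed by a running job (async DQL/DML).
public class AsyncResultExample {

    /** Hypothetical stand-in for Flink's JobClient. */
    public static class JobHandle {
        public boolean cancelled = false;
        public void cancel() { cancelled = true; }
    }

    /** Hypothetical stand-in for TableResult. */
    public static class SimpleResult {
        private final JobHandle client; // null for DDL-style results
        public SimpleResult(JobHandle client) { this.client = client; }
        public Optional<JobHandle> getJobClient() {
            return Optional.ofNullable(client);
        }
    }

    public static void main(String[] args) {
        SimpleResult ddl = new SimpleResult(null);            // e.g. CREATE TABLE
        SimpleResult dml = new SimpleResult(new JobHandle()); // e.g. async INSERT

        System.out.println(ddl.getJobClient().isPresent()); // false
        System.out.println(dml.getJobClient().isPresent()); // true

        // The caller can cancel the running job through the handle:
        dml.getJobClient().ifPresent(JobHandle::cancel);
    }
}
```

The `Optional` makes the "else return Optional.empty" case in the javadoc above explicit at the type level, so callers of DDL results cannot accidentally dereference a missing job client.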
If I remember it correctly, we had some discussions in the past about what decides the execution mode of a query. Currently, we would like to let the query decide, not derive it from the sources.

So I could imagine a multiline pipeline as:

USE CATALOG 'mycat';
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

For executeMultilineSql():

sync because regular SQL
sync because regular Batch SQL
async because Streaming SQL

For executeAsyncMultilineSql():

async because everything should be async
async because everything should be async
async because everything should be async

What we should not start for executeAsyncMultilineSql():

sync because DDL
async because everything should be async
async because everything should be async
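The first decision table above can be mimicked with a toy classifier. The `EMIT STREAM` check is purely an assumption for this example pipeline (where it is the only streaming marker), not how Flink actually detects streaming queries:

```java
// Sketch of the executeMultilineSql() decision table: the query itself
// decides the execution mode. Classification here is a toy heuristic.
public class ExecutionModeExample {

    public enum Mode { SYNC, ASYNC }

    public static Mode modeFor(String statement) {
        // Toy assumption: only an explicit EMIT STREAM marker makes a
        // statement a streaming query; everything else runs sync.
        return statement.toUpperCase().contains("EMIT STREAM")
                ? Mode.ASYNC
                : Mode.SYNC;
    }

    public static void main(String[] args) {
        System.out.println(modeFor("USE CATALOG 'mycat'"));                                // SYNC
        System.out.println(modeFor("INSERT INTO t1 SELECT * FROM s"));                     // SYNC
        System.out.println(modeFor("INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM")); // ASYNC
    }
}
```

Under executeAsyncMultilineSql(), by contrast, `modeFor` would simply return ASYNC for every statement.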
What are your thoughts here?

Regards,
Timo

On 26.03.20 12:50, godfrey he wrote:

Hi Timo,

I agree with you that streaming queries mostly need async execution. In fact, our original plan was to introduce only sync methods in this FLIP, with async methods (like "executeSqlAsync") to be introduced in the future, as mentioned in the appendix.

Maybe the async methods also need to be considered in this FLIP.

I think sync methods are also useful for streaming, which

--
Best, Jingsong Lee