Hi Timo,

Regarding "`execute` method throws a checked exception": does that mean we should convert the checked exception to an unchecked exception, or that we need to add an ERROR type to ResultKind?
For the second approach, I still think it's not convenient for the user to check for an exception when calling the `collect` and `print` methods. The code would look like:

// add a `getError()` method in TableResult and store the exception in TableResult independently
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
    print result.getError();
} else {
    Iterator<Row> it = result.collect();
    it...
}

// treat the exception as a kind of result, and get the exception through the `collect` method
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
    Iterator<Row> it = result.collect();
    Row row = it.next();
    print row.getField(0);
} else {
    Iterator<Row> it = result.collect();
    it...
}

// for fluent programming
Iterator<Row> it = tEnv.executeSql("select xxx").collect();
it...

Best,
Godfrey

Timo Walther <twal...@apache.org> wrote on Wed, Apr 1, 2020 at 1:27 AM:

> Hi Godfrey,
>
> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> particular, we discussed how the current status of the FLIP and the
> future requirements around multiline statements, async/sync, and
> collect() fit together.
>
> We also updated the FLIP-84 Feedback Summary document [1] with some use
> cases.
>
> We believe that we found a good solution that also fits what is in the
> current FLIP. So no bigger changes are necessary, which is great!
>
> Our findings were:
>
> 1. Async vs. sync submission of Flink jobs:
>
> Having a blocking `execute()` in the DataStream API was rather a
> mistake. Instead, all submissions should be async because this allows
> supporting both modes if necessary. Thus, submitting all queries async
> sounds good to us. If users want to run a job sync, they can use the
> JobClient and wait for completion (or collect() in the case of batch
> jobs).
>
> 2. Multi-statement execution:
>
> For multi-statement execution, we don't see a contradiction with the
> async execution behavior.
> We imagine a method like:
>
> TableEnvironment#executeMultilineSql(String statements): Iterable<TableResult>
>
> where the `Iterator#next()` method would trigger the next statement
> submission. This allows a caller to decide synchronously when to submit
> statements async to the cluster. Thus, a service such as the SQL Client
> can handle the result of each statement individually and process
> statement by statement sequentially.
>
> 3. The role of TableResult and result retrieval in general
>
> `TableResult` is similar to `JobClient`. Instead of returning a
> `CompletableFuture` of something, it is a concrete util class where some
> methods have the behavior of a completable future (e.g. collect(),
> print()) and some are already completed (getTableSchema(),
> getResultKind()).
>
> `StatementSet#execute()` returns a single `TableResult` because the
> order is undefined in a set and all statements have the same schema. Its
> `collect()` will return a row for each executed `INSERT INTO` in the
> order of statement definition.
>
> For a simple `SELECT * FROM ...`, the query execution might block until
> `collect()` is called to pull buffered rows from the job (via
> socket/REST API or whatever we will use in the future). We can say that
> a statement finished successfully when the `collect()` `Iterator#hasNext`
> has returned false.
>
> I hope this summarizes our discussion, @Dawid/Aljoscha/Klou?
>
> It would be great if we can add these findings to the FLIP before we
> start voting.
>
> One minor thing: some `execute()` methods still throw a checked
> exception; can we remove that from the FLIP? Also, the above-mentioned
> `Iterator#next()` would trigger an execution without throwing a checked
> exception.
>
> Thanks,
> Timo
>
> [1]
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>
> On 31.03.20 06:28, godfrey he wrote:
> > Hi, Timo & Jark
> >
> > Thanks for your explanation.
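As a side note on the `executeMultilineSql` idea above: a minimal, self-contained sketch of the lazy-iteration behavior in plain Java. `submit` and the String results here are stand-ins for real job submission and `TableResult`; this is an illustration of the semantics, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LazyMultilineSketch {
    // records which statements have been "submitted" so far
    static final List<String> submitted = new ArrayList<>();

    // stand-in for submitting one statement to the cluster
    static String submit(String statement) {
        submitted.add(statement);
        return "OK: " + statement;
    }

    // nothing is submitted up front; each call to next() submits the next
    // statement, so the caller decides when work goes to the cluster
    static Iterable<String> executeMultilineSql(String statements) {
        return () -> {
            Iterator<String> stmts = Arrays.asList(statements.split(";")).iterator();
            return new Iterator<String>() {
                @Override public boolean hasNext() { return stmts.hasNext(); }
                @Override public String next() { return submit(stmts.next().trim()); }
            };
        };
    }

    public static void main(String[] args) {
        Iterable<String> results =
            executeMultilineSql("CREATE TABLE t (x INT);INSERT INTO t SELECT * FROM s");
        Iterator<String> it = results.iterator();
        System.out.println(submitted.size()); // 0: nothing submitted yet
        System.out.println(it.next());        // submits the first statement
        System.out.println(submitted.size()); // 1
        System.out.println(it.next());        // submits the second statement
        System.out.println(submitted.size()); // 2
    }
}
```

This is how a service like the SQL Client could process statement by statement: pull one result, render it, then pull the next, which only then triggers the next submission.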
> > I agree that async execution should always be async,
> > and the sync execution scenario can be covered by async execution.
> > It helps provide a unified entry point for batch and streaming.
> > I think we can also use sync execution for some testing.
> > So, I agree with you that we provide the `executeSql` method as an
> > async method.
> > If we want a sync method in the future, we can add a method named
> > `executeSqlSync`.
> >
> > I think we've reached an agreement. I will update the document and
> > start the voting process.
> >
> > Best,
> > Godfrey
> >
> >
> > Jark Wu <imj...@gmail.com> wrote on Tue, Mar 31, 2020 at 12:46 AM:
> >
> >> Hi,
> >>
> >> I didn't follow the full discussion,
> >> but I share the same concern with Timo that streaming queries should
> >> always be async.
> >> Otherwise, I can imagine it will cause a lot of confusion and problems
> >> if users don't deeply keep the "sync" in mind (e.g. the client hangs).
> >> Besides, streaming mode is still the majority of use cases for Flink
> >> and Flink SQL. We should put usability at a high priority.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 30 Mar 2020 at 23:27, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi Godfrey,
> >>>
> >>> maybe I wasn't expressing my biggest concern clearly enough in my last
> >>> mail. Even in a singleline and sync execution, I think that streaming
> >>> queries should not block the execution. Otherwise it is not possible
> >>> to call collect() or print() on them afterwards.
> >>>
> >>> "there are too many things we need to discuss for multiline":
> >>>
> >>> True, I don't want to solve all of them right now. But what I know is
> >>> that our newly introduced methods should fit into a multiline
> >>> execution. There is no big difference between calling `executeSql(A),
> >>> executeSql(B)` and processing a multiline file `A;\nB;`.
> >>>
> >>> I think the example that you mentioned can simply be undefined for
> >>> now. Currently, no catalog is modifying data, just metadata.
> >>> This is a separate discussion.
> >>>
> >>> "result of the second statement is nondeterministic":
> >>>
> >>> Sure, this is nondeterministic. But this is the implementer's fault,
> >>> and we cannot forbid such pipelines.
> >>>
> >>> How about we always execute streaming queries async? It would unblock
> >>> executeSql() and multiline statements.
> >>>
> >>> Having an `executeSqlAsync()` is useful for batch. However, I don't
> >>> want `sync/async` to be the new batch/stream flag. The execution
> >>> behavior should come from the query itself.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 30.03.20 11:12, godfrey he wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Agree with you that streaming queries are our top priority,
> >>>> but I think there are too many things that need to be discussed for
> >>>> multiline statements, e.g.:
> >>>>
> >>>> 1. What's the behavior of mixing DDL and DML for async execution?
> >>>>
> >>>> create table t1 xxx;
> >>>> create table t2 xxx;
> >>>> insert into t2 select * from t1 where xxx;
> >>>> drop table t1; // t1 may be a MySQL table, the data will also be deleted.
> >>>>
> >>>> Here t1 is dropped while the "insert" job is running.
> >>>>
> >>>> 2. What's the behavior of the unified scenario for async execution?
> >>>> (as you mentioned)
> >>>>
> >>>> INSERT INTO t1 SELECT * FROM s;
> >>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>
> >>>> The result of the second statement is nondeterministic, because the
> >>>> first statement may still be running.
> >>>> I think we would need to put a lot of effort into defining the
> >>>> behavior of logically related queries.
> >>>>
> >>>> In this FLIP, I suggest we only handle single statements, and we also
> >>>> introduce an async execute method,
> >>>> which is more important and more often used.
> >>>>
> >>>> For the sync methods (like `TableEnvironment.executeSql` and
> >>>> `StatementSet.execute`),
> >>>> the result will be returned once the job is finished.
> >>>> The following methods will be introduced in this FLIP:
> >>>>
> >>>> /**
> >>>>  * Asynchronously execute the given single statement
> >>>>  */
> >>>> TableEnvironment.executeSqlAsync(String statement): TableResult
> >>>>
> >>>> /**
> >>>>  * Asynchronously execute the DML statements as a batch
> >>>>  */
> >>>> StatementSet.executeAsync(): TableResult
> >>>>
> >>>> public interface TableResult {
> >>>>     /**
> >>>>      * return the JobClient for DQL and DML in async mode, else
> >>>>      * return Optional.empty()
> >>>>      */
> >>>>     Optional<JobClient> getJobClient();
> >>>> }
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>> Timo Walther <twal...@apache.org> wrote on Thu, Mar 26, 2020 at 9:15 PM:
> >>>>
> >>>>> Hi Godfrey,
> >>>>>
> >>>>> executing streaming queries must be our top priority because this is
> >>>>> what distinguishes Flink from competitors. If we change the
> >>>>> execution behavior, we should think about the other cases as well so
> >>>>> as not to break the API a third time.
> >>>>>
> >>>>> I fear that just having an async execute method will not be enough,
> >>>>> because users should be able to mix streaming and batch queries in a
> >>>>> unified scenario.
> >>>>>
> >>>>> If I remember correctly, we had some discussions in the past about
> >>>>> what decides the execution mode of a query. Currently, we would
> >>>>> like to let the query decide, not derive it from the sources.
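To illustrate the proposed `Optional<JobClient>` contract above: a self-contained sketch with mocked `JobClient` and `TableResult` (the real Flink classes differ; `executeSqlAsync` here is a stand-in), showing that sync behavior can be recovered from async submission simply by waiting on the JobClient:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class JobClientSketch {
    // stand-in for Flink's JobClient: just exposes a future job result
    interface JobClient {
        CompletableFuture<String> getJobExecutionResult();
    }

    // minimal mock of the proposed TableResult
    static class TableResult {
        private final JobClient jobClient; // null for statements without a job

        TableResult(JobClient jobClient) { this.jobClient = jobClient; }

        Optional<JobClient> getJobClient() { return Optional.ofNullable(jobClient); }
    }

    // stand-in for executeSqlAsync: DDL-like statements complete immediately
    // without a Flink job, DML returns a TableResult carrying a JobClient
    static TableResult executeSqlAsync(String statement) {
        if (statement.toUpperCase().startsWith("CREATE")) {
            return new TableResult(null); // no job submitted
        }
        CompletableFuture<String> result = CompletableFuture.completedFuture("FINISHED");
        return new TableResult(() -> result);
    }

    public static void main(String[] args) {
        // DDL: no JobClient to wait on
        TableResult ddl = executeSqlAsync("CREATE TABLE t (x INT)");
        System.out.println(ddl.getJobClient().isPresent()); // false

        // DML: async submission; "sync" is just waiting for the result
        TableResult dml = executeSqlAsync("INSERT INTO t SELECT * FROM s");
        dml.getJobClient()
           .map(c -> c.getJobExecutionResult().join())
           .ifPresent(System.out::println); // FINISHED
    }
}
```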
> >>>>>
> >>>>> So I could imagine a multiline pipeline as:
> >>>>>
> >>>>> USE CATALOG 'mycat';
> >>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>
> >>>>> For executeMultilineSql():
> >>>>>
> >>>>> sync because regular SQL
> >>>>> sync because regular batch SQL
> >>>>> async because streaming SQL
> >>>>>
> >>>>> For executeAsyncMultilineSql():
> >>>>>
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What we should not do for executeAsyncMultilineSql():
> >>>>>
> >>>>> sync because DDL
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What are your thoughts here?
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 26.03.20 12:50, godfrey he wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> I agree with you that streaming queries mostly need async execution.
> >>>>>> In fact, our original plan was to introduce only sync methods in
> >>>>>> this FLIP, and async methods (like "executeSqlAsync") would be
> >>>>>> introduced in the future, as mentioned in the appendix.
> >>>>>>
> >>>>>> Maybe the async methods also need to be considered in this FLIP.
> >>>>>>
> >>>>>> I think sync methods are also useful for streaming, as they can be
> >>>>>> used to run bounded sources.
> >>>>>> Maybe we should check whether all sources are bounded in sync
> >>>>>> execution mode.
> >>>>>>
> >>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>> multiline files. Because the first INSERT INTO would block the
> >>>>>>> further execution.
> >>>>>> Agree with you, we need an async method to submit multiline files,
> >>>>>> and such files should be restricted so that DQL and DML always come
> >>>>>> at the end for streaming.
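The per-statement sync/async decision discussed above could be expressed as a tiny dispatch rule. This sketch is purely illustrative and rests on an assumption that is not a decided design: it treats the presence of the `EMIT STREAM` keyword as the marker of a streaming query.

```java
public class ExecutionModeSketch {
    enum Mode { SYNC, ASYNC }

    // for executeMultilineSql(): only streaming SQL runs async;
    // regular SQL and batch SQL run sync.
    // Assumption for illustration: `EMIT STREAM` marks a streaming query.
    static Mode modeForMultiline(String statement) {
        return statement.toUpperCase().contains("EMIT STREAM") ? Mode.ASYNC : Mode.SYNC;
    }

    // for executeAsyncMultilineSql(): everything is async, including DDL
    static Mode modeForAsyncMultiline(String statement) {
        return Mode.ASYNC;
    }

    public static void main(String[] args) {
        String[] pipeline = {
            "USE CATALOG 'mycat'",
            "INSERT INTO t1 SELECT * FROM s",
            "INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM"
        };
        // prints the decision of both entry points for each statement
        for (String stmt : pipeline) {
            System.out.println(modeForMultiline(stmt) + " / " + modeForAsyncMultiline(stmt));
        }
    }
}
```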
> >>>>>>
> >>>>>> Best,
> >>>>>> Godfrey
> >>>>>>
> >>>>>> Timo Walther <twal...@apache.org> wrote on Thu, Mar 26, 2020 at 4:29 PM:
> >>>>>>
> >>>>>>> Hi Godfrey,
> >>>>>>>
> >>>>>>> having control over the job after submission is a requirement that
> >>>>>>> was requested frequently (some examples are [1], [2]). Users would
> >>>>>>> like to get insights about the running or completed job. Including
> >>>>>>> the jobId, jobGraph, etc., the JobClient summarizes these
> >>>>>>> properties.
> >>>>>>>
> >>>>>>> It is good to have a discussion about synchronous/asynchronous
> >>>>>>> submission now to have a complete execution picture.
> >>>>>>>
> >>>>>>> I thought we would submit streaming queries mostly async and just
> >>>>>>> wait for the successful submission. If we block for streaming
> >>>>>>> queries, how can we collect() or print() results?
> >>>>>>>
> >>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>> multiline files. Because the first INSERT INTO would block the
> >>>>>>> further execution.
> >>>>>>>
> >>>>>>> If we decide to block entirely on streaming queries, we need the
> >>>>>>> async execution methods in the design already. However, I would
> >>>>>>> rather go for non-blocking streaming queries. Also with the
> >>>>>>> `EMIT STREAM` keyword in mind that we might add to SQL statements
> >>>>>>> soon.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>>>>
> >>>>>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>>>>> Hi Timo,
> >>>>>>>>
> >>>>>>>> Thanks for the update.
> >>>>>>>>
> >>>>>>>> Regarding "multiline statement support", I'm also fine with
> >>>>>>>> `TableEnvironment.executeSql()` only supporting single-line
> >>>>>>>> statements, and we can support multiline statements later (this
> >>>>>>>> needs more discussion).
> >>>>>>>>
> >>>>>>>> Regarding "StatementSet.explain()", I don't have strong opinions
> >>>>>>>> about that.
> >>>>>>>>
> >>>>>>>> Regarding "TableResult.getJobClient()", I think it's unnecessary.
> >>>>>>>> The reasons are: first, many statements (e.g. DDL, show xx, use
> >>>>>>>> xx) will not submit a Flink job. Second,
> >>>>>>>> `TableEnvironment.executeSql()` and `StatementSet.execute()` are
> >>>>>>>> synchronous methods, so `TableResult` will be returned only after
> >>>>>>>> the job is finished or failed.
> >>>>>>>>
> >>>>>>>> Regarding "whether StatementSet.execute() needs to throw an
> >>>>>>>> exception", I think we should choose a unified way to tell whether
> >>>>>>>> the execution is successful. If `TableResult` contains an ERROR
> >>>>>>>> kind (for non-runtime exceptions), users need to not only check
> >>>>>>>> the result but also catch the runtime exception in their code.
> >>>>>>>> Alternatively, `StatementSet.execute()` does not throw any
> >>>>>>>> exception (including runtime exceptions), and all exception
> >>>>>>>> messages are in the result. I prefer "StatementSet.execute() needs
> >>>>>>>> to throw an exception". cc @Jark Wu <imj...@gmail.com>
> >>>>>>>>
> >>>>>>>> I will update the agreed parts in the document first.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Godfrey
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Timo Walther <twal...@apache.org> wrote on Wed, Mar 25, 2020 at 6:51 PM:
> >>>>>>>>
> >>>>>>>>> Hi Godfrey,
> >>>>>>>>>
> >>>>>>>>> thanks for starting the discussion on the mailing list. And sorry
> >>>>>>>>> again for the late reply to FLIP-84. I have updated the Google
> >>>>>>>>> doc one more time to incorporate the offline discussions.
> >>>>>>>>>
> >>>>>>>>> From Dawid's and my view, it is fine to postpone the multiline
> >>>>>>>>> support to a separate method. This can be future work, even
> >>>>>>>>> though we will need it rather soon.
> >>>>>>>>>
> >>>>>>>>> If there are no objections, I suggest updating FLIP-84 again and
> >>>>>>>>> having another voting process.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>>>>>>> Hi community,
> >>>>>>>>>>
> >>>>>>>>>> Timo, Fabian, and Dawid have some feedback about FLIP-84 [1].
> >>>>>>>>>> The feedback is all about the newly introduced methods. We had a
> >>>>>>>>>> discussion yesterday, and most of the feedback has been agreed
> >>>>>>>>>> upon. Here are the conclusions:
> >>>>>>>>>>
> >>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>>>>>>
> >>>>>>>>>> the original proposed methods:
> >>>>>>>>>>
> >>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>>>>>>>>
> >>>>>>>>>> the new proposed methods:
> >>>>>>>>>>
> >>>>>>>>>> // we should not use abbreviations in the API, and the term
> >>>>>>>>>> // "Batch" is easily confused with batch/streaming processing
> >>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>>>>>>
> >>>>>>>>>> // every method that takes SQL should have `Sql` in its name
> >>>>>>>>>> // supports multiline statements ???
> >>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>>>>>>
> >>>>>>>>>> // new method. supports explaining DQL and DML
> >>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail... details): String
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> *2. about proposed related classes:*
> >>>>>>>>>>
> >>>>>>>>>> the original proposed classes:
> >>>>>>>>>>
> >>>>>>>>>> interface DmlBatch {
> >>>>>>>>>>     void addInsert(String insert);
> >>>>>>>>>>     void addInsert(String targetPath, Table table);
> >>>>>>>>>>     ResultTable execute() throws Exception;
> >>>>>>>>>>     String explain(boolean extended);
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> public interface ResultTable {
> >>>>>>>>>>     TableSchema getResultSchema();
> >>>>>>>>>>     Iterable<Row> getResultRows();
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> the new proposed classes:
> >>>>>>>>>>
> >>>>>>>>>> interface StatementSet {
> >>>>>>>>>>     // every method that takes SQL should have `Sql` in its name
> >>>>>>>>>>     // return the StatementSet instance for fluent programming
> >>>>>>>>>>     addInsertSql(String statement): StatementSet
> >>>>>>>>>>
> >>>>>>>>>>     // return the StatementSet instance for fluent programming
> >>>>>>>>>>     addInsert(String tablePath, Table table): StatementSet
> >>>>>>>>>>
> >>>>>>>>>>     // new method. supports overwrite mode
> >>>>>>>>>>     addInsert(String tablePath, Table table, boolean overwrite): StatementSet
> >>>>>>>>>>
> >>>>>>>>>>     explain(): String
> >>>>>>>>>>
> >>>>>>>>>>     // new method. supports adding more details for the result
> >>>>>>>>>>     explain(ExplainDetail... extraDetails): String
> >>>>>>>>>>
> >>>>>>>>>>     // throw exception ???
> >>>>>>>>>>     execute(): TableResult
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> interface TableResult {
> >>>>>>>>>>     getTableSchema(): TableSchema
> >>>>>>>>>>
> >>>>>>>>>>     // avoid custom parsing of an "OK" row in programming
> >>>>>>>>>>     getResultKind(): ResultKind
> >>>>>>>>>>
> >>>>>>>>>>     // instead of `get`, make it explicit that this might be
> >>>>>>>>>>     // triggering an expensive operation
> >>>>>>>>>>     collect(): Iterable<Row>
> >>>>>>>>>>
> >>>>>>>>>>     // for fluent programming
> >>>>>>>>>>     print(): Unit
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> enum ResultKind {
> >>>>>>>>>>     SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >>>>>>>>>>     SUCCESS_WITH_CONTENT // rows with important content are
> >>>>>>>>>>                          // available (DML, DQL)
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> *3. new proposed methods in `Table`*
> >>>>>>>>>>
> >>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
> >>>>>>>>>> methods are introduced:
> >>>>>>>>>>
> >>>>>>>>>> Table.executeInsert(String tablePath): TableResult
> >>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
> >>>>>>>>>> Table.explain(ExplainDetail... details): String
> >>>>>>>>>> Table.execute(): TableResult
> >>>>>>>>>>
> >>>>>>>>>> There are two issues that need further discussion: one is
> >>>>>>>>>> whether `TableEnvironment.executeSql(String statement):
> >>>>>>>>>> TableResult` needs to support multiline statements (or whether
> >>>>>>>>>> `TableEnvironment` needs to support multiline statements), and
> >>>>>>>>>> the other is whether `StatementSet.execute()` needs to throw an
> >>>>>>>>>> exception.
> >>>>>>>>>>
> >>>>>>>>>> Please refer to the feedback document [2] for the details.
> >>>>>>>>>>
> >>>>>>>>>> Any suggestions are warmly welcomed!
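To make the proposed `TableResult`/`ResultKind` consumption pattern concrete: a self-contained sketch with mocked classes (the real interfaces are still under discussion in this thread, so everything here is a stand-in). It shows a caller dispatching on `getResultKind()` instead of parsing an "OK" row:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ResultKindSketch {
    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    // minimal mock of the proposed TableResult
    static class TableResult {
        final ResultKind kind;
        final List<String> rows;
        TableResult(ResultKind kind, List<String> rows) { this.kind = kind; this.rows = rows; }
        ResultKind getResultKind() { return kind; }
        List<String> collect() { return rows; }
    }

    // stand-in for executeSql: queries carry content, DDL just succeeds
    static TableResult executeSql(String statement) {
        if (statement.toUpperCase().startsWith("SELECT")) {
            return new TableResult(ResultKind.SUCCESS_WITH_CONTENT, Arrays.asList("row1", "row2"));
        }
        return new TableResult(ResultKind.SUCCESS, Collections.emptyList());
    }

    // caller-side pattern: only pull rows when the kind says there are any
    static String describe(TableResult result) {
        switch (result.getResultKind()) {
            case SUCCESS_WITH_CONTENT: return result.collect().size() + " row(s)";
            default: return "OK";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(executeSql("CREATE TABLE t (x INT)"))); // OK
        System.out.println(describe(executeSql("SELECT * FROM t")));        // 2 row(s)
    }
}
```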
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>>>>>>> [2]
> >>>>>>>>>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Godfrey