Hi Timo,

Regarding to "`execute` method throws checked exception",
 is that mean we should convert the checked exception to unchecked
exception
or we need add ERROR type in ResultKind.

for the second approach, I still think it's not convenient
for the user to check exception when calling `collect` method and `print`
method.
the code looks like:

// add `getError()` method in TableResult and store the exception
in TableResult independent
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
  print result.getError();
} else {
  Iterator<Row> it =  result.collect();
  it...
}

 // treat the exception as a kind of result, and get exception through
`collect` method
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
   Iterator<Row> it =  result.collect();
   Row row = it.next();
   print row.getField(0);
} else {
  Iterator<Row> it =  result.collect();
  it...
}

// for fluent programming
Iterator<Row> it = tEnv.executeSql("select xxx").collect();
it...

Best,
Godfrey

Timo Walther <twal...@apache.org> 于2020年4月1日周三 上午1:27写道:

> Hi Godfrey,
>
> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> particular, we discussed how the current status of the FLIP and the
> future requirements around multiline statements, async/sync, collect()
> fit together.
>
> We also updated the FLIP-84 Feedback Summary document [1] with some use
> cases.
>
> We believe that we found a good solution that also fits to what is in
> the current FLIP. So no bigger changes necessary, which is great!
>
> Our findings were:
>
> 1. Async vs sync submission of Flink jobs:
>
> Having a blocking `execute()` in DataStream API was rather a mistake.
> Instead all submissions should be async because this allows supporting
> both modes if necessary. Thus, submitting all queries async sounds good
> to us. If users want to run a job sync, they can use the JobClient and
> wait for completion (or collect() in case of batch jobs).
>
> 2. Multi-statement execution:
>
> For the multi-statement execution, we don't see a contradication with
> the async execution behavior. We imagine a method like:
>
> TableEnvironment#executeMultilineSql(String statements):
> Iterable<TableResult>
>
> Where the `Iterator#next()` method would trigger the next statement
> submission. This allows a caller to decide synchronously when to submit
> statements async to the cluster. Thus, a service such as the SQL Client
> can handle the result of each statement individually and process
> statement by statement sequentially.
>
> 3. The role of TableResult and result retrieval in general
>
> `TableResult` is similar to `JobClient`. Instead of returning a
> `CompletableFuture` of something, it is a concrete util class where some
> methods have the behavior of completable future (e.g. collect(),
> print()) and some are already completed (getTableSchema(),
> getResultKind()).
>
> `StatementSet#execute()` returns a single `TableResult` because the
> order is undefined in a set and all statements have the same schema. Its
> `collect()` will return a row for each executed `INSERT INTO` in the
> order of statement definition.
>
> For simple `SELECT * FROM ...`, the query execution might block until
> `collect()` is called to pull buffered rows from the job (from
> socket/REST API what ever we will use in the future). We can say that a
> statement finished successfully, when the `collect#Iterator#hasNext` has
> returned false.
>
> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
>
> It would be great if we can add these findings to the FLIP before we
> start voting.
>
> One minor thing: some `execute()` methods still throw a checked
> exception; can we remove that from the FLIP? Also the above mentioned
> `Iterator#next()` would trigger an execution without throwing a checked
> exception.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>
> On 31.03.20 06:28, godfrey he wrote:
> > Hi, Timo & Jark
> >
> > Thanks for your explanation.
> > Agree with you that async execution should always be async,
> > and sync execution scenario can be covered  by async execution.
> > It helps provide an unified entry point for batch and streaming.
> > I think we can also use sync execution for some testing.
> > So, I agree with you that we provide `executeSql` method and it's async
> > method.
> > If we want sync method in the future, we can add method named
> > `executeSqlSync`.
> >
> > I think we've reached an agreement. I will update the document, and start
> > voting process.
> >
> > Best,
> > Godfrey
> >
> >
> > Jark Wu <imj...@gmail.com> 于2020年3月31日周二 上午12:46写道:
> >
> >> Hi,
> >>
> >> I didn't follow the full discussion.
> >> But I share the same concern with Timo that streaming queries should
> always
> >> be async.
> >> Otherwise, I can image it will cause a lot of confusion and problems if
> >> users don't deeply keep the "sync" in mind (e.g. client hangs).
> >> Besides, the streaming mode is still the majority use cases of Flink and
> >> Flink SQL. We should put the usability at a high priority.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 30 Mar 2020 at 23:27, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi Godfrey,
> >>>
> >>> maybe I wasn't expressing my biggest concern enough in my last mail.
> >>> Even in a singleline and sync execution, I think that streaming queries
> >>> should not block the execution. Otherwise it is not possible to call
> >>> collect() or print() on them afterwards.
> >>>
> >>> "there are too many things need to discuss for multiline":
> >>>
> >>> True, I don't want to solve all of them right now. But what I know is
> >>> that our newly introduced methods should fit into a multiline
> execution.
> >>> There is no big difference of calling `executeSql(A), executeSql(B)`
> and
> >>> processing a multiline file `A;\nB;`.
> >>>
> >>> I think the example that you mentioned can simply be undefined for now.
> >>> Currently, no catalog is modifying data but just metadata. This is a
> >>> separate discussion.
> >>>
> >>> "result of the second statement is indeterministic":
> >>>
> >>> Sure this is indeterministic. But this is the implementers fault and we
> >>> cannot forbid such pipelines.
> >>>
> >>> How about we always execute streaming queries async? It would unblock
> >>> executeSql() and multiline statements.
> >>>
> >>> Having a `executeSqlAsync()` is useful for batch. However, I don't want
> >>> `sync/async` be the new batch/stream flag. The execution behavior
> should
> >>> come from the query itself.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 30.03.20 11:12, godfrey he wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Agree with you that streaming queries is our top priority,
> >>>> but I think there are too many things need to discuss for multiline
> >>>> statements:
> >>>> e.g.
> >>>> 1. what's the behaivor of DDL and DML mixing for async execution:
> >>>> create table t1 xxx;
> >>>> create table t2 xxx;
> >>>> insert into t2 select * from t1 where xxx;
> >>>> drop table t1; // t1 may be a MySQL table, the data will also be
> >> deleted.
> >>>>
> >>>> t1 is dropped when "insert" job is running.
> >>>>
> >>>> 2. what's the behaivor of unified scenario for async execution: (as
> you
> >>>> mentioned)
> >>>> INSERT INTO t1 SELECT * FROM s;
> >>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>
> >>>> The result of the second statement is indeterministic, because the
> >> first
> >>>> statement maybe is running.
> >>>> I think we need to put a lot of effort to define the behavior of
> >>> logically
> >>>> related queries.
> >>>>
> >>>> In this FLIP, I suggest we only handle single statement, and we also
> >>>> introduce an async execute method
> >>>> which is more important and more often used for users.
> >>>>
> >>>> Dor the sync methods (like `TableEnvironment.executeSql` and
> >>>> `StatementSet.execute`),
> >>>> the result will be returned until the job is finished. The following
> >>>> methods will be introduced in this FLIP:
> >>>>
> >>>>    /**
> >>>>     * Asynchronously execute the given single statement
> >>>>     */
> >>>> TableEnvironment.executeSqlAsync(String statement): TableResult
> >>>>
> >>>> /**
> >>>>    * Asynchronously execute the dml statements as a batch
> >>>>    */
> >>>> StatementSet.executeAsync(): TableResult
> >>>>
> >>>> public interface TableResult {
> >>>>      /**
> >>>>       * return JobClient for DQL and DML in async mode, else return
> >>>> Optional.empty
> >>>>       */
> >>>>      Optional<JobClient> getJobClient();
> >>>> }
> >>>>
> >>>> what do you think?
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>> Timo Walther <twal...@apache.org> 于2020年3月26日周四 下午9:15写道:
> >>>>
> >>>>> Hi Godfrey,
> >>>>>
> >>>>> executing streaming queries must be our top priority because this is
> >>>>> what distinguishes Flink from competitors. If we change the execution
> >>>>> behavior, we should think about the other cases as well to not break
> >> the
> >>>>> API a third time.
> >>>>>
> >>>>> I fear that just having an async execute method will not be enough
> >>>>> because users should be able to mix streaming and batch queries in a
> >>>>> unified scenario.
> >>>>>
> >>>>> If I remember it correctly, we had some discussions in the past about
> >>>>> what decides about the execution mode of a query. Currently, we would
> >>>>> like to let the query decide, not derive it from the sources.
> >>>>>
> >>>>> So I could image a multiline pipeline as:
> >>>>>
> >>>>> USE CATALOG 'mycat';
> >>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>
> >>>>> For executeMultilineSql():
> >>>>>
> >>>>> sync because regular SQL
> >>>>> sync because regular Batch SQL
> >>>>> async because Streaming SQL
> >>>>>
> >>>>> For executeAsyncMultilineSql():
> >>>>>
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What we should not start for executeAsyncMultilineSql():
> >>>>>
> >>>>> sync because DDL
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What are you thoughts here?
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 26.03.20 12:50, godfrey he wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> I agree with you that streaming queries mostly need async execution.
> >>>>>> In fact, our original plan is only introducing sync methods in this
> >>> FLIP,
> >>>>>> and async methods (like "executeSqlAsync") will be introduced in the
> >>>>> future
> >>>>>> which is mentioned in the appendix.
> >>>>>>
> >>>>>> Maybe the async methods also need to be considered in this FLIP.
> >>>>>>
> >>>>>> I think sync methods is also useful for streaming which can be used
> >> to
> >>>>> run
> >>>>>> bounded source.
> >>>>>> Maybe we should check whether all sources are bounded in sync
> >> execution
> >>>>>> mode.
> >>>>>>
> >>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>> multiline files. Because the first INSERT INTO would block the
> >> further
> >>>>>>> execution.
> >>>>>> agree with you, we need async method to submit multiline files,
> >>>>>> and files should be limited that the DQL and DML should be always in
> >>> the
> >>>>>> end for streaming.
> >>>>>>
> >>>>>> Best,
> >>>>>> Godfrey
> >>>>>>
> >>>>>> Timo Walther <twal...@apache.org> 于2020年3月26日周四 下午4:29写道:
> >>>>>>
> >>>>>>> Hi Godfrey,
> >>>>>>>
> >>>>>>> having control over the job after submission is a requirement that
> >> was
> >>>>>>> requested frequently (some examples are [1], [2]). Users would like
> >> to
> >>>>>>> get insights about the running or completed job. Including the
> >> jobId,
> >>>>>>> jobGraph etc., the JobClient summarizes these properties.
> >>>>>>>
> >>>>>>> It is good to have a discussion about synchronous/asynchronous
> >>>>>>> submission now to have a complete execution picture.
> >>>>>>>
> >>>>>>> I thought we submit streaming queries mostly async and just wait
> for
> >>> the
> >>>>>>> successful submission. If we block for streaming queries, how can
> we
> >>>>>>> collect() or print() results?
> >>>>>>>
> >>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>> multiline files. Because the first INSERT INTO would block the
> >> further
> >>>>>>> execution.
> >>>>>>>
> >>>>>>> If we decide to block entirely on streaming queries, we need the
> >> async
> >>>>>>> execution methods in the design already. However, I would rather go
> >>> for
> >>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM` key
> word
> >>> in
> >>>>>>> mind that we might add to SQL statements soon.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>>>>
> >>>>>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>>>>> Hi Timo,
> >>>>>>>>
> >>>>>>>> Thanks for the updating.
> >>>>>>>>
> >>>>>>>> Regarding to "multiline statement support", I'm also fine that
> >>>>>>>> `TableEnvironment.executeSql()` only supports single line
> >> statement,
> >>>>> and
> >>>>>>> we
> >>>>>>>> can support multiline statement later (needs more discussion about
> >>>>> this).
> >>>>>>>>
> >>>>>>>> Regarding to "StatementSet.explian()", I don't have strong
> opinions
> >>>>> about
> >>>>>>>> that.
> >>>>>>>>
> >>>>>>>> Regarding to "TableResult.getJobClient()", I think it's
> >> unnecessary.
> >>>>> The
> >>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)
> will
> >>> not
> >>>>>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
> >>>>>>>> `StatementSet.execute()` are synchronous method, `TableResult`
> will
> >>> be
> >>>>>>>> returned only after the job is finished or failed.
> >>>>>>>>
> >>>>>>>> Regarding to "whether StatementSet.execute() needs to throw
> >>>>> exception", I
> >>>>>>>> think we should choose a unified way to tell whether the execution
> >> is
> >>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
> >>>>> exception),
> >>>>>>>> users need to not only check the result but also catch the runtime
> >>>>>>>> exception in their code. or `StatementSet.execute()` does not
> throw
> >>> any
> >>>>>>>> exception (including runtime exception), all exception messages
> are
> >>> in
> >>>>>>> the
> >>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
> >> exception".
> >>> cc
> >>>>>>> @Jark
> >>>>>>>> Wu <imj...@gmail.com>
> >>>>>>>>
> >>>>>>>> I will update the agreed parts to the document first.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Godfrey
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Timo Walther <twal...@apache.org> 于2020年3月25日周三 下午6:51写道:
> >>>>>>>>
> >>>>>>>>> Hi Godfrey,
> >>>>>>>>>
> >>>>>>>>> thanks for starting the discussion on the mailing list. And sorry
> >>>>> again
> >>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc one
> >>> more
> >>>>>>>>> time to incorporate the offline discussions.
> >>>>>>>>>
> >>>>>>>>>      From Dawid's and my view, it is fine to postpone the
> multiline
> >>>>> support
> >>>>>>>>> to a separate method. This can be future work even though we will
> >>> need
> >>>>>>>>> it rather soon.
> >>>>>>>>>
> >>>>>>>>> If there are no objections, I suggest to update the FLIP-84 again
> >>> and
> >>>>>>>>> have another voting process.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Timo
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>>>>>>> Hi community,
> >>>>>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1]. The
> >>>>>>>>> feedbacks
> >>>>>>>>>> are all about new introduced methods. We had a discussion
> >>> yesterday,
> >>>>>>> and
> >>>>>>>>>> most of feedbacks have been agreed upon. Here is the
> conclusions:
> >>>>>>>>>>
> >>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>>>>>>
> >>>>>>>>>> the original proposed methods:
> >>>>>>>>>>
> >>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>>>>>>>>
> >>>>>>>>>> the new proposed methods:
> >>>>>>>>>>
> >>>>>>>>>> // we should not use abbreviations in the API, and the term
> >> "Batch"
> >>>>> is
> >>>>>>>>>> easily confused with batch/streaming processing
> >>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>>>>>>
> >>>>>>>>>> // every method that takes SQL should have `Sql` in its name
> >>>>>>>>>> // supports multiline statement ???
> >>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>>>>>>
> >>>>>>>>>> // new methods. supports explaining DQL and DML
> >>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
> >>>>>>> details):
> >>>>>>>>>> String
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> *2. about proposed related classes:*
> >>>>>>>>>>
> >>>>>>>>>> the original proposed classes:
> >>>>>>>>>>
> >>>>>>>>>> interface DmlBatch {
> >>>>>>>>>>          void addInsert(String insert);
> >>>>>>>>>>          void addInsert(String targetPath, Table table);
> >>>>>>>>>>          ResultTable execute() throws Exception ;
> >>>>>>>>>>          String explain(boolean extended);
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> public interface ResultTable {
> >>>>>>>>>>          TableSchema getResultSchema();
> >>>>>>>>>>          Iterable<Row> getResultRows();
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> the new proposed classes:
> >>>>>>>>>>
> >>>>>>>>>> interface StatementSet {
> >>>>>>>>>>          // every method that takes SQL should have `Sql` in its
> >>> name
> >>>>>>>>>>          // return StatementSet instance for fluent programming
> >>>>>>>>>>          addInsertSql(String statement): StatementSet
> >>>>>>>>>>
> >>>>>>>>>>          // return StatementSet instance for fluent programming
> >>>>>>>>>>          addInsert(String tablePath, Table table): StatementSet
> >>>>>>>>>>
> >>>>>>>>>>          // new method. support overwrite mode
> >>>>>>>>>>          addInsert(String tablePath, Table table, boolean
> >>> overwrite):
> >>>>>>>>>> StatementSet
> >>>>>>>>>>
> >>>>>>>>>>          explain(): String
> >>>>>>>>>>
> >>>>>>>>>>          // new method. supports adding more details for the
> >> result
> >>>>>>>>>>          explain(ExplainDetail... extraDetails): String
> >>>>>>>>>>
> >>>>>>>>>>          // throw exception ???
> >>>>>>>>>>          execute(): TableResult
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> interface TableResult {
> >>>>>>>>>>          getTableSchema(): TableSchema
> >>>>>>>>>>
> >>>>>>>>>>          // avoid custom parsing of an "OK" row in programming
> >>>>>>>>>>          getResultKind(): ResultKind
> >>>>>>>>>>
> >>>>>>>>>>          // instead of `get` make it explicit that this is might
> >> be
> >>>>>>>>> triggering
> >>>>>>>>>> an expensive operation
> >>>>>>>>>>          collect(): Iterable<Row>
> >>>>>>>>>>
> >>>>>>>>>>          // for fluent programming
> >>>>>>>>>>          print(): Unit
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> enum ResultKind {
> >>>>>>>>>>          SUCCESS, // for DDL, DCL and statements with a simple
> >> "OK"
> >>>>>>>>>>          SUCCESS_WITH_CONTENT, // rows with important content
> are
> >>>>>>> available
> >>>>>>>>>> (DML, DQL)
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> *3. new proposed methods in `Table`*
> >>>>>>>>>>
> >>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
> >> methods
> >>>>> are
> >>>>>>>>>> introduced:
> >>>>>>>>>>
> >>>>>>>>>> Table.executeInsert(String tablePath): TableResult
> >>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
> >>> TableResult
> >>>>>>>>>> Table.explain(ExplainDetail... details): String
> >>>>>>>>>> Table.execute(): TableResult
> >>>>>>>>>>
> >>>>>>>>>> There are two issues need further discussion, one is whether
> >>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
> >> needs
> >>> to
> >>>>>>>>>> support multiline statement (or whether `TableEnvironment` needs
> >> to
> >>>>>>>>> support
> >>>>>>>>>> multiline statement), and another one is whether
> >>>>>>> `StatementSet.execute()`
> >>>>>>>>>> needs to throw exception.
> >>>>>>>>>>
> >>>>>>>>>> please refer to the feedback document [2] for the details.
> >>>>>>>>>>
> >>>>>>>>>> Any suggestions are warmly welcomed!
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> >>
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>>>>>>> [2]
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> >>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Godfrey
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Reply via email to