godfreyhe commented on a change in pull request #12577:
URL: https://github.com/apache/flink/pull/12577#discussion_r440716919



##########
File path: docs/try-flink/table_api.md
##########
@@ -462,14 +469,17 @@ public class SpendReport {
         tEnv.registerTableSource("transactions", new 
UnboundedTransactionTableSource());
         tEnv.registerTableSink("spend_report", new SpendReportTableSink());
 
-        tEnv
+        Table table = tEnv
             .scan("transactions")
             .window(Tumble.over("1.hour").on("timestamp").as("w"))
             .groupBy("accountId, w")
-            .select("accountId, w.start as timestamp, amount.sum")
-            .insertInto("spend_report");
+            .select("accountId, w.start as timestamp, amount.sum");
 
-        env.execute("Spend Report");
+        // trigger execution
+        TableResult tableResult = table.executeInsert("spend_report");
+        // wait job finished
+        tableResult.getJobClient().get()

Review comment:
       Thanks for the feedback. agree with you that current api is not friendly 
for testing or demos.  
   For INSERT job, `TableResult.awaitCompletion(timeout)` will work fine. but 
for `SELECT` job, there are some problems: in current implementation, client 
will send a request to JM (which will re-forward the request to TM) to fetch a 
part of result when `Iterator.next()` is called. The sink operator has a 
fixed-length pool to buffer the result. If the pool is full, the back-pressure 
mechanism will trigger. So the job will not finish until all data has been 
fetched to client (or the job is canceled).  Code snippet:
   
   ```
   TableResult tableResult = ...
   TableResult.awaitCompletion(timeout); 
   CloseableIterator<Row> it = TableResult.collect();
   it...
   ```
   
   `TableResult.awaitCompletion(timeout)` will block the execution of 
`TableResult.collect()`. This will cause the job will not finish, or get 
TimeoutException thrown by `awaitCompletion` method. 
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to