[ https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049884#comment-17049884 ]
Caizhi Weng commented on FLINK-14807:
-------------------------------------

Hi dear Flink community,

After some implementation work and further thought, I have found that the core of this problem is the communication and coordination between the client, the JM, and the operators. {{updateGlobalAggregate}} in JobMaster already gives us a way to coordinate operators, so if we add a REST API that lets the client call {{updateGlobalAggregate}} as well, we can implement {{Table#collect}} without modifying the JM. Because these aggregate functions run in the JM, they are free to send and receive messages to and from both clients and operators.

We can use two aggregate functions to achieve this. One aggregate function is submitted by the sink to update the socket server address; the other is submitted by the client to check whether the socket server address has already been recorded and, if so, fetch the results. The socket server address is the value maintained by both the client and the operators.

For the exactly-once / at-least-once semantics, I think [~becket_qin] raises a very good point. We can push the complexity to the client (to the implementation of the iterator in the client).

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect.png
>
>
> Currently, it is very inconvenient for a user to fetch the data of a Flink job unless they
> specify a sink explicitly and then fetch the data from this sink via its API (e.g.
> write to an HDFS sink, then read the data from HDFS). However, most of the time the user
> just wants to get the data and do whatever processing they want. So it is very
> necessary for Flink to provide a Table#collect API for this purpose.
>
> Other APIs such as Table#head and Table#print would also be helpful.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
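
To make the two-aggregate-function handshake from the comment above more concrete, here is a minimal sketch in plain Java. It does not use Flink classes: {{Aggregate}} and {{FakeCoordinator}} are hypothetical stand-ins for an aggregate function and for the JM-side aggregate store, and the aggregate name "collect-address" and the address "localhost:9000" are made up for illustration. It only shows how one function could record the sink's socket server address and another could read it back; the REST endpoint and the actual result fetching are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Local stand-in for an aggregate function; NOT Flink's real interface. */
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

/** Submitted by the sink: overwrites the stored socket server address. */
class RegisterAddress implements Aggregate<String, String, String> {
    public String createAccumulator() { return null; }
    public String add(String address, String current) { return address; }
    public String getResult(String current) { return current; }
}

/** Submitted by the client: reads the stored address without changing it. */
class LookupAddress implements Aggregate<Void, String, Optional<String>> {
    public String createAccumulator() { return null; }
    public String add(Void ignored, String current) { return current; }
    public Optional<String> getResult(String current) {
        return Optional.ofNullable(current);
    }
}

/** Hypothetical in-memory coordinator playing the role of the JM. */
class FakeCoordinator {
    private final Map<String, Object> accumulators = new HashMap<>();

    @SuppressWarnings("unchecked")
    synchronized <IN, ACC, OUT> OUT updateGlobalAggregate(
            String name, IN value, Aggregate<IN, ACC, OUT> fn) {
        ACC acc = (ACC) accumulators.get(name);
        if (acc == null) {
            acc = fn.createAccumulator();
        }
        acc = fn.add(value, acc);
        accumulators.put(name, acc); // HashMap permits a null accumulator
        return fn.getResult(acc);
    }
}

class CollectHandshakeDemo {
    public static void main(String[] args) {
        FakeCoordinator jm = new FakeCoordinator();
        // Client polls before the sink has started: no address yet.
        System.out.println(jm.updateGlobalAggregate("collect-address", null, new LookupAddress()));
        // Sink starts its socket server and registers the address.
        jm.updateGlobalAggregate("collect-address", "localhost:9000", new RegisterAddress());
        // Client polls again and can now connect to fetch results.
        System.out.println(jm.updateGlobalAggregate("collect-address", null, new LookupAddress()));
    }
}
```

In this sketch the client simply polls until the lookup returns a non-empty value. Retries and the exactly-once / at-least-once handling would live in the client-side iterator, as suggested in the comment.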