[ 
https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053298#comment-17053298
 ] 

Caizhi Weng commented on FLINK-14807:
-------------------------------------

Hi [~sewen],

Thanks for the reply. I think we might need to sync on the use cases and the 
implementation ideas of {{Table#collect}}.

*Use cases we expected*

We expect {{Table#collect}} to be used for both batch and streaming jobs, and 
to support iterating through large results.

*Why we came up with an implementation based on back-pressure*
 # As Flink does not have (or rely on) a dedicated storage service (unlike Hive, 
which relies on HDFS, or Kafka, which is itself a storage service), we cannot 
store the (possibly infinite) results anywhere, not even in the blob storage 
(it would eventually be filled up by the results of a never-ending streaming 
job). So we have to rely on back-pressure to limit the amount of results that 
have been generated but not yet consumed by the clients.
 # As the sink back-pressures the whole job, it needs to be told when to send 
new results to the client and release the pressure. That is to say, there must 
be a way for the clients to communicate with task managers. But under some 
network configurations (for example a Yarn or k8s cluster) clients can't 
directly communicate with TMs, so job managers must forward the messages from 
the clients to the sinks.
 # Currently there is no way for JMs to actively talk to sinks. 
{{OperatorCoordinators}} is indeed the feature we want most, but it hasn't been 
implemented yet, so we create a socket server in the sink function to do the 
communication work (we cannot introduce a new RPC interface for this, as its 
functionality would overlap with {{OperatorCoordinators}}). In this way we do 
not need to modify the JMs, so we can conveniently refactor this once 
{{OperatorCoordinators}} is implemented. A minimal sketch of such a sink-side 
socket server is shown after this list.
 # Currently there is no way for clients / sinks to actively talk to JMs 
either. We don't want to introduce a new feature just for this, and currently 
{{GlobalAggregateManager}} is the only RPC mechanism that offers this 
functionality. We would of course like to use {{OperatorCoordinators}}, but it 
is not implemented yet.
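
To make the back-pressure idea concrete, here is a minimal sketch of such a 
sink-side socket server, assuming a {{RichSinkFunction}}-based sink. This is 
not the code from the patch; the class name, the buffer bound and the wire 
format are made up, and the token / version handling described below is 
omitted. The point is only that {{invoke()}} blocks (and thus back-pressures 
the whole job) once too many results are buffered but not yet consumed.

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayDeque;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sketch, not the patch: a sink that buffers a bounded number of
// results and only hands them out when the client asks for them over a socket.
public class CollectSinkSketch extends RichSinkFunction<String> {

    private static final int MAX_BUFFERED = 1000;   // bound on unconsumed results

    private final ArrayDeque<String> buffer = new ArrayDeque<>();
    private transient ServerSocket serverSocket;

    @Override
    public void open(Configuration parameters) throws Exception {
        serverSocket = new ServerSocket(0);          // the client learns the port elsewhere
        Thread server = new Thread(this::serveRequests);
        server.setDaemon(true);
        server.start();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        synchronized (buffer) {
            // Back-pressure: block the pipeline until the client has consumed
            // enough of the buffered results.
            while (buffer.size() >= MAX_BUFFERED) {
                buffer.wait();
            }
            buffer.add(value);
            buffer.notifyAll();
        }
    }

    private void serveRequests() {
        try (Socket client = serverSocket.accept();
             DataInputStream in = new DataInputStream(client.getInputStream());
             DataOutputStream out = new DataOutputStream(client.getOutputStream())) {
            while (true) {
                int batchSize = in.readInt();        // client asks for up to N results
                synchronized (buffer) {
                    while (buffer.isEmpty()) {
                        buffer.wait();
                    }
                    int n = Math.min(batchSize, buffer.size());
                    out.writeInt(n);
                    for (int i = 0; i < n; i++) {
                        out.writeUTF(buffer.poll());
                    }
                    out.flush();
                    buffer.notifyAll();              // room freed, unblock invoke()
                }
            }
        } catch (Exception e) {
            // connection lost: the real implementation re-accepts and relies on
            // the token protocol described below to avoid losing or duplicating data
        }
    }

    @Override
    public void close() throws Exception {
        if (serverSocket != null) {
            serverSocket.close();
        }
    }
}
{code}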

*What if the connection to the server socket is lost / restarted*

To deal with this problem we've introduced a token in the communication. See 
the patch for a detailed implementation.

When the client needs new results from the sink, it must provide the sink with 
this token. The token can be read as: "The client has successfully received the 
results before the {{token}}-th one, and wants results starting from the 
{{token}}-th result of the job". If the connection is lost, the data is 
corrupted, or anything else goes wrong, the client can provide the sink with 
the same token and it will get the same batch of results again.
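
As an illustration, the client side of this retry-with-the-same-token logic 
could look roughly like the sketch below. Again the class name and wire format 
are made up; the real implementation is in the attached patch.

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the client side of the token protocol.
public class TokenFetchSketch {

    private long token = 0;   // index of the next result the client still needs

    public List<String> fetchNextBatch(String host, int port, int maxResults) {
        while (true) {
            try (Socket socket = new Socket(host, port);
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                 DataInputStream in = new DataInputStream(socket.getInputStream())) {

                // "I have received everything before the token-th result,
                //  please send results starting from the token-th one."
                out.writeLong(token);
                out.writeInt(maxResults);
                out.flush();

                int count = in.readInt();
                List<String> batch = new ArrayList<>(count);
                for (int i = 0; i < count; i++) {
                    batch.add(in.readUTF());
                }
                token += count;   // advance only after the whole batch has arrived
                return batch;
            } catch (IOException e) {
                // connection lost or data corrupted: retry with the same token,
                // the sink will send the same batch again (a real implementation
                // would bound the retries)
            }
        }
    }
}
{code}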

*What if the sink / server socket restarts*

To deal with this problem we've introduced a version ID in the communication. 
See the patch for a detailed implementation.

This version ID is set to a random UUID when the sink opens. So if the client 
discovers that the version of the results it receives differs from the one it 
expects, it knows that the sink has restarted, and the client can throw away 
all the data it has received after the latest checkpoint.

Here "client" means the result iterator, which is also implemented by us. 
Users of this iterator will see the results with exactly-once semantics.
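
The version check in the result iterator could then look roughly like this 
(again a made-up sketch, not the patch; how the buffered results are handed to 
the user is left out):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the version / rollback handling in the result iterator.
public class VersionCheckSketch {

    private String expectedVersion;       // UUID announced by the sink
    private long checkpointedToken;       // everything before this token is durable
    private long token;                   // next result the client expects
    private final List<String> uncheckpointed = new ArrayList<>();

    /** Called for every batch received from the sink. */
    public void onBatch(String version, List<String> results) {
        if (expectedVersion == null) {
            expectedVersion = version;    // first contact with this sink
        } else if (!expectedVersion.equals(version)) {
            // The sink has restarted: throw away everything that is not yet
            // covered by a checkpoint and re-request from the checkpointed token.
            uncheckpointed.clear();
            token = checkpointedToken;
            expectedVersion = version;
            return;
        }
        uncheckpointed.addAll(results);
        token += results.size();
    }

    /** Called when a checkpoint completes; the buffered results become visible. */
    public void onCheckpointCompleted() {
        checkpointedToken = token;
        // hand `uncheckpointed` over to the user-facing iterator here
        uncheckpointed.clear();
    }
}
{code}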

*Users may experience a checkpoint-interval delay*

We have decided to provide users with two kinds of iterators. One iterator 
directly forwards the results to the user, but throws a {{RuntimeException}} 
when the sink restarts. The other buffers the results (for large results it 
spills them to disk) and exposes them to the user only after a checkpoint 
completes. Users are free to choose according to their needs.
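
In terms of the user-facing API, the two flavours could be sketched as follows 
(the names are hypothetical and not the ones used in the patch):

{code:java}
import java.util.Iterator;

// Hypothetical sketch of the two iterator flavours described above.
public final class CollectIteratorsSketch {

    /** Forwards every result immediately; fails fast when the sink restarts. */
    public interface ImmediateResultIterator<T> extends Iterator<T> {
        // hasNext() / next() may throw a RuntimeException when a sink restart
        // (version change) is detected, because results that were already
        // returned can no longer be guaranteed to be exactly-once.
    }

    /** Buffers (and spills) results, releasing them only after a checkpoint. */
    public interface CheckpointedResultIterator<T> extends Iterator<T> {
        // next() only returns results covered by a completed checkpoint, so the
        // user observes exactly-once results at the cost of up to one
        // checkpoint interval of latency.
    }

    private CollectIteratorsSketch() {}
}
{code}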

*Why not use accumulators / blob servers*

We considered an implementation based on accumulators before. There are a few 
problems:

# By using accumulators and blob servers, sinks actively push the results 
(instead of being asked for them by the client). That is to say, we cannot 
control the speed at which the sink produces results, and we would have to 
store these results somewhere.
# Accumulators are currently updated with TM heartbeats. The default heartbeat 
interval is 10s, which is too slow for a small job. Also, when the results 
become large they put a heavy burden on the network and on JM memory.
# As mentioned above, we can't store the results in some component, as it 
would eventually be filled up by a never-ending streaming job. So we can't 
store the results in blob servers unless the blob servers are backed by a 
dedicated storage service.

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect-draft.patch, table-collect.png
>
>
> Currently, it is very inconvenient for users to fetch the data of a Flink job 
> unless they explicitly specify a sink and then fetch the data from that sink 
> via its API (e.g. write to an HDFS sink, then read the data back from HDFS). 
> However, most of the time users just want to get the data and do whatever 
> processing they want with it. So it is very necessary for Flink to provide a 
> Table#collect API for this purpose. 
>  
> Other APIs such as Table#head and Table#print would also be helpful.  
>  



