Hey all,

We propose to slightly change the behavior of the CollectSinkFunction <https://github.com/apache/flink/blob/6f4d31f1b79afbde6c093b5d40ac83fe7524e303/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L129> and make it wait until all results in its buffer are consumed by the client before shutting down.
The overall protocol and all other behavior stay the same. This would be a way to guarantee result availability upon job completion. Today, the tail of the result is stored in an accumulator and kept on the job manager. There is an opportunity for this part of the result to get lost after the job is claimed to be successfully "completed". Waiting until all results are consumed while the job is still running is a natural way to achieve availability: once the job is done, we are certain all results have been consumed.

This change would be achieved by overriding the endInput() method in CollectSinkOperator and passing the call on to CollectSinkFunction, which would wait until the buffer is empty. The old behavior could be enabled via a configuration flag (to be added).

A notable side effect of the change is that any invocation of StreamExecutionEnvironment.execute() (synchronous execution) on a pipeline containing a CollectSinkFunction would effectively block until the results are consumed. This would require running the consumer on a different thread. Note, though, that this is *already the case* when the result is larger than what fits into CollectSinkFunction's buffer. Take a look at flink-end-to-end-tests/test-scripts/test_quickstarts.sh in the current state of the repo: if we change the numRecords parameter to 1,000,000, the test locks up and waits forever. So the only difference with this change is that similar setups would block for any buffer size > 0, which makes the behavior consistent for results of any non-zero size.

Let me know your thoughts.

Thanks,
Alexey
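To make the idea concrete, here is a minimal, self-contained sketch of the blocking semantics I have in mind. This is *not* the actual Flink code path (the real change would touch CollectSinkOperator/CollectSinkFunction and the collect protocol); the class and method names below are stand-ins chosen for illustration, with endInput() blocking on an emptiness condition:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: a stand-in for CollectSinkFunction's buffer,
// showing how endInput() could block until a client drains everything.
// Class/method names are hypothetical, not Flink's actual API.
public class DrainOnEndInputSketch {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final Object lock = new Object();

    // Called by the sink for each record while the job runs.
    public void invoke(String record) {
        synchronized (lock) {
            buffer.add(record);
        }
    }

    // Called from the client thread; removes one buffered result, if any.
    public String poll() {
        synchronized (lock) {
            String r = buffer.poll();
            if (buffer.isEmpty()) {
                lock.notifyAll(); // wake endInput() once the buffer drains
            }
            return r;
        }
    }

    // Proposed behavior: endInput() returns only once the buffer is empty,
    // so the job completes only after all results are consumed.
    public void endInput() throws InterruptedException {
        synchronized (lock) {
            while (!buffer.isEmpty()) {
                lock.wait();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        DrainOnEndInputSketch sink = new DrainOnEndInputSketch();
        sink.invoke("a");
        sink.invoke("b");

        // The consumer must run on a separate thread; otherwise a
        // synchronous execute() would block forever (the side effect
        // described above).
        Thread consumer = new Thread(() -> {
            String r;
            while ((r = sink.poll()) != null) {
                System.out.println("consumed: " + r);
            }
        });
        consumer.start();

        sink.endInput(); // returns only after the buffer is drained
        consumer.join();
        System.out.println("job completed");
    }
}
```

The point of the sketch is the ordering guarantee: "job completed" can only be reached after every buffered result has been handed to the consumer, which is exactly the availability property the proposal is after.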