Hey all,

We propose to slightly change the behavior of the CollectSinkFunction
<https://github.com/apache/flink/blob/6f4d31f1b79afbde6c093b5d40ac83fe7524e303/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L129>
and make it wait until the client has consumed all results in the buffer
before it shuts down.

The overall protocol and all other behavior stay the same.

This would guarantee that the result is available when the job completes.
Today, the tail of the result is stored in an accumulator, which is kept in
the job manager. This part of the result can get lost after the job has
been reported as successfully "completed". Waiting until all the results
are consumed while the job is still running is a natural way to achieve
availability: once the job is done, we are certain all the results have
been consumed.

This change would be achieved by overriding the endInput() method
in CollectSinkOperator, and passing the call to CollectSinkFunction to wait
till the buffer is empty.
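
To illustrate the idea, here is a minimal sketch of the waiting logic in
plain Java. It does not use the actual Flink classes: DrainingSinkSketch,
invoke(), fetch() and waitUntilDrained() are hypothetical names made up for
this example, and the real CollectSinkFunction protocol is more involved.
The assumption is only that endInput() would delegate to something like
waitUntilDrained().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the sink function; NOT the Flink class. */
class DrainingSinkSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition bufferEmpty = lock.newCondition();
    private final Queue<T> buffer = new ArrayDeque<>();

    /** Called by the job for every record it produces. */
    void invoke(T record) {
        lock.lock();
        try {
            buffer.add(record);
        } finally {
            lock.unlock();
        }
    }

    /** Called on behalf of the client: removes and returns up to max records. */
    List<T> fetch(int max) {
        lock.lock();
        try {
            List<T> out = new ArrayList<>();
            while (out.size() < max && !buffer.isEmpty()) {
                out.add(buffer.poll());
            }
            if (buffer.isEmpty()) {
                // Wake up a caller blocked in waitUntilDrained().
                bufferEmpty.signalAll();
            }
            return out;
        } finally {
            lock.unlock();
        }
    }

    /** What endInput() would delegate to: block until the buffer is empty. */
    void waitUntilDrained() throws InterruptedException {
        lock.lock();
        try {
            while (!buffer.isEmpty()) {
                bufferEmpty.await();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, the thread that calls waitUntilDrained() returns only
after fetch() has handed the last buffered record to the client.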

The old behavior could be enabled via a configuration flag (to be added).

A notable side-effect of the change is that any invocation
of StreamExecutionEnvironment.execute() (synchronous execution) on a
pipeline that contains a CollectSinkFunction would effectively block,
waiting for the results to be consumed. This would require running the
consumer on a different thread. Note, though, that this is *already the
case* when the result is larger than what fits into the
CollectSinkFunction's buffer. Take a look at
flink-end-to-end-tests/test-scripts/test_quickstarts.sh in the current
state of the repo: if we change the parameter numRecords to 1,000,000, the
test hangs and waits forever. So the only difference with the change is
that, in similar setups, it would wait whenever the result is non-empty.
This makes the behavior consistent for results of any non-zero size.
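
A rough sketch of what "running the consumer on a different thread" means,
again in plain Java rather than Flink. blockingExecute() is a hypothetical
stand-in for a synchronous execute() call, and the bounded queue stands in
for the sink's buffer. These names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConsumerThreadSketch {

    /** Hypothetical stand-in for a synchronous execute(); NOT a Flink API. */
    static void blockingExecute(
            BlockingQueue<Integer> buffer, CountDownLatch consumed, int numRecords)
            throws InterruptedException {
        for (int i = 0; i < numRecords; i++) {
            // Blocks while the buffer is full (models today's large-result case).
            buffer.put(i);
        }
        // Models the proposed change: also wait until the tail is consumed.
        consumed.await();
    }

    /** Starts the consumer on its own thread, then calls the blocking job. */
    static List<Integer> run(int numRecords, int bufferSize) throws Exception {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferSize);
        CountDownLatch consumed = new CountDownLatch(numRecords);
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            Future<List<Integer>> results = consumerThread.submit(() -> {
                List<Integer> out = new ArrayList<>();
                for (int i = 0; i < numRecords; i++) {
                    out.add(buffer.take());
                    consumed.countDown();
                }
                return out;
            });
            // Calling this before starting the consumer would hang forever
            // for any numRecords > 0.
            blockingExecute(buffer, consumed, numRecords);
            return results.get();
        } finally {
            consumerThread.shutdown();
        }
    }
}
```

The order matters: the consumer has to be started before the blocking
call, otherwise nothing ever drains the buffer.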


Let me know your thoughts.

Thanks,
Alexey
