Hi David and Till,

Thanks for your great feedback. One definitely confusing point in the FLIP is who does the actual compaction. The compaction will not be done by the CommittableAggregator operator but by the committers, so it should not affect the checkpointing duration or become a significant performance bottleneck, because the committers are executed in parallel (also in batch mode [1]).
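To make that division of labor concrete, here is a rough sketch with hypothetical types (FileCommittable, CompactionRequest, etc. are illustrative names, not the FLIP's actual interfaces): the aggregator only groups committables into compaction units, while each parallel committer instance performs the IO-heavy merge on commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the aggregator only decides *what* to compact;
// each parallel committer performs the actual IO-bound merge.
public class CompactionSketch {

    // A committable describing one pre-compaction file (illustrative).
    record FileCommittable(String path, long sizeBytes) {}

    // A compaction unit: the files one committer instance should merge.
    record CompactionRequest(List<FileCommittable> files) {}

    // Aggregator side: cheap bookkeeping only -- group small files
    // until a target size is reached. No file IO happens here.
    static List<CompactionRequest> aggregate(List<FileCommittable> committables,
                                             long targetSize) {
        List<CompactionRequest> requests = new ArrayList<>();
        List<FileCommittable> current = new ArrayList<>();
        long currentSize = 0;
        for (FileCommittable c : committables) {
            current.add(c);
            currentSize += c.sizeBytes();
            if (currentSize >= targetSize) {
                requests.add(new CompactionRequest(current));
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            requests.add(new CompactionRequest(current));
        }
        return requests;
    }

    // Committer side: the expensive merge happens here, in parallel
    // across committer subtasks, not during the checkpoint.
    static String commit(CompactionRequest request) {
        // Placeholder for the real merge + atomic rename/commit.
        return "compacted-" + request.files().size() + "-files";
    }
}
```

Because `aggregate` only partitions metadata, the checkpoint-time cost stays small; the throughput-relevant work lands in `commit`, which scales with the committer parallelism.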
I will update the FLIP to clarify this.

> From your description I would be in favour of option 2 for the following
> reasons: Assuming that option 2 solves all our current problems, it seems
> like the least invasive change and smallest in scope. Your main concern is
> that it might not cover future use cases. Do you have some specific use
> cases in mind?

No, I do not have anything specific in mind; I just wanted to raise the point that adding more and more operators to the sink might complicate future development, because they all need to be usable together.

> What I am missing a bit from the description is how option 2 will behave
> wrt checkpoints and the batch execution mode.

My idea was to always invoke CommittableAggregate#aggregate on a checkpoint and on endOfInput. In the batch case the aggregation is done only once, on all committables.

> Few thoughts on the option 2)
>
> The file compaction is by definition quite costly IO bound operation. If I
> understand the proposal correctly, the aggregation itself would run during
> operator (aggregator) checkpoint. Would this significantly increase the
> checkpoint duration?
>
> Compaction between different sub-tasks incur additional network IO (to
> fetch the raw non-compacted files from the remote filesystem), so this
> could quickly become a bottleneck. Basically we're decreasing the sink
> parallelism (possible throughput) to parallelism of the aggregator.

Hopefully these concerns are covered by the explanation at the beginning.

> To be really effective here, compaction would ideally be able to compact
> files from multiple checkpoints. However there is a huge tradeoff between
> latency and efficiency (especially with exactly once). Is this something
> worth exploring?

I agree with you: by enabling compaction across checkpoints the latency can increase, because files might only be committed several checkpoints later. I guess the best we can do is to let the user configure the behaviour.
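As an aside, the checkpoint/endOfInput behaviour described above could be sketched roughly as follows (a hypothetical operator skeleton with made-up method names, not the real Flink operator API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton showing *when* aggregation would run:
// on every checkpoint in streaming mode, and once at endOfInput in batch.
public class AggregatorTiming {

    // Committables buffered since the last aggregation.
    final List<String> buffered = new ArrayList<>();
    // Aggregated batches handed downstream to the committers.
    final List<List<String>> emitted = new ArrayList<>();

    void processElement(String committable) {
        buffered.add(committable);
    }

    // Streaming mode: invoked on every checkpoint.
    void onCheckpoint() {
        aggregate();
    }

    // Both modes: invoked when the input is exhausted. In batch mode
    // this is the only aggregation point, so it sees all committables.
    void endOfInput() {
        aggregate();
    }

    private void aggregate() {
        if (!buffered.isEmpty()) {
            emitted.add(new ArrayList<>(buffered));
            buffered.clear();
        }
    }
}
```

In streaming mode each checkpoint produces one aggregated batch; in batch mode the single endOfInput call aggregates everything at once.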
By configuring the checkpointing interval and the desired file size, the user can already affect the latency. Does this answer your questions? I am not fully sure what you are referring to with efficiency.

@dvmk
> I hope that with option 2, we can support both use cases: single task
> compaction as well as cross task compaction if needed. Similarly for
> single checkpoint compaction as well as cross checkpoint compaction.

Compaction across subtasks should be controllable by the parallelism of the CommittableAggregator operator, i.e. a parallelism of 2 can reduce the computational complexity but might not compute the best compaction.

Best,
Fabian

[1] https://github.com/apache/flink/pull/17536