Hi devs,

While discussing "Watermark propagation with Sink API" and working on "[FLINK-18934] Idle stream does not advance watermark in connected stream", we noticed some drawbacks in how Flink currently defines idle partitions.

To recap, idleness was always considered a means to achieve progress in window operators with idle partitions in the source, at the risk of losing a bit of correctness. In particular, records could be considered late simply because of the idleness timeout and not because they arrived out of order. A potential reprocessing would not cause these records to be considered late, so we may end up with a different (correct) result.

The drawbacks that we discovered are as follows:

- We currently only use idleness to exclude the respective upstream tasks from participating in watermark generation.
- However, the definition is bound to records. [1] In particular, while a partition is idle, no records should be produced.
- That brings us into quite a few edge cases where operators emit records while they are actually idling: think of timers, asyncIO operators, window operators based on timeouts, etc. that trigger on an operator ingesting an idle partition.
- The proper solution would be to turn the operator active while emitting and to return to being idle afterwards (but when?). However, this has some unintended side effects depending on when you switch back:
  - If you toggle stream status for each record, you get a huge overhead on stream status records and quite a bit of processing in downstream operators (that code path is not much optimized since switching is considered a rare thing).
  - If you toggle after a certain time, you may get delays > idleness in the downstream window operators.
  - You could turn back when you have processed all pending mails, but if you have a self-replicating mail, that would be never. A self-enqueueing, low timer would also produce a flood similar to the first case.

All in all, the situation is quite unsatisfying because idleness implies no records.
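
To make the current use of idleness concrete, here is a minimal sketch of how a downstream operator might combine per-channel watermarks while skipping idle channels. This is not Flink's actual implementation; the class WatermarkCombiner and all of its methods are hypothetical names made up for illustration.

```java
import java.util.Arrays;

/** Hypothetical sketch: combines watermarks of input channels, ignoring idle ones. */
public class WatermarkCombiner {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long lastEmitted = Long.MIN_VALUE;

    public WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    /** Records the latest watermark seen on a channel (per-channel watermarks never go back). */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Marks a channel as idle (excluded from the minimum) or active (included). */
    public void setIdle(int channel, boolean idle) {
        channelIdle[channel] = idle;
    }

    /**
     * Returns the combined watermark: the minimum over all non-idle channels.
     * The result never regresses; if every channel is idle, the last value is kept.
     */
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

Nothing in this calculation looks at records: the idle flag only decides whether a channel takes part in the minimum. It also shows where the loss of correctness comes from: once an idle channel has been skipped and the combined watermark has advanced, records that later arrive on that channel with older timestamps are late.
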
However, there is currently no need for idleness to imply that no records are emitted: since we only use it for watermarks, we can easily allow records to be emitted (in fact, that was the old behavior before FLINK-18934 in many cases) and still get the intended behavior with respect to watermarks:

- A channel that is active is providing watermarks.
- An idle channel is not providing any watermarks but can deliver records.

Ultimately, that would mean that we are actually not talking about idle/active partitions anymore. We are talking more about whether a particular subtask should influence downstream watermark calculation or not.

This leads to the following questions:

1. Do we want to change the definition as outlined?
2. Do you see any problem with emitting records on a subtask without explicit watermarks?
3. If we want to go this way, we may need to refine the names/definitions. Any ideas?

I think idle partition should translate into something like automatic/implicit/passive watermarks; active partition into explicit/active watermarks. Then StreamStatus is more about WatermarkMode (not really happy with this one).

Best,

Arvid

[1] https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86