Hi Arvid,

Thanks for writing down this summary and proposal. I think this was the foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing that idleness is an intermittent, strictly task-local concept and as such shouldn't be exposed in, for example, sinks, while Eron and I thought that it's a concept strictly connected to watermarks.

1. I'm a big +1 for changing the StreamStatus definition to a stream "providing watermark" and "not providing watermark". With respect to that, I agree with Dawid that record-bound idleness *(if we would ever need to define/expose it)* should be an intermittent concept, like for example the input availability that already exists in the Task/runtime (StreamTaskInput#isAvailable).

3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But I also don't have any good ideas. `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?

Best,
Piotrek

On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:

> Hi devs,
>
> While discussing "Watermark propagation with Sink API" and during
> "[FLINK-18934] Idle stream does not advance watermark in connected stream",
> we noticed some drawbacks on how Flink defines idle partitions currently.
>
> To recap, idleness was always considered as a means to achieve progress in
> window operators with idle partition in the source at the risk of losing a
> bit of correctness. In particular, records could be considered late, simply
> because of that idleness timeout and not because they arrived out of order.
> A potential reprocessing would not be causing these records to be
> considered late and we may end up with a different (correct) result.
>
> The drawbacks that we discovered are as follows:
> - We currently only use idleness to exclude respective upstream tasks from
> participating in watermark generation.
> - However, the definition is bound to records. [1] In particular, while a
> partition is idle, no records should be produced.
> - That brings us into quite a few edge cases, where operators emit records,
> while they are actually idling: Think of timers, asyncIO operators, window
> operators based on timeouts, etc. that trigger on an operator ingesting an
> idle partition.
> - The proper solution would be to turn the operator active while emitting
> and to return to being idle afterwards (but when?).
> However, this has some
> unintended side-effects depending on when you switch back:
> - If you toggle stream status for each record, you get a huge overhead on
> stream status records and quite a bit of processing in downstream operators
> (that code path is not much optimized since switching is considered a rare
> thing).
> - If you toggle after a certain time, you may get delays>idleness in the
> downstream window operators.
> - You could turn back when you processed all pending mails, but if you
> have a self-replicating mail that would be never. Self-enqueueing, low
> timer would also produce a flood similar to the first case.
>
> All in all, the situation is quite unsatisfying because idleness implies no
> records. However, currently there is no need to have that implication:
> since we only use it for watermarks, we can easily allow records to be
> emitted (in fact that was the old behavior before FLINK-18934 in many
> cases) and still get the intended behavior in respect to watermarks:
> - A channel that is active is providing watermarks.
> - An idle channel is not providing any watermarks but can deliver records.
>
> Ultimately, that would mean that we are actually not talking idle/active
> partitions anymore. We are talking more about whether a particular subtask
> should influence downstream watermark calculation or not. Leading to the
> following questions:
> 1. Do we want to change the definition as outlined?
> 2. Do you see any problem with emitting records on subtask without explicit
> watermarks?
> 3. If we want to go this way, we may need to refine the names/definitions.
> Any ideas?
>
> I think idle partition should translate into something like
> automatic/implicit/passive watermarks; active partition into
> explicit/active watermarks. Then StreamStatus is more about WatermarkMode
> (not really happy with this one).
>
> Best,
>
> Arvid
>
> [1]
> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
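
To make the proposed definition concrete, here is a minimal sketch of how a downstream operator could combine watermarks when status only means "providing watermarks" or "not providing watermarks". Everything in it is hypothetical and is not Flink's actual code: the class `WatermarkCombiner`, its methods, and the enum constants (the names are borrowed loosely from the suggestions in this thread). It assumes the combined watermark is the minimum over all channels that currently provide watermarks, and it simplifies the rejoin case by never letting the combined watermark regress.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch, not Flink's actual classes: combines per-channel
 * watermarks while ignoring channels that currently provide none.
 */
class WatermarkCombiner {

    /** Assumed names, taken loosely from the naming ideas in this thread. */
    enum WatermarkStatus { PROVIDING_WATERMARKS, NO_WATERMARKS }

    private final long[] channelWatermarks;
    private final WatermarkStatus[] channelStatus;
    private long combined = Long.MIN_VALUE;
    private long recordsSeen = 0;

    WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelStatus = new WatermarkStatus[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(channelStatus, WatermarkStatus.PROVIDING_WATERMARKS);
    }

    /** Records are accepted on any channel and never change its status. */
    void onRecord(int channel) {
        recordsSeen++;
    }

    /** A channel's own watermark only moves forward. */
    void onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        recompute();
    }

    void onStatus(int channel, WatermarkStatus status) {
        channelStatus[channel] = status;
        recompute();
    }

    long currentWatermark() {
        return combined;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyProviding = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelStatus[i] == WatermarkStatus.PROVIDING_WATERMARKS) {
                anyProviding = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // If no channel provides watermarks, keep the last combined value.
        // Simplification: the combined watermark never moves backwards.
        if (anyProviding) {
            combined = Math.max(combined, min);
        }
    }

    public static void main(String[] args) {
        WatermarkCombiner c = new WatermarkCombiner(2);
        c.onWatermark(0, 10L);
        c.onWatermark(1, 5L);
        System.out.println(c.currentWatermark()); // prints 5
        c.onStatus(1, WatermarkStatus.NO_WATERMARKS);
        c.onRecord(1); // still allowed under the proposed definition
        System.out.println(c.currentWatermark()); // prints 10
    }
}
```

In this sketch, `onRecord` deliberately has no effect on the status, which is the core of the proposal: a channel marked `NO_WATERMARKS` is excluded from the minimum but can still deliver records.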