It seems to me that idleness was introduced to deal with a very specific issue. In the pipeline, watermarks are aggregated not on a per-split basis but on a per-subtask basis. This works well when each subtask has exactly one split. When a subtask has multiple splits, various complications occur involving the commingling of watermarks. And when a subtask has no splits, it never emits a watermark, so the pipeline stalls altogether. To deal with the latter problem, idleness was introduced: the subtask simply declares itself idle so that it is taken out of consideration for purposes of watermark aggregation.
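
To make the aggregation concrete, here is a minimal sketch of what I mean. The class and method names are made up for illustration and this is not Flink's actual code; it only assumes that the downstream watermark is the minimum over the upstream subtasks that have not declared themselves idle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical names throughout; an illustration, not Flink's implementation.
final class WatermarkAggregationSketch {

    /** One upstream subtask as seen downstream: its last watermark and its idle flag. */
    static final class SubtaskInput {
        final long lastWatermark;
        final boolean idle;

        SubtaskInput(long lastWatermark, boolean idle) {
            this.lastWatermark = lastWatermark;
            this.idle = idle;
        }
    }

    /**
     * Combined watermark: the minimum over all inputs that are not idle.
     * Empty when every input is idle, since there is nothing to aggregate.
     */
    static OptionalLong combinedWatermark(List<SubtaskInput> inputs) {
        return inputs.stream()
                .filter(in -> !in.idle)
                .mapToLong(in -> in.lastWatermark)
                .min();
    }

    public static void main(String[] args) {
        // A subtask without splits never emits a watermark (modelled as Long.MIN_VALUE).
        long never = Long.MIN_VALUE;

        // Not declared idle: the split-less subtask holds the minimum back.
        System.out.println(combinedWatermark(Arrays.asList(
                new SubtaskInput(1_000L, false),
                new SubtaskInput(never, false))));

        // Declared idle: it is ignored and the other subtask's watermark wins.
        System.out.println(combinedWatermark(Arrays.asList(
                new SubtaskInput(1_000L, false),
                new SubtaskInput(never, true))));
    }
}
```

In this sketch the idle flag only changes which inputs take part in the minimum; it does nothing to the records themselves.
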
If we're looking for a more general solution, I would suggest we discuss how to track watermarks on a per-split basis. Or, as Till mentioned recently, an alternative solution may be to dynamically adjust the parallelism of the task.

I don't agree with the notion that idleness involves a correctness tradeoff. The facility I described above has no impact on correctness. Meanwhile, various watermark strategies rely on heuristics involving the processing-time domain, and the term idleness seems to have found purchase there too. The connection among the concepts seems tenuous.

-E

On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Arvid,
>
> Thanks for writing down this summary and proposal. I think this was the
> foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing
> that idleness is intermittent, strictly a task-local concept, and as such
> shouldn't be exposed in, for example, sinks, while Eron and I thought that
> it's a concept strictly connected to watermarks.
>
> 1. I'm a big +1 for changing the StreamStatus definition to stream "providing
>    watermark" and "not providing watermark". With respect to that I agree with
>    Dawid that record-bound idleness *(if we would ever need to define/expose
>    it)* should be an intermittent concept, like, for example, the existing
>    input availability in the Task/runtime (StreamTaskInput#isAvailable).
> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
>    I also don't have any good ideas.
>    `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>
> Best,
> Piotrek
>
> On Tue, Jun 8, 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
>
> > Hi devs,
> >
> > While discussing "Watermark propagation with Sink API" and during
> > "[FLINK-18934] Idle stream does not advance watermark in connected stream",
> > we noticed some drawbacks on how Flink defines idle partitions currently.
> >
> > To recap, idleness was always considered as a means to achieve progress in
> > window operators with idle partition in the source at the risk of losing a
> > bit of correctness. In particular, records could be considered late, simply
> > because of that idleness timeout and not because they arrived out of order.
> > A potential reprocessing would not be causing these records to be
> > considered late and we may end up with a different (correct) result.
> >
> > The drawbacks that we discovered are as follows:
> > - We currently only use idleness to exclude respective upstream tasks from
> >   participating in watermark generation.
> > - However, the definition is bound to records. [1] In particular, while a
> >   partition is idle, no records should be produced.
> > - That brings us into quite a few edge cases, where operators emit records,
> >   while they are actually idling: Think of timers, asyncIO operators, window
> >   operators based on timeouts, etc. that trigger on an operator ingesting an
> >   idle partition.
> > - The proper solution would be to turn the operator active while emitting
> >   and to return to being idle afterwards (but when?). However, this has some
> >   unintended side-effects depending on when you switch back:
> >   - If you toggle stream status for each record, you get a huge overhead on
> >     stream status records and quite a bit of processing in downstream
> >     operators (that code path is not much optimized since switching is
> >     considered a rare thing).
> >   - If you toggle after a certain time, you may get delays>idleness in the
> >     downstream window operators.
> >   - You could turn back when you processed all pending mails, but if you
> >     have a self-replicating mail that would be never. Self-enqueueing, low
> >     timer would also produce a flood similar to the first case.
> >
> > All in all, the situation is quite unsatisfying because idleness implies no
> > records.
> > However, currently there is no need to have that implication:
> > since we only use it for watermarks, we can easily allow records to be
> > emitted (in fact that was the old behavior before FLINK-18934 in many
> > cases) and still get the intended behavior in respect to watermarks:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver records.
> >
> > Ultimately, that would mean that we are actually not talking idle/active
> > partitions anymore. We are talking more about whether a particular subtask
> > should influence downstream watermark calculation or not. Leading to the
> > following questions:
> > 1. Do we want to change the definition as outlined?
> > 2. Do you see any problem with emitting records on subtask without explicit
> >    watermarks?
> > 3. If we want to go this way, we may need to refine the names/definitions.
> >    Any ideas?
> >
> > I think idle partition should translate into something like
> > automatic/implicit/passive watermarks; active partition into
> > explicit/active watermarks. Then StreamStatus is more about WatermarkMode
> > (not really happy with this one).
> >
> > Best,
> >
> > Arvid
> >
> > [1]
> > https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86