Hi Eron,

Can you elaborate a bit on what you mean? I don’t understand what you mean by a more general solution.
As of now, a stream is marked idle by a source/watermark generator, which has the effect of temporarily excluding this stream/partition from the min watermark calculation in the downstream tasks. However, the stream switches back to active as soon as any record is emitted. This is what’s causing the problems described by Arvid.

The core of our proposal is very simple: keep everything as it is, except that a stream changes back to active only once a watermark is emitted again, not a record. In other words, we disconnect idleness from the presence of records, connect it only to the presence or lack of watermarks, and allow records to be emitted while the “stream status” is “idle”.

Piotrek

> Message written by Eron Wright <ewri...@streamnative.io.invalid> on
> 09.06.2021, at 06:01:
>
> It seems to me that idleness was introduced to deal with a very specific
> issue. In the pipeline, watermarks are aggregated not on a per-split basis
> but on a per-subtask basis. This works well when each subtask has exactly
> one split. When a sub-task has multiple splits, various complications
> occur involving the commingling of watermarks. And when a sub-task has no
> splits, the pipeline stalls altogether. To deal with the latter problem,
> idleness was introduced. The sub-task simply declares itself to be idle to
> be taken out of consideration for purposes of watermark aggregation.
>
> If we're looking for a more general solution, I would suggest we discuss
> how to track watermarks on a per-split basis. Or, as Till mentioned
> recently, an alternate solution may be to dynamically adjust the
> parallelism of the task.
>
> I don't agree with the notion that idleness involves a correctness
> tradeoff. The facility I described above has no impact on correctness.
> Meanwhile, various watermark strategies rely on heuristics involving the
> processing-time domain, and the term idleness seems to have found purchase
> there too. The connection among the concepts seems tenuous.
>
> -E
>
>
>> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> Hi Arvid,
>>
>> Thanks for writing down this summary and proposal. I think this was the
>> foundation of the disagreement in the FLIP-167 discussion. Dawid was arguing
>> that idleness is an intermittent, strictly task-local concept and as such
>> shouldn't be exposed in, for example, sinks. Eron and I, on the other hand,
>> thought that it's a concept strictly connected to watermarks.
>>
>> 1. I'm a big +1 for changing the StreamStatus definition to stream "providing
>> watermark" and "not providing watermark". With respect to that I agree with
>> Dawid that record-bound idleness *(if we would ever need to define/expose
>> it)* should be an intermittent concept, like for example the existing
>> input availability in the Task/runtime (StreamTaskInput#isAvailable).
>> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
>> I also don't have any good ideas.
>> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>>
>> Best,
>> Piotrek
>>
>> On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi devs,
>>>
>>> While discussing "Watermark propagation with Sink API" and during
>>> "[FLINK-18934] Idle stream does not advance watermark in connected stream",
>>> we noticed some drawbacks on how Flink defines idle partitions currently.
>>>
>>> To recap, idleness was always considered as a means to achieve progress in
>>> window operators with idle partition in the source at the risk of losing a
>>> bit of correctness. In particular, records could be considered late, simply
>>> because of that idleness timeout and not because they arrived out of order.
>>> A potential reprocessing would not be causing these records to be
>>> considered late and we may end up with a different (correct) result.
>>>
>>> The drawbacks that we discovered are as follows:
>>> - We currently only use idleness to exclude respective upstream tasks from
>>> participating in watermark generation.
>>> - However, the definition is bound to records. [1] In particular, while a
>>> partition is idle, no records should be produced.
>>> - That brings us into quite a few edge cases, where operators emit records,
>>> while they are actually idling: Think of timers, asyncIO operators, window
>>> operators based on timeouts, etc. that trigger on an operator ingesting an
>>> idle partition.
>>> - The proper solution would be to turn the operator active while emitting
>>> and to return to being idle afterwards (but when?). However, this has some
>>> unintended side-effects depending on when you switch back:
>>>   - If you toggle stream status for each record, you get a huge overhead on
>>> stream status records and quite a bit of processing in downstream operators
>>> (that code path is not much optimized since switching is considered a rare
>>> thing).
>>>   - If you toggle after a certain time, you may get delays > idleness in the
>>> downstream window operators.
>>>   - You could turn back when you processed all pending mails, but if you
>>> have a self-replicating mail that would be never. Self-enqueueing, low
>>> timer would also produce a flood similar to the first case.
>>>
>>> All in all, the situation is quite unsatisfying because idleness implies no
>>> records. However, currently there is no need to have that implication:
>>> since we only use it for watermarks, we can easily allow records to be
>>> emitted (in fact that was the old behavior before FLINK-18934 in many
>>> cases) and still get the intended behavior in respect to watermarks:
>>> - A channel that is active is providing watermarks.
>>> - An idle channel is not providing any watermarks but can deliver records.
>>>
>>> Ultimately, that would mean that we are actually not talking idle/active
>>> partitions anymore. We are talking more about whether a particular subtask
>>> should influence downstream watermark calculation or not. Leading to the
>>> following questions:
>>> 1. Do we want to change the definition as outlined?
>>> 2. Do you see any problem with emitting records on subtask without explicit
>>> watermarks?
>>> 3. If we want to go this way, we may need to refine the names/definitions.
>>> Any ideas?
>>>
>>> I think idle partition should translate into something like
>>> automatic/implicit/passive watermarks; active partition into
>>> explicit/active watermarks. Then StreamStatus is more about WatermarkMode
>>> (not really happy with this one).
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
>>>
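
To make the semantics discussed in this thread a bit more concrete, here is a minimal sketch of how a downstream task could combine input channels if idleness were tied only to watermarks. It is an illustration under assumptions, not Flink's actual runtime code: the class `WatermarkValveSketch` and its methods `onIdle`, `onRecord`, `onWatermark` and `isProvidingWatermarks` are hypothetical names invented for this example.

```java
import java.util.Arrays;

/** Hypothetical sketch of the proposed semantics; not Flink's real implementation. */
final class WatermarkValveSketch {
    // Last watermark received on each input channel.
    private final long[] channelWatermark;
    // false means "idle" in the proposed sense: the channel provides no watermarks.
    private final boolean[] providesWatermarks;
    // Last watermark emitted downstream; it never goes backwards.
    private long lastOutput = Long.MIN_VALUE;

    WatermarkValveSketch(int numChannels) {
        channelWatermark = new long[numChannels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        providesWatermarks = new boolean[numChannels];
        Arrays.fill(providesWatermarks, true);
    }

    /** Upstream declares that it has stopped providing watermarks. */
    long onIdle(int channel) {
        providesWatermarks[channel] = false;
        return advance();
    }

    /** Records pass through and never change the channel's status. */
    long onRecord(int channel) {
        return lastOutput;
    }

    /** Only a watermark switches the channel back to active. */
    long onWatermark(int channel, long timestamp) {
        providesWatermarks[channel] = true;
        channelWatermark[channel] = Math.max(channelWatermark[channel], timestamp);
        return advance();
    }

    boolean isProvidingWatermarks(int channel) {
        return providesWatermarks[channel];
    }

    /** Output is the minimum over channels that currently provide watermarks. */
    private long advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermark.length; i++) {
            if (providesWatermarks[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (anyActive && min > lastOutput) {
            lastOutput = min;
        }
        return lastOutput;
    }
}
```

In this sketch, with two channels, a watermark of 10 on channel 0 does not advance the output until channel 1 is marked idle. A record arriving on channel 1 afterwards leaves it idle, so later watermarks on channel 0 keep advancing the output, and channel 1 only takes part in the minimum again once it emits a watermark of its own.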