Hi Arvid,

Thanks for writing down this summary and proposal. I think this was the
root of the disagreement in the FLIP-167 discussion. Dawid was arguing
that idleness is an intermittent, strictly task-local concept and as such
shouldn't be exposed in, for example, sinks, while Eron and I thought that
it's a concept strictly connected to watermarks.

1. I'm a big +1 for changing the StreamStatus definition to a stream "providing
watermarks" and "not providing watermarks". With respect to that, I agree with
Dawid that record-bound idleness *(if we would ever need to define/expose
it)* should be an intermittent concept, like, for example, the existing
input availability in the Task/runtime (StreamTaskInput#isAvailable).
3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
I also don't have any good ideas.
`WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
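
To make point 1 concrete, here is a rough sketch of how I read the proposed
semantics. It is not Flink's actual runtime code: the class, the enum and all
method names below are hypothetical and only illustrate that a channel "not
providing watermarks" is excluded from the minimum, while records on it are
still accepted. The sketch also assumes the combined watermark never moves
backwards.

```java
import java.util.Arrays;

/** Hypothetical, simplified sketch; not Flink's actual watermark handling. */
final class WatermarkStatusSketch {

    /** Hypothetical replacement for the ACTIVE/IDLE values of StreamStatus. */
    enum WatermarkStatus { PROVIDING_WATERMARKS, NOT_PROVIDING_WATERMARKS }

    private final WatermarkStatus[] status;
    private final long[] channelWatermark;
    private long lastCombined = Long.MIN_VALUE;
    private long recordsSeen = 0;

    WatermarkStatusSketch(int numChannels) {
        status = new WatermarkStatus[numChannels];
        channelWatermark = new long[numChannels];
        Arrays.fill(status, WatermarkStatus.PROVIDING_WATERMARKS);
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
    }

    void onStatus(int channel, WatermarkStatus newStatus) {
        status[channel] = newStatus;
    }

    void onWatermark(int channel, long timestamp) {
        // Per-channel watermarks only move forward.
        channelWatermark[channel] = Math.max(channelWatermark[channel], timestamp);
    }

    /** Records are accepted regardless of the channel's watermark status. */
    void onRecord(int channel) {
        recordsSeen++;
    }

    long recordsSeen() {
        return recordsSeen;
    }

    /** Minimum over channels that provide watermarks; unchanged if there are none. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyProviding = false;
        for (int i = 0; i < status.length; i++) {
            if (status[i] == WatermarkStatus.PROVIDING_WATERMARKS) {
                anyProviding = true;
                min = Math.min(min, channelWatermark[i]);
            }
        }
        if (anyProviding) {
            // Assumption of this sketch: the combined watermark never regresses.
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }

    public static void main(String[] args) {
        WatermarkStatusSketch valve = new WatermarkStatusSketch(2);
        valve.onWatermark(0, 100L);
        valve.onWatermark(1, 50L);
        System.out.println("both providing: " + valve.combinedWatermark());

        // Channel 1 stops providing watermarks but still delivers a record.
        valve.onStatus(1, WatermarkStatus.NOT_PROVIDING_WATERMARKS);
        valve.onRecord(1);
        System.out.println("channel 1 excluded: " + valve.combinedWatermark());
    }
}
```

The point of the sketch is that `onRecord` never looks at the status, so
timers, async I/O and so on can keep emitting without any status toggling.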

Best,
Piotrek

On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:

> Hi devs,
>
> While discussing "Watermark propagation with Sink API" and during
> "[FLINK-18934] Idle stream does not advance watermark in connected stream",
> we noticed some drawbacks on how Flink defines idle partitions currently.
>
> To recap, idleness was always considered as a means to achieve progress in
> window operators with idle partition in the source at the risk of losing a
> bit of correctness. In particular, records could be considered late, simply
> because of that idleness timeout and not because they arrived out of order.
> A potential reprocessing would not be causing these records to be
> considered late and we may end up with a different (correct) result.
>
> The drawbacks that we discovered are as follows:
> - We currently only use idleness to exclude respective upstream tasks from
> participating in watermark generation.
> - However, the definition is bound to records. [1] In particular, while a
> partition is idle, no records should be produced.
> - That brings us into quite a few edge cases, where operators emit records,
> while they are actually idling: Think of timers, asyncIO operators, window
> operators based on timeouts, etc. that trigger on an operator ingesting an
> idle partition.
> - The proper solution would be to turn the operator active while emitting
> and to return to being idle afterwards (but when?). However, this has some
> unintended side-effects depending on when you switch back:
>   - If you toggle stream status for each record, you get a huge overhead on
> stream status records and quite a bit of processing in downstream operators
> (that code path is not much optimized since switching is considered a rare
> thing).
>   - If you toggle after a certain time, you may get delays > idleness in the
> downstream window operators.
>   - You could turn back when you processed all pending mails, but if you
> have a self-replicating mail that would be never. Self-enqueueing, low
> timer would also produce a flood similar to the first case.
>
> All in all, the situation is quite unsatisfying because idleness implies no
> records. However, currently there is no need to have that implication:
> since we only use it for watermarks, we can easily allow records to be
> emitted (in fact that was the old behavior before FLINK-18934 in many
> cases) and still get the intended behavior with respect to watermarks:
> - A channel that is active is providing watermarks.
> - An idle channel is not providing any watermarks but can deliver records.
>
> Ultimately, that would mean that we are actually not talking about idle/active
> partitions anymore. We are talking more about whether a particular subtask
> should influence downstream watermark calculation or not. Leading to the
> following questions:
> 1. Do we want to change the definition as outlined?
> 2. Do you see any problem with emitting records on subtask without explicit
> watermarks?
> 3. If we want to go this way, we may need to refine the names/definitions.
> Any ideas?
>
> I think idle partition should translate into something like
> automatic/implicit/passive watermarks; active partition into
> explicit/active watermarks. Then StreamStatus is more about WatermarkMode
> (not really happy with this one).
>
> Best,
>
> Arvid
>
> [1]
>
> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
>
