Hi devs,

While discussing "Watermark propagation with Sink API" and
"[FLINK-18934] Idle stream does not advance watermark in connected stream",
we noticed some drawbacks in how Flink currently defines idle partitions.

To recap, idleness has always been a means of letting window operators make
progress when a source partition is idle, at the risk of losing a bit of
correctness. In particular, records may be considered late simply because
of the idleness timeout and not because they arrived out of order.
Reprocessing the same data might not treat these records as late, so we may
end up with a different (correct) result.

The drawbacks that we discovered are as follows:
- We currently only use idleness to exclude the respective upstream tasks
from participating in watermark generation.
- However, the definition is bound to records [1]. In particular, while a
partition is idle, no records should be produced.
- That leads to quite a few edge cases where operators emit records while
they are actually idle: think of timers, async I/O operators, timeout-based
window operators, etc. that fire in an operator ingesting an idle
partition.
- The proper solution would be to turn the operator active while emitting
and to return to being idle afterwards (but when?). However, this has some
unintended side-effects depending on when you switch back:
  - If you toggle stream status for each record, you get a huge overhead of
stream status records and quite a bit of processing in downstream operators
(that code path is not heavily optimized since switching is considered
rare).
  - If you toggle after a certain time, you may get delays greater than the
idleness timeout in the downstream window operators.
  - You could switch back once all pending mails are processed, but with a
self-replicating mail that would never happen. A self-enqueueing timer with
a short interval would also produce a flood similar to the first case.
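
To make the first case concrete, here is a minimal sketch of what toggling
per record does to the output stream. It is not Flink code: the class and
method names are made up for illustration, and stream elements are modeled
as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; none of these names exist in Flink. */
final class ToggleOverheadSketch {

    /** Wraps every emitted record in an ACTIVE/IDLE status pair. */
    static List<String> togglePerRecord(List<String> records) {
        List<String> out = new ArrayList<>();
        for (String record : records) {
            out.add("STATUS:ACTIVE"); // switch to active before emitting
            out.add(record);
            out.add("STATUS:IDLE");   // switch back right after
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> emitted = togglePerRecord(Arrays.asList("r1", "r2", "r3"));
        // Each record costs two extra status elements downstream.
        System.out.println(emitted.size() + " elements for 3 records");
    }
}
```

In this model the status traffic is twice the record traffic, and every
status element has to be handled by each downstream operator.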

All in all, the situation is quite unsatisfying because idleness implies no
records. However, there is currently no need for that implication: since we
only use idleness for watermarks, we can easily allow records to be emitted
(in fact, that was the old behavior before FLINK-18934 in many cases) and
still get the intended behavior with respect to watermarks:
- A channel that is active is providing watermarks.
- An idle channel is not providing any watermarks but can deliver records.
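
As a rough sketch of these two rules (not the actual Flink implementation;
the class ChannelWatermarkCombiner and its methods are hypothetical names),
the downstream watermark could be computed as the minimum over only those
channels that currently provide watermarks. Records are not gated at all in
this model.

```java
import java.util.Arrays;

/** Hypothetical sketch; names and structure are assumptions, not Flink API. */
final class ChannelWatermarkCombiner {
    private final long[] watermarks;            // last watermark per channel
    private final boolean[] providesWatermark;  // false = "idle" channel
    private long combined = Long.MIN_VALUE;     // last combined watermark

    ChannelWatermarkCombiner(int numChannels) {
        watermarks = new long[numChannels];
        providesWatermark = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(providesWatermark, true);
    }

    /** Called when a channel delivers a watermark; returns the combined one. */
    long onWatermark(int channel, long watermark) {
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        return recompute();
    }

    /** Switches a channel between providing and not providing watermarks. */
    long setProvidesWatermark(int channel, boolean provides) {
        providesWatermark[channel] = provides;
        return recompute();
    }

    // Minimum over providing channels only; the result never moves backwards.
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyProviding = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (providesWatermark[i]) {
                anyProviding = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyProviding && min > combined) {
            combined = min;
        }
        return combined;
    }

    public static void main(String[] args) {
        ChannelWatermarkCombiner combiner = new ChannelWatermarkCombiner(2);
        combiner.onWatermark(0, 10L);
        // Channel 1 stops providing watermarks but may still deliver records.
        System.out.println(combiner.setProvidesWatermark(1, false));
    }
}
```

Note that nothing in this sketch inspects records, which is the point: a
channel's mode only decides whether it takes part in the minimum.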

Ultimately, that would mean that we are no longer really talking about
idle/active partitions. We are talking about whether a particular subtask
should influence the downstream watermark calculation or not. That leads to
the following questions:
1. Do we want to change the definition as outlined?
2. Do you see any problem with emitting records on a subtask without
explicit watermarks?
3. If we want to go this way, we may need to refine the names/definitions.
Any ideas?

I think idle partition should translate into something like
automatic/implicit/passive watermarks, and active partition into
explicit/active watermarks. StreamStatus would then be more of a
WatermarkMode (I'm not really happy with that name).

Best,

Arvid

[1]
https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
