It seems to me that idleness was introduced to deal with a very specific
issue.  In the pipeline, watermarks are aggregated not on a per-split basis
but on a per-subtask basis.  This works well when each subtask has exactly
one split.  When a sub-task has multiple splits, various complications
occur involving the commingling of watermarks.  And when a sub-task has no
splits, the pipeline stalls altogether.  To deal with the latter problem,
idleness was introduced.  The sub-task simply declares itself to be idle to
be taken out of consideration for purposes of watermark aggregation.

If we're looking for a more general solution, I would suggest we discuss
how to track watermarks on a per-split basis.  Or, as Till mentioned
recently, an alternate solution may be to dynamically adjust the
parallelism of the task.

I don't agree with the notion that idleness involves a correctness
tradeoff.  The facility I described above has no impact on correctness.
Meanwhile, various watermark strategies rely on heuristics involving the
processing-time domain, and the term idleness seems to have found purchase
there too.  The connection among the concepts seems tenuous.

-E



On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Arvid,
>
> Thanks for writing down this summary and proposal. I think this was the
> foundation of the disagreement in FLIP-167 discussion. Dawid was arguing
> that idleness is intermittent, strictly a task local concept and as such
> shouldn't be exposed in for example sinks. While me and Eron thought that
> it's a concept strictly connected to watermarks.
>
> 1. I'm big +1 for changing the StreamStatus definition to stream "providing
> watermark" and "not providing watermark". With respect to that I agree with
> Dawid that record bound idleness *(if we would ever need to define/expose
> it)* should be an intermittent concept, like for example the existing in
> the Task/runtime input availability (StreamTaskInput#isAvailable).
> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> I also don't have any good ideas.
> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
>
> Best,
> Piotrek
>
> wt., 8 cze 2021 o 16:35 Arvid Heise <ar...@apache.org> napisaƂ(a):
>
> > Hi devs,
> >
> > While discussing "Watermark propagation with Sink API" and during
> > "[FLINK-18934] Idle stream does not advance watermark in connected
> stream",
> > we noticed some drawbacks on how Flink defines idle partitions currently.
> >
> > To recap, idleness was always considered as a means to achieve progress
> in
> > window operators with idle partition in the source at the risk of losing
> a
> > bit of correctness. In particular, records could be considered late,
> simply
> > because of that idleness timeout and not because they arrived out of
> order.
> > A potential reprocessing would not be causing these records to be
> > considered late and we may end up with a different (correct) result.
> >
> > The drawbacks that we discovered are as follows:
> > - We currently only use idleness to exclude respective upstream tasks
> from
> > participating in watermark generation.
> > - However, the definition is bound to records. [1] In particular, while a
> > partition is idle, no records should be produced.
> > - That brings us into quite a few edge cases, where operators emit
> records,
> > while they are actually idling: Think of timers, asyncIO operators,
> window
> > operators based on timeouts, etc. that trigger on an operator ingesting
> an
> > idle partition.
> > - The proper solution would be to turn the operator active while emitting
> > and to return to being idle afterwards (but when?). However, this has
> some
> > unintended side-effects depending on when you switch back:
> >   - If you toggle stream status for each record, you get a huge overhead
> on
> > stream status records and quite a bit of processing in downstream
> operators
> > (that code path is not much optimized since switching is considered a
> rare
> > thing).
> >   - If you toggle after a certain time, you may get delays>idleness in
> the
> > downstream window operators.
> >   - You could turn back when you processed all pending mails, but if you
> > have a self-replicating mail that would be never. Self-enqueueing, low
> > timer would also produce a flood similar to the first case.
> >
> > All in all, the situation is quite unsatisfying because idleness implies
> no
> > records. However, currently there is no need to have that implication:
> > since we only use it for watermarks, we can easily allow records to be
> > emitted (in fact that was the old behavior before FLINK-18934 in many
> > cases) and still get the intended behavior in respect to watermarks:
> > - A channel that is active is providing watermarks.
> > - An idle channel is not providing any watermarks but can deliver
> records.
> >
> > Ultimately, that would mean that we are actually not talking idle/active
> > partitions anymore. We are talking more about whether a particular
> subtask
> > should influence downstream watermark calculation or not. Leading to the
> > following questions:
> > 1. Do we want to change the definition as outlined?
> > 2. Do you see any problem with emitting records on subtask without
> explicit
> > watermarks?
> > 3. If we want to go this way, we may need to refine the
> names/definitions.
> > Any ideas?
> >
> > I think idle partition should translate into something like
> > automatic/implicit/passive watermarks; active partition into
> > explicit/active watermarks. Then StreamStatus is more about WatermarkMode
> > (not really happy with this one).
> >
> > Best,
> >
> > Arvid
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
> >
>

Reply via email to