Thanks for providing these details, Gordon. I have to admit that I do not
fully follow the reasoning why periodic watermark generators forced us to
define idleness in terms of records. Is it because idleness was determined
by the non-availability of more data in the sources and not in the
watermark generators, which are executed after the records have been read
from the external system? So, in the end, was the problem where the stream
status was decided?

If there is a periodic watermark generator somewhere in the pipeline, then
we don't have to mark its output channels as watermark idle, because
watermarks are being sent. Hence, provided that the watermark generation
logic makes sense, the overall job should be able to make progress. If the
watermark generator is informed about its input channel status, it could
even decide whether to propagate the watermark idleness and stop generating
watermarks or not. Of course, this leaves room for people to shoot
themselves in the foot.
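
A toy model may make the trade-off easier to see. The sketch below is plain
Python, not Flink code: PeriodicGenerator, DownstreamTask, simulate and the
propagate_idleness flag are hypothetical names made up for illustration. It
assumes the downstream task takes the minimum watermark over all channels
that are not marked idle, and that only a watermark (never a record) turns
an idle channel active again.

```python
from typing import Dict, List, Optional, Tuple

IDLE = "IDLE"
WATERMARK = "WATERMARK"
Event = Tuple[str, Optional[int]]


class PeriodicGenerator:
    """Hypothetical periodic generator that is told when its input is idle."""

    def __init__(self, propagate_idleness: bool) -> None:
        self.propagate_idleness = propagate_idleness
        self.max_ts: Optional[int] = None
        self.input_idle = False

    def on_record(self, ts: int) -> None:
        # Track the highest timestamp seen so far.
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)

    def on_input_idle(self) -> None:
        self.input_idle = True

    def on_periodic_emit(self) -> Optional[Event]:
        # Either forward idleness or keep re-emitting the same watermark.
        if self.input_idle and self.propagate_idleness:
            return (IDLE, None)
        if self.max_ts is None:
            return None
        return (WATERMARK, self.max_ts)


class DownstreamTask:
    """Combines watermarks as the minimum over channels that are not idle."""

    def __init__(self, channel_names: List[str]) -> None:
        self.watermarks: Dict[str, Optional[int]] = {n: None for n in channel_names}
        self.idle: Dict[str, bool] = {n: False for n in channel_names}

    def receive(self, name: str, event: Event) -> None:
        kind, ts = event
        if kind == IDLE:
            self.idle[name] = True
            return
        # Only a watermark switches the channel back to active.
        self.idle[name] = False
        prev = self.watermarks[name]
        self.watermarks[name] = ts if prev is None else max(prev, ts)

    def combined_watermark(self) -> Optional[int]:
        active = [w for n, w in self.watermarks.items() if not self.idle[n]]
        if not active or any(w is None for w in active):
            return None
        return min(active)


def simulate(propagate_idleness: bool) -> Optional[int]:
    gen_a = PeriodicGenerator(propagate_idleness)
    gen_b = PeriodicGenerator(propagate_idleness)
    task = DownstreamTask(["a", "b"])
    gen_a.on_record(100)
    gen_b.on_record(500)
    gen_a.on_input_idle()  # the input of generator "a" dries up
    for _ in range(3):  # three periodic emission rounds
        for name, gen in (("a", gen_a), ("b", gen_b)):
            event = gen.on_periodic_emit()
            if event is not None:
                task.receive(name, event)
    return task.combined_watermark()
```

In this model, simulate(False) leaves the combined watermark at 100, because
generator "a" keeps sending the same value and is never excluded, while
simulate(True) lets the downstream task advance to 500. Which of the two
behaviors is desirable is exactly the decision that would be left to the
generator.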

Cheers,
Till

On Thu, Jun 10, 2021 at 5:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> I forgot to provide the link for reference [1]:
>
> [1] https://issues.apache.org/jira/browse/FLINK-5017
>
> On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > Sorry for chiming in late here.
> >
> > Regarding the topic of changing the definition of StreamStatus and
> > changing the name as well:
> > After digging into some of the roots of this implementation [1], I found
> > that the StreamStatus was initially defined to mark "watermark idleness",
> > and not "record idleness" (in fact, the alternative name "WatermarkStatus"
> > was considered at the time).
> >
> > The concern at the time that caused us to alter the definition to "record
> > idleness" in the final implementation was the existence of periodic
> > timestamp / watermark generators within the pipeline. Those would continue
> > to generate non-increasing watermarks in the absence of any input records
> > from upstream. In this scenario, downstream operators would not be able to
> > consider that channel as idle, and therefore watermark progress is locked.
> > We could consider a timeout-based approach on those specific operators to
> > toggle watermark idleness if the values remain constant for a period of
> > time, but then again, this is very ill-defined and most likely wrong.
> >
> > I have not followed the newest changes to the watermark generator
> > operators and am not sure if this issue is still relevant.
> > Otherwise, I don't see other problems with changing the definition here.
> >
> > Thanks,
> > Gordon
> >
> > On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise <ar...@apache.org> wrote:
> >
> >> Hi Eron,
> >>
> >> Again, to recap from the other thread:
> >> - You are right that idleness is correct with static assignment and fully
> >> active partitions. In this case, the source defines idleness. (case A)
> >> - For the more pressing use case of idle, assigned partitions, the user
> >> defines an idleness threshold, and it becomes potentially incorrect when
> >> the partition becomes active again. (case B)
> >> - The same holds for dynamic assignment of splits. If a source without a
> >> split gets a split assigned dynamically, there is a realistic chance that
> >> the watermark has advanced past the first record of the newly assigned
> >> split. (case C)
> >> You can certainly insist that only the first case is valid (as it's
> >> correct), but we know that users use it in other ways, and that was also
> >> the intent of the devs.
> >>
> >> Now the question could be if it makes sense to distinguish these cases.
> >> Would you treat the idleness information differently (especially in the
> >> sink/source that motivated FLIP-167) if you knew that the idleness is
> >> guaranteed correct?
> >> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
> >> (case B).
> >>
> >> However, that would still leave case C, which probably would need to be
> >> solved completely differently. I could imagine that a source with dynamic
> >> assignments should never have IDLE subtasks and rather manage the idleness
> >> itself. For example, it could emit a watermark per second/minute that is
> >> directly fetched from the source system. I'm just not sure if the current
> >> WatermarkAssigner interface suffices in that regard...
> >>
> >>
> >> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> >> wrote:
> >>
> >> > Hi Eron,
> >> >
> >> > Can you elaborate a bit more on what you mean? I don't understand what
> >> > you mean by a more general solution.
> >> >
> >> > As of now, a stream is marked idle by a source/watermark generator, which
> >> > has the effect of temporarily excluding this stream/partition from the
> >> > min watermark calculation in the downstream tasks. However, the stream
> >> > switches back to active when any record is emitted. This is what's
> >> > causing the problems described by Arvid.
> >> >
> >> > The core of our proposal is very simple: keep everything as it is, except
> >> > that a stream will be changed back to active only once a watermark is
> >> > emitted again, not a record. In other words, disconnect idleness from
> >> > the presence of records, connect it only to the presence or lack of
> >> > watermarks, and allow records to be emitted while the "stream status" is
> >> > "idle".
> >> > Piotrek
> >> >
> >> >
> >> > > On 09.06.2021, at 06:01, Eron Wright <ewri...@streamnative.io.invalid>
> >> > > wrote:
> >> > >
> >> > > It seems to me that idleness was introduced to deal with a very specific
> >> > > issue. In the pipeline, watermarks are aggregated not on a per-split
> >> > > basis but on a per-subtask basis. This works well when each subtask has
> >> > > exactly one split. When a sub-task has multiple splits, various
> >> > > complications occur involving the commingling of watermarks. And when a
> >> > > sub-task has no splits, the pipeline stalls altogether. To deal with the
> >> > > latter problem, idleness was introduced. The sub-task simply declares
> >> > > itself to be idle to be taken out of consideration for purposes of
> >> > > watermark aggregation.
> >> > >
> >> > > If we're looking for a more general solution, I would suggest we discuss
> >> > > how to track watermarks on a per-split basis. Or, as Till mentioned
> >> > > recently, an alternate solution may be to dynamically adjust the
> >> > > parallelism of the task.
> >> > >
> >> > > I don't agree with the notion that idleness involves a correctness
> >> > > tradeoff. The facility I described above has no impact on correctness.
> >> > > Meanwhile, various watermark strategies rely on heuristics involving the
> >> > > processing-time domain, and the term idleness seems to have found
> >> > > purchase there too. The connection among the concepts seems tenuous.
> >> > >
> >> > > -E
> >> > >
> >> > >
> >> > >
> >> > >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org>
> >> > >> wrote:
> >> > >>
> >> > >> Hi Arvid,
> >> > >>
> >> > >> Thanks for writing down this summary and proposal. I think this was the
> >> > >> foundation of the disagreement in the FLIP-167 discussion. Dawid was
> >> > >> arguing that idleness is an intermittent, strictly task-local concept and
> >> > >> as such shouldn't be exposed in, for example, sinks, while Eron and I
> >> > >> thought that it's a concept strictly connected to watermarks.
> >> > >>
> >> > >> 1. I'm a big +1 for changing the StreamStatus definition to a stream
> >> > >> "providing watermarks" and "not providing watermarks". With respect to
> >> > >> that, I agree with Dawid that record-bound idleness *(if we would ever
> >> > >> need to define/expose it)* should be an intermittent concept, like, for
> >> > >> example, the existing input availability in the Task/runtime
> >> > >> (StreamTaskInput#isAvailable).
> >> > >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> >> > >> I also don't have any good ideas.
> >> > >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >> > >>
> >> > >> Best,
> >> > >> Piotrek
> >> > >>
> >> > >> On Tue, 8 Jun 2021 at 16:35, Arvid Heise <ar...@apache.org> wrote:
> >> > >>
> >> > >>> Hi devs,
> >> > >>>
> >> > >>> While discussing "Watermark propagation with Sink API" and during
> >> > >>> "[FLINK-18934] Idle stream does not advance watermark in connected
> >> > >>> stream", we noticed some drawbacks in how Flink currently defines idle
> >> > >>> partitions.
> >> > >>>
> >> > >>> To recap, idleness was always considered a means to achieve progress in
> >> > >>> window operators with an idle partition in the source, at the risk of
> >> > >>> losing a bit of correctness. In particular, records could be considered
> >> > >>> late simply because of that idleness timeout and not because they
> >> > >>> arrived out of order. A potential reprocessing would not cause these
> >> > >>> records to be considered late, and we may end up with a different
> >> > >>> (correct) result.
> >> > >>>
> >> > >>> The drawbacks that we discovered are as follows:
> >> > >>> - We currently only use idleness to exclude respective upstream tasks
> >> > >>> from participating in watermark generation.
> >> > >>> - However, the definition is bound to records. [1] In particular, while
> >> > >>> a partition is idle, no records should be produced.
> >> > >>> - That brings us to quite a few edge cases where operators emit records
> >> > >>> while they are actually idling: think of timers, asyncIO operators,
> >> > >>> window operators based on timeouts, etc. that trigger on an operator
> >> > >>> ingesting an idle partition.
> >> > >>> - The proper solution would be to turn the operator active while emitting
> >> > >>> and to return to being idle afterwards (but when?). However, this has
> >> > >>> some unintended side effects depending on when you switch back:
> >> > >>>  - If you toggle stream status for each record, you get a huge overhead
> >> > >>> in stream status records and quite a bit of processing in downstream
> >> > >>> operators (that code path is not much optimized since switching is
> >> > >>> considered a rare thing).
> >> > >>>  - If you toggle after a certain time, you may get delays > idleness in
> >> > >>> the downstream window operators.
> >> > >>>  - You could turn back when you have processed all pending mails, but if
> >> > >>> you have a self-replicating mail, that would be never. A self-enqueueing,
> >> > >>> low timer would also produce a flood similar to the first case.
> >> > >>>
> >> > >>> All in all, the situation is quite unsatisfying because idleness implies
> >> > >>> no records. However, currently there is no need to have that
> >> > >>> implication: since we only use it for watermarks, we can easily allow
> >> > >>> records to be emitted (in fact, that was the old behavior before
> >> > >>> FLINK-18934 in many cases) and still get the intended behavior with
> >> > >>> respect to watermarks:
> >> > >>> - A channel that is active is providing watermarks.
> >> > >>> - An idle channel is not providing any watermarks but can deliver
> >> > >>> records.
> >> > >>>
> >> > >>> Ultimately, that would mean that we are actually not talking about
> >> > >>> idle/active partitions anymore. We are talking more about whether a
> >> > >>> particular subtask should influence downstream watermark calculation or
> >> > >>> not. This leads to the following questions:
> >> > >>> 1. Do we want to change the definition as outlined?
> >> > >>> 2. Do you see any problem with emitting records on a subtask without
> >> > >>> explicit watermarks?
> >> > >>> 3. If we want to go this way, we may need to refine the
> >> > >>> names/definitions. Any ideas?
> >> > >>>
> >> > >>> I think idle partition should translate into something like
> >> > >>> automatic/implicit/passive watermarks; active partition into
> >> > >>> explicit/active watermarks. Then StreamStatus is more about
> >> > >>> WatermarkMode (not really happy with this one).
> >> > >>>
> >> > >>> Best,
> >> > >>>
> >> > >>> Arvid
> >> > >>>
> >> > >>> [1]
> >> > >>> https://github.com/apache/flink/blob/a954aed31b9b01473ffa002f62438723172a7b7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L28-L86
