Hi Dawid,

thanks for the clarification. Having looked into the related classes again, I
agree that exposing the current StreamStatus would severely limit future
development. I guess that to expose it in the Sink, we would need a new
WatermarkStatus in the eventtime package (as per Eron's suggestion) that is
independent of the internal status. This is very similar to how we have two
Watermark classes (and I now finally understand why there are two of them). I'd
like to leave that as future work.

With that being said, I started the vote thread about the original proposal
[1].

The explicit list of name changes is quite verbose, so I left it out of the
FLIP. Please refer to the POC PR [2] for details. If someone needs the list
to cast their vote, I can also add it as an appendix or an attachment.

[1]
https://lists.apache.org/thread.html/rcfcb9126e31d6641e1cc96834310c5b6fafff0c948973f97d1ac70f2%40%3Cdev.flink.apache.org%3E
[2] https://github.com/apache/flink/pull/16433

On Thu, Aug 5, 2021 at 3:06 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hey all,
>
> Just a couple of comments from my side, as I was pinged here.
>
> +1 for making stream status just about watermarks.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status.  May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> The reason I made that method final is very similar to the reason for
> this discussion. There is/was no clear definition of what StreamStatus is
> and what it is not. I did not want to let users arbitrarily play with it
> until we have clear semantics, even though I consider the Operator API
> only semi-public.
>
> As for the question of whether we should expose it in sinks: if we say it
> is purely user-defined logic that affects only watermarks, I can, if only
> barely, imagine it being persisted. Still, personally I don't think it is
> a good approach. As I mentioned a few times before, I see stream status as
> a tradeoff between correctness and making progress in real/processing
> time.
>
> However, if we decide to expose that in sinks, it must not be the
> StreamStatus we have now. It must be a completely new class. The current
> StreamStatus extends from StreamElement which is a really low level concept
> which I am strongly against exposing in any kind of public API.
>
> Best,
>
> Dawid
> On 05/08/2021 12:21, Till Rohrmann wrote:
>
> Coming back to my previous comment: I would actually propose to separate
> the discussion about whether to expose the WatermarkStatus in the sinks or
> not from correcting the StreamStatus and Idleness definition in order to
> keep the scope of this FLIP as small as possible. If there is a good reason
> to expose the WatermarkStatus, then we can probably do it.
>
> Cheers,
> Till
>
> On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Martijn,
>
> 1. Good question. The watermarks and statuses of the splits are first
> aggregated before being emitted through the reader. The user's watermark
> strategy is actually applied to each SourceOutput (=split). Since one split
> is active and one is idle, the watermark of the reader will not advance
> until the user-defined idleness timeout is triggered on the idle split. At
> that point, the combined watermark depends solely on the active split. The
> combined status remains ACTIVE.
> 2. Kafka has no dynamic partitions; the term is a complete misnomer on the
> Flink side. In fact, if you search for Kafka and partition discovery, you
> will only find Flink resources. What we actually do is dynamic topic
> discovery, and afaik that can only be triggered through a topic pattern. We
> could enable topic discovery for all patterns by default if we don't do
> that already.
> 3. Yes, idleness on assigned partitions would even work with dynamic
> assignments. I will update the FLIP to reflect that.
> 4. Afaik it was only meant for scenario 2 (and your question 3), and it
> should stay that way after the FLIP. I don't know of any source
> implementation that uses the user-specified idleness to handle scenario 3.
> What currently goes beyond that is that some readers go idle when they
> don't have an active assignment.
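To make answer 1 concrete, here is a minimal Java sketch of per-split aggregation. It assumes that the combined watermark is the minimum over non-idle splits and that the reader is idle only when all of its splits are idle. The class and method names (SplitWatermarkAggregator, onWatermark, markIdle) are hypothetical. This is not Flink's actual SourceOperator code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified aggregation of per-split watermarks and idleness
 * into one reader-level watermark and status (not Flink's implementation).
 */
final class SplitWatermarkAggregator {

    /** Per-split state: highest watermark seen and whether the split is idle. */
    private static final class SplitState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final Map<String, SplitState> splits = new HashMap<>();

    void addSplit(String splitId) {
        splits.putIfAbsent(splitId, new SplitState());
    }

    /** splitId must have been added before; a new watermark reactivates the split. */
    void onWatermark(String splitId, long watermark) {
        SplitState state = splits.get(splitId);
        state.watermark = Math.max(state.watermark, watermark);
        state.idle = false;
    }

    /** Called when the user-defined idleness timeout fires for a split. */
    void markIdle(String splitId) {
        splits.get(splitId).idle = true;
    }

    /** The reader is idle only if every split is idle. */
    boolean isCombinedIdle() {
        return splits.values().stream().allMatch(s -> s.idle);
    }

    /** Minimum over active splits; Long.MIN_VALUE means "nothing to report". */
    long combinedWatermark() {
        return splits.values().stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

With splits "a" and "b", the combined watermark stays at Long.MIN_VALUE while "b" has reported nothing. Once "b" is marked idle, the combined watermark follows "a", and the combined status remains active.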
>
> Best,
>
> Arvid
>
> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser <mart...@ververica.com> wrote:
>
> Hi all,
>
> I have a couple of questions after studying the FLIP and the docs:
>
> 1. What happens when one of the readers has two splits assigned and one of
> the splits actually receives data?
>
> 2. If I understand it correctly, the Kinesis Source uses dynamic shard
> discovery by default (so in case of idleness, scenario 3 would happen
> there), and the FileSource also has a dynamic assignment. The Kafka Source
> doesn't use dynamic partition discovery by default (so scenario 2 would be
> the default there). Why did we choose not to enable dynamic partition
> discovery by default, and should we actually change that?
>
> 3. To be sure, is it correct that scenario 2 is applicable in case of a
> dynamic assignment when there is temporarily no data?
>
> 4. Does WatermarkStrategy#withIdleness currently cover scenarios 2 and 3
> and the one from my 3rd question?
>
> Best regards,
>
> Martijn
>
> On Fri, 23 Jul 2021 at 15:57, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi everyone,
>
> I would be in favour of what Arvid said about not exposing the
> WatermarkStatus to the Sink. Unless there is a very strong argument that
> this is required, I think keeping this concept internal is the better
> choice right now. Moreover, as Arvid said, downstream applications can
> derive the WatermarkStatus on their own, depending on their business
> logic.
>
> Cheers,
> Till
>
> On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Eron,
>
> thank you very much for your feedback.
>
> Please mention that the "temporary status toggle" code will be removed.
>
> This code is already removed, but there is still some automation of going
> idle when temporarily no splits are assigned. I will include it in the
> FLIP.
>
> I agree with adding the markActive() functionality, for symmetry. Speaking
> of symmetry, could we now include the minor enhancement we discussed in
> FLIP-167, the exposure of watermark status changes on the Sink interface?
> I drafted a PR and would be happy to revisit it.
>
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> I'm still not sure if that's a good idea.
>
> If we have now refined idleness to be a user-specified,
> application-specific way to handle temporarily stalled partitions, then
> downstream applications should actually implement their own idleness
> definition. Let's see what other devs think. I'm pinging the ones that have
> been most involved in the discussion: @Stephan Ewen <se...@apache.org>
> @Till Rohrmann <trohrm...@apache.org> @Dawid Wysakowicz
> <dwysakow...@apache.org>.
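Here is a minimal Java sketch of idleness as a user-specified, application-specific rule. A downstream application could derive its own status the same way. The sketch assumes a fixed idle timeout and timestamps passed in explicitly, which keeps it deterministic. IdlenessTracker and StatusOutput are hypothetical names. StatusOutput only mirrors the markIdle()/markActive() pair discussed in this thread and is not Flink's WatermarkOutput.

```java
/**
 * Hypothetical idleness rule: an input is declared idle after no records for
 * idleTimeoutMillis, and active again on the next record.
 */
final class IdlenessTracker {

    /** Hypothetical callback mirroring a markIdle()/markActive() pair. */
    interface StatusOutput {
        void markIdle();
        void markActive();
    }

    private final long idleTimeoutMillis;
    private final StatusOutput output;
    private long lastActivityMillis;
    private boolean idle = false;

    IdlenessTracker(long idleTimeoutMillis, long nowMillis, StatusOutput output) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityMillis = nowMillis;
        this.output = output;
    }

    /** Called for every record; flips back to active if the input was idle. */
    void onRecord(long nowMillis) {
        lastActivityMillis = nowMillis;
        if (idle) {
            idle = false;
            output.markActive();
        }
    }

    /** Called periodically (e.g. from a timer) to detect idleness. */
    void onPeriodicCheck(long nowMillis) {
        if (!idle && nowMillis - lastActivityMillis >= idleTimeoutMillis) {
            idle = true;
            output.markIdle();
        }
    }

    boolean isIdle() {
        return idle;
    }
}
```

Status changes are signalled only on transitions, so repeated checks while idle do not emit duplicate markIdle() calls.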
>
> The FLIP mentions a 'watermarkstatus' package for the WatermarkStatus
> class. Should it be the 'eventtime' package?
>
> Are you proposing org.apache.flink.api.common.eventtime? I was simply
> suggesting renaming org.apache.flink.streaming.runtime.streamstatus, but
> I'm very open to other suggestions (given that there are only 2 classes in
> the package).
>
> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> spell out what the new method names will be on each interface? May I
> suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> to help decouple the input's watermark status from the output's watermark
> status.
>
> I haven't found
> org.apache.flink.streaming.api.operators.Input#emitStreamStatus in master.
> Could you double-check if I'm looking at the correct class?
>
> The current idea was mainly to grep+replace /streamStatus/watermarkStatus/
> and /StreamStatus/WatermarkStatus/. But again, I'm very open to more
> descriptive names. I can add an explicit list later. I'm assuming you are
> only interested in (semi-)public classes.
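The two substitutions can be illustrated with a small Java sketch. It assumes plain, case-sensitive literal replacement; the actual POC PR may have handled more cases by hand. StatusRename is a hypothetical helper. Note that neither pattern matches the lowercase package segment "streamstatus", so a package rename would have to be done separately.

```java
/** Hypothetical helper applying the two literal, case-sensitive substitutions. */
final class StatusRename {

    private StatusRename() {}

    static String rename(String source) {
        return source
                // lower camel case: fields, parameters, local variables
                .replace("streamStatus", "watermarkStatus")
                // upper camel case: class names and method-name suffixes
                .replace("StreamStatus", "WatermarkStatus");
    }
}
```

For example, rename("emitStreamStatus") yields "emitWatermarkStatus", while a package name such as org.apache.flink.streaming.runtime.streamstatus is left unchanged.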
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status. May I suggest we
> refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> Can you add a motivation for that?
> @Dawid Wysakowicz <dwysakow...@apache.org>, I think you are the last
> person that touched the code. Do you have some example operators in mind
> that would change it?
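Below is a hypothetical Java sketch of the refactoring Eron suggests. The base class keeps the change detection and forwarding final, but delegates the mapping from aggregated input status to output status to an overridable hook. All names (SketchStatus, BaseOperatorSketch, deriveOutputStatus, AlwaysActiveOperator) are illustrative assumptions, not AbstractStreamOperator's API. The sketch does not address Dawid's concern about the missing semantics.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an ACTIVE/IDLE watermark status. */
enum SketchStatus { ACTIVE, IDLE }

/** Hypothetical base operator: final forwarding logic, overridable mapping. */
abstract class BaseOperatorSketch {
    private SketchStatus lastEmitted = SketchStatus.ACTIVE;

    /** Records what would be forwarded downstream (stands in for the output). */
    final List<SketchStatus> emitted = new ArrayList<>();

    /** Called with the status aggregated over all input channels. */
    final void onAggregatedStatus(SketchStatus aggregated) {
        SketchStatus out = deriveOutputStatus(aggregated);
        if (out != lastEmitted) { // only forward actual changes
            lastEmitted = out;
            emitted.add(out);
        }
    }

    /** Default: output status follows the aggregated input status. */
    protected SketchStatus deriveOutputStatus(SketchStatus aggregated) {
        return aggregated;
    }
}

/** Example override: an operator that always reports ACTIVE downstream. */
final class AlwaysActiveOperator extends BaseOperatorSketch {
    @Override
    protected SketchStatus deriveOutputStatus(SketchStatus aggregated) {
        return SketchStatus.ACTIVE;
    }
}
```

A pass-through subclass forwards IDLE and ACTIVE transitions as they arrive. AlwaysActiveOperator never forwards IDLE, which is the kind of customization the final method currently rules out.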
>
> Maybe the FLIP should spell out the expected behavior of the generic
> watermark generator (TimestampsAndWatermarksOperator). Should the
> generator ignore the upstream idleness signal? I believe it propagates the
> signal, even though it also generates its own signals. Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
> I will add a section. In general, we assume that we only have source-based
> watermark generators once FLIP-27 is properly adopted.
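For the variant Eron describes, the generic generator only becomes active while its input is idle. A hypothetical Java sketch could look like this. It assumes bounded out-of-orderness and an explicit periodic emit call. It is not a description of what TimestampsAndWatermarksOperator does today, and GatedWatermarkGenerator and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical generator: forwards upstream watermarks while the input is
 * ACTIVE and generates its own (bounded out-of-orderness) only while IDLE.
 */
final class GatedWatermarkGenerator {
    private final long outOfOrdernessMillis;
    private boolean upstreamIdle = false;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    /** Watermarks that would be emitted downstream, in order. */
    final List<Long> emitted = new ArrayList<>();

    GatedWatermarkGenerator(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    void onUpstreamStatus(boolean idle) {
        upstreamIdle = idle;
    }

    void onRecord(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Upstream watermarks are authoritative while the input is active. */
    void onUpstreamWatermark(long watermark) {
        if (!upstreamIdle) {
            emitIfAdvancing(watermark);
        }
    }

    /** Periodic trigger: the generator only takes over while the input is idle. */
    void onPeriodicEmit() {
        if (upstreamIdle && maxTimestamp != Long.MIN_VALUE) {
            emitIfAdvancing(maxTimestamp - outOfOrdernessMillis);
        }
    }

    // Watermarks must never go backwards.
    private void emitIfAdvancing(long watermark) {
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            emitted.add(watermark);
        }
    }
}
```

While upstream is active, a periodic emit does nothing and only upstream watermarks pass through. After upstream goes idle, the generator emits its own watermark from the largest timestamp seen.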
>
> Best,
>
> Arvid
>
> On Wed, Jul 21, 2021 at 12:40 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>
> This proposal to narrow the definition of idleness to focus on the
> event-time clock is great.
>
> Please mention that the "temporary status toggle" code will be removed.
>
> I agree with adding the markActive() functionality, for symmetry. Speaking
> of symmetry, could we now include the minor enhancement we discussed in
> FLIP-167, the exposure of watermark status changes on the Sink interface?
> I drafted a PR and would be happy to revisit it.
>
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> The FLIP mentions a 'watermarkstatus' package for the WatermarkStatus
> class. Should it be the 'eventtime' package?
>
> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> spell out what the new method names will be on each interface? May I
> suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> to help decouple the input's watermark status from the output's watermark
> status.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status. May I suggest we
> refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> Maybe the FLIP should spell out the expected behavior of the generic
> watermark generator (TimestampsAndWatermarksOperator). Should the
> generator ignore the upstream idleness signal? I believe it propagates the
> signal, even though it also generates its own signals. Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
> Thanks again for this effort!
> -Eron
>
>
> On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
>
> Dear devs,
>
> We recently discovered that StreamStatus and Idleness are insufficiently
> defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> good to hear more opinions on that matter. Ideally, we can make the changes
> in 1.14, as some newly added methods are affected.
>
> Best,
>
> Arvid
>
> [1]
> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> [2]
> https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>
>
