Hi Dawid,

thanks for the clarification. Having looked into the related classes again, I agree that exposing the current StreamStatus would severely limit future developments. I guess to expose it in the Sink, we would need to have a new WatermarkStatus in the eventtime package (as per Eron's suggestion) that is independent of the internal status. It's very similar to how we have 2 Watermarks (and I now finally understand why there are 2 of them). I'd like to leave that as future work.
With that being said, I started the vote thread about the original proposal [1]. The explicit listing of the name changes is quite verbose, so I left it out of the FLIP. I'd like to point to the POC PR [2] for details. Ultimately, I can also attach an appendix or even an attachment if someone needs it to cast the vote.

[1] https://lists.apache.org/thread.html/rcfcb9126e31d6641e1cc96834310c5b6fafff0c948973f97d1ac70f2%40%3Cdev.flink.apache.org%3E
[2] https://github.com/apache/flink/pull/16433

On Thu, Aug 5, 2021 at 3:06 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hey all,
>
> Just a couple of comments from my side as I was called here.
>
> +1 for making stream status just about watermarks.
>
> > I observe that AbstractStreamOperator is hardcoded to derive the output
> > channel's status from the input channel's status. May I suggest
> > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> > allow for the operator subclass to customize the processing of the
> > aggregated watermark and watermark status.
>
> The reason for making that method final from my side is very similar to
> the reason for this discussion. There is/was no clear definition of what
> StreamStatus is and what it is not. I did not want to let users arbitrarily
> play with it until we have clear semantics, even though I still consider
> the Operator API only semi-public.
>
> As for the question of whether we should expose it in sinks: if we say it
> is purely user-defined logic that affects only watermarks, I can imagine,
> even if only barely, that it could be persisted. Still, personally I don't
> think it is a good approach. As I mentioned a few times before, I see
> stream status as a tradeoff between correctness and making progress in
> real/processing time.
>
> However if we decide to expose that in sinks it must not be the
> StreamStatus we have now. It must be a completely new class.
> The current StreamStatus extends from StreamElement which is a really low
> level concept which I am strongly against exposing in any kind of public
> API.
>
> Best,
>
> Dawid
>
> On 05/08/2021 12:21, Till Rohrmann wrote:
>
> Coming back to my previous comment: I would actually propose to separate
> the discussion about whether to expose the WatermarkStatus in the sinks or
> not from correcting the StreamStatus and Idleness definition in order to
> keep the scope of this FLIP as small as possible. If there is a good reason
> to expose the WatermarkStatus, then we can probably do it.
>
> Cheers,
> Till
>
> On Fri, Jul 30, 2021 at 2:29 PM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Martijn,
>
> 1. Good question. The watermarks and statuses of the splits are first
> aggregated before being emitted through the reader. The watermark strategy
> of the user is actually applied on all SourceOutputs (=splits). Since one
> split is active and one is idle, the watermark of the reader will not
> advance until the user-defined idleness is triggered on the idle split. At
> this point, the combined watermark solely depends on the active split. The
> combined status remains ACTIVE.
> 2. Kafka has no dynamic partitions. This is a complete misnomer on the
> Flink side. In fact, if you search for Kafka and partition discovery, you
> will only find Flink resources. What we actually do is dynamic topic
> discovery and that can only be triggered through a pattern afaik. We could
> go for topic discovery on all patterns by default if we don't do that
> already.
> 3. Yes, idleness on assigned partitions would even work with dynamic
> assignments. I will update the FLIP to reflect that.
> 4. Afaik it was only meant for scenario 2 (and your question 3) and it
> should be this way after the FLIP. I don't know of any source
> implementation that uses the user-specified idleness to handle scenario 3.
> The thing that is currently extra is that some readers go idle when the
> reader doesn't have an active assignment.
>
> Best,
>
> Arvid
>
> On Fri, Jul 30, 2021 at 12:17 PM Martijn Visser <mart...@ververica.com> wrote:
>
> Hi all,
>
> I have a couple of questions after studying the FLIP and the docs:
>
> 1. What happens when one of the readers has two splits assigned and only
> one of the splits actually receives data?
> 2. If I understand it correctly the Kinesis Source uses dynamic shard
> discovery by default (so in case of idleness scenario 3 would happen there)
> and the FileSource also has a dynamic assignment. The Kafka Source doesn't
> use dynamic partition discovery by default (so scenario 2 would be the
> default to happen there). Why did we choose to not enable dynamic partition
> discovery by default and should we actually change that?
> 3. To be sure, is it correct that scenario 2 applies when there is a
> dynamic assignment and temporarily no data?
> 4. Does WatermarkStrategy#withIdleness currently cover scenario 2, 3 and
> the one from my 3rd question?
>
> Best regards,
>
> Martijn
>
> On Fri, 23 Jul 2021 at 15:57, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi everyone,
>
> I would be in favour of what Arvid said about not exposing the
> WatermarkStatus to the Sink. Unless there is a very strong argument that
> this is required I think that keeping this concept internal is the better
> choice right now. Moreover, as Arvid said the downstream application can
> derive the WatermarkStatus on its own depending on its business logic.
>
> Cheers,
> Till
>
> On Fri, Jul 23, 2021 at 2:15 PM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Eron,
>
> thank you very much for your feedback.
>
> > Please mention that the "temporary status toggle" code will be removed.
>
> This code is already removed but there is still some automation of going
> idle when temporarily no splits are assigned. I will include it in the
> FLIP.
>
> > I agree with adding the markActive() functionality, for symmetry.
> > Speaking of symmetry, could we now include the minor enhancement we
> > discussed in FLIP-167, the exposure of watermark status changes on the
> > Sink interface. I drafted a PR and would be happy to revisit it.
> >
> > https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> I'm still not sure if that's a good idea.
>
> If we have now refined idleness to be a user-specified,
> application-specific way to deal with temporarily stalled partitions, then
> downstream applications should actually implement their own idleness
> definition. Let's see what other devs think. I'm pinging the ones that have
> been most involved in the discussion: @Stephan Ewen <se...@apache.org>
> @Till Rohrmann <trohrm...@apache.org> @Dawid Wysakowicz
> <dwysakow...@apache.org>.
>
> > The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> > class. Should it be 'eventtime' package?
>
> Are you proposing org.apache.flink.api.common.eventtime? I was simply
> suggesting to rename org.apache.flink.streaming.runtime.streamstatus but
> I'm very open to other suggestions (given that there are only 2 classes in
> the package).
>
> > Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> > spell out what the new method names will be on each interface? May I
> > suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> > to help decouple the input's watermark status from the output's watermark
> > status.
>
> I haven't found
> org.apache.flink.streaming.api.operators.Input#emitStreamStatus in master.
> Could you double-check if I'm looking at the correct class?
>
> The current idea was mainly to grep+replace /streamStatus/watermarkStatus/
> and /StreamStatus/WatermarkStatus/. But again I'm very open to more
> descriptive names. I can add an explicit list later. I'm assuming you are
> only interested in (semi-)public classes.
>
> > I observe that AbstractStreamOperator is hardcoded to derive the output
> > channel's status from the input channel's status. May I suggest
> > we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)"
> > to allow for the operator subclass to customize the processing of the
> > aggregated watermark and watermark status.
>
> Can you add a motivation for that?
> @Dawid Wysakowicz <dwysakow...@apache.org>, I think you are the last person
> that touched the code. Do you have some example operators in mind that
> would change it?
>
> > Maybe the FLIP should spell out the expected behavior of the generic
> > watermark generator (TimestampsAndWatermarksOperator). Should the
> > generator ignore the upstream idleness signal? I believe it propagates
> > the signal, even though it also generates its own signals. Given that
> > source-based and generic watermark generation shouldn't be combined, one
> > could argue that the generic watermark generator should activate only
> > when its input channel's watermark status is idle.
>
> I will add a section. In general, we assume that we only have source-based
> watermark generators once FLIP-27 is properly adopted.
>
> Best,
>
> Arvid
>
> On Wed, Jul 21, 2021 at 12:40 AM Eron Wright <ewri...@streamnative.io.invalid> wrote:
>
> This proposal to narrow the definition of idleness to focus on the
> event-time clock is great.
>
> Please mention that the "temporary status toggle" code will be removed.
>
> I agree with adding the markActive() functionality, for symmetry. Speaking
> of symmetry, could we now include the minor enhancement we discussed in
> FLIP-167, the exposure of watermark status changes on the Sink interface.
> I drafted a PR and would be happy to revisit it.
>
> https://github.com/streamnative/flink/pull/2/files#diff-64d9c652ffc3c60b6d838200a24b106eeeda4b2d853deae94dbbdf16d8d694c2R62-R70
>
> The flip mentions a 'watermarkstatus' package for the WatermarkStatus
> class. Should it be 'eventtime' package?
>
> Regarding the change of 'streamStatus' to 'watermarkStatus', could you
> spell out what the new method names will be on each interface? May I
> suggest that Input.emitStreamStatus be Input.processStreamStatus? This is
> to help decouple the input's watermark status from the output's watermark
> status.
>
> I observe that AbstractStreamOperator is hardcoded to derive the output
> channel's status from the input channel's status. May I suggest
> we refactor "AbstractStreamOperator::emitStreamStatus(StreamStatus,int)" to
> allow for the operator subclass to customize the processing of the
> aggregated watermark and watermark status.
>
> Maybe the FLIP should spell out the expected behavior of the generic
> watermark generator (TimestampsAndWatermarksOperator). Should the
> generator ignore the upstream idleness signal? I believe it propagates the
> signal, even though it also generates its own signals. Given that
> source-based and generic watermark generation shouldn't be combined, one
> could argue that the generic watermark generator should activate only when
> its input channel's watermark status is idle.
>
> Thanks again for this effort!
> -Eron
>
> On Sun, Jul 18, 2021 at 11:53 PM Arvid Heise <ar...@apache.org> wrote:
>
> Dear devs,
>
> We recently discovered that StreamStatus and Idleness are insufficiently
> defined [1], so I drafted a FLIP [3] to amend that situation. It would be
> good to hear more opinions on that matter. Ideally, we can make the changes
> in 1.14 as some newly added methods are affected.
>
> Best,
>
> Arvid
>
> [1] https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> [2] https://lists.apache.org/thread.html/rb871f5aecbca6e5d786303557a6cdb3d425954385cbdb1b777f2fcf5%40%3Cdev.flink.apache.org%3E
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
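
A note on Arvid's answer 1 to Martijn in this thread (how a reader combines the watermarks and statuses of its splits). The following is a minimal, self-contained Java sketch of that behaviour, not Flink's actual implementation. The class SplitWatermarkCombiner, the nested SplitState, and all method names are hypothetical and chosen only for illustration. It assumes that the combined watermark is the minimum over the non-idle splits, that it never moves backwards, and that the combined status is idle only when no split is active.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these names do not exist in Flink.
public class SplitWatermarkCombiner {

    /** Per-split view: last watermark seen and whether idleness was triggered. */
    public static final class SplitState {
        final long watermark;
        final boolean idle;

        public SplitState(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    private long combinedWatermark = Long.MIN_VALUE;
    private boolean combinedIdle = false;

    /** Recomputes the reader-level watermark and status from all split states. */
    public void update(List<SplitState> splits) {
        long minActive = Long.MAX_VALUE;
        boolean anyActive = false;
        for (SplitState s : splits) {
            if (!s.idle) {
                anyActive = true;
                minActive = Math.min(minActive, s.watermark);
            }
        }
        // Assumption: the reader is idle only if no split is active
        // (in this sketch that includes having no splits at all).
        combinedIdle = !anyActive;
        if (anyActive) {
            // Idle splits are ignored; the combined watermark never regresses.
            combinedWatermark = Math.max(combinedWatermark, minActive);
        }
    }

    public long watermark() {
        return combinedWatermark;
    }

    public boolean isIdle() {
        return combinedIdle;
    }

    public static void main(String[] args) {
        SplitWatermarkCombiner reader = new SplitWatermarkCombiner();

        // Stalled split not yet flagged idle: it holds the watermark back.
        reader.update(Arrays.asList(new SplitState(100L, false), new SplitState(10L, false)));
        System.out.println(reader.watermark() + " idle=" + reader.isIdle()); // 10 idle=false

        // Idleness triggered on the stalled split: only the active split counts.
        reader.update(Arrays.asList(new SplitState(100L, false), new SplitState(10L, true)));
        System.out.println(reader.watermark() + " idle=" + reader.isIdle()); // 100 idle=false

        // Both splits idle: the status flips, the watermark stays where it was.
        reader.update(Arrays.asList(new SplitState(100L, true), new SplitState(10L, true)));
        System.out.println(reader.watermark() + " idle=" + reader.isIdle()); // 100 idle=true
    }
}
```

The second call corresponds to the situation described in that answer: once the user-defined idleness fires on the stalled split, the combined watermark depends solely on the active split and the combined status remains active.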