Hi Jeff,

I think the class you're looking for is StatusWatermarkValve. Note that
this is fairly deep into the runtime stack.
Seth

On Tue, Feb 1, 2022 at 2:34 PM Jeff Carter <jpcarter...@gmail.com> wrote:

> Thanks, Till.
>
> That definitely helps a bit. I'm still not seeing where there is some idle
> variable that the output.markIdle is setting to true (or whatever it sets).
> Like the ideal thing would be if there is just some "output.isIdle()" that
> could be called to know if the stream is or isnt idle. Since that doesn't
> exist, what is the variable in "output" that dictates if it is idle or not
> that that I'd just have to make an isIdle() method to make its state
> visible to other code.
>
> I see the checkIfIdle() method in the code (in at least the testing piece)
> you pointed out, but that seems like it's just a way to set a timer and
> check if the idle state should be set or not. But I dont know if that's
> setting some isIdle variable or if it's just checked and calculated
> everytime and that method is basically the variable I'm looking for. But
> that might just be my confusion.
>
> On Tue, Jan 11, 2022, 11:05 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi Jeff,
> >
> > I think this happens in the WatermarksWithIdleness [1].
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java#L73
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 11, 2022 at 6:05 PM Jeff Carter <jpcarter...@gmail.com>
> wrote:
> >
> > > I'm looking into making a feature for flink related to watermarks and
> am
> > > digging into the inner watermark mechanisms, specifically with
> idleness.
> > > I'm familiar with idleness, but digging into the root code I can only
> get
> > > to where idlenessTimeout gets set in
> WatermarkStrategyWithIdleness.java.
> > >
> > >  But what I'm looking for the pieces beyond that. If I set the idleness
> > to
> > > 500 milliseconds, where in the code does it actually go "I haven't
> seen a
> > > message in 500 milliseconds. I'm setting this stream to idle."?
> > >
> > > The reason being that what I'm thinking of would need to be able to see
> > if
> > > any streams are marked idle, and if so react accordingly.
> > >
> > > Thanks for any help in advance.
> > >
> >
>

Reply via email to