Hi Jeff,

I think the class you're looking for is StatusWatermarkValve. Note that this is fairly deep into the runtime stack.

Seth

On Tue, Feb 1, 2022 at 2:34 PM Jeff Carter <jpcarter...@gmail.com> wrote:

> Thanks, Till.
>
> That definitely helps a bit. I'm still not seeing where there is some idle
> variable that the output.markIdle is setting to true (or whatever it sets).
> The ideal thing would be some "output.isIdle()" that could be called to
> know whether the stream is or isn't idle. Since that doesn't exist, what is
> the variable in "output" that dictates whether it is idle, so that I'd just
> have to add an isIdle() method to make its state visible to other code?
>
> I see the checkIfIdle() method in the code (in at least the testing piece)
> you pointed out, but that seems like it's just a way to set a timer and
> check if the idle state should be set or not. But I don't know if that's
> setting some isIdle variable or if it's just checked and calculated
> every time, and that method is basically the variable I'm looking for. But
> that might just be my confusion.
>
> On Tue, Jan 11, 2022, 11:05 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi Jeff,
> >
> > I think this happens in the WatermarksWithIdleness [1].
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java#L73
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 11, 2022 at 6:05 PM Jeff Carter <jpcarter...@gmail.com> wrote:
> >
> > > I'm looking into making a feature for Flink related to watermarks and am
> > > digging into the inner watermark mechanisms, specifically with idleness.
> > > I'm familiar with idleness, but digging into the root code I can only get
> > > to where idlenessTimeout gets set in WatermarkStrategyWithIdleness.java.
> > >
> > > But what I'm looking for is the pieces beyond that. If I set the idleness
> > > to 500 milliseconds, where in the code does it actually go "I haven't seen
> > > a message in 500 milliseconds. I'm setting this stream to idle."?
> > >
> > > The reason being that what I'm thinking of would need to be able to see
> > > if any streams are marked idle, and if so react accordingly.
> > >
> > > Thanks for any help in advance.
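
For illustration, the distinction raised in the thread (a stored idle flag versus a value recomputed on every check) can be sketched as below. This is a minimal standalone sketch, not Flink's code: the class IdleTracker and its methods onEvent and isIdle are hypothetical names, and only the name checkIfIdle is borrowed from the discussion. It assumes the caller passes in the current time in milliseconds. Here the periodic check updates a stored flag, and isIdle() simply exposes that flag to other code.

```java
// Hypothetical sketch of idleness tracking; not Flink's actual implementation.
final class IdleTracker {
    private final long timeoutMillis;   // e.g. 500 for a 500 ms idleness timeout
    private long lastActivityMillis;    // time of the most recent event
    private boolean idle;               // the stored state other code can query

    IdleTracker(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastActivityMillis = nowMillis;
        this.idle = false;
    }

    // Called for every incoming event: records activity and clears the idle flag.
    void onEvent(long nowMillis) {
        lastActivityMillis = nowMillis;
        idle = false;
    }

    // Called periodically: sets the idle flag once the timeout has elapsed
    // without any event, and returns the current state.
    boolean checkIfIdle(long nowMillis) {
        if (!idle && nowMillis - lastActivityMillis >= timeoutMillis) {
            idle = true;
        }
        return idle;
    }

    // Read-only view of the stored flag; does not recompute anything.
    boolean isIdle() {
        return idle;
    }
}
```

With a 500 ms timeout and a tracker created at time 0, checkIfIdle(499) leaves the flag unset, checkIfIdle(500) sets it, and a later onEvent call clears it again.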