Hi Matthias, Thanks for providing the example, I would reply back soon after I do some debug.
Best, JING ZHANG Matthias Broecheler <matth...@dataeng.ai> 于2021年8月19日周四 上午1:53写道: > Hey JING, > > thanks for getting back to me. I tried to produce the smallest, > self-contained example that produces the phenomenon: > https://gist.github.com/mbroecheler/fd27dd8a810b038ec463cdd5339d290f > > If you run MainRepl you should see an infinite loop of re-processing the 5 > integers. The offending process is BufferedLatestSelector - specifically > the event timer that is registered in it. Without the timer the process > will not emit an output. > > The timer is set whenever the state is null. Is there a problem with how I > implemented that buffering process? > Thank you, > Matthias > > On Sun, Aug 15, 2021 at 8:59 PM JING ZHANG <beyond1...@gmail.com> wrote: > >> Hi Matthias, >> How often do you register the event-time timer? >> It is registered per input record, or re-registered a new timer after an >> event-time timer is triggered? >> Would you please provide your test case code, it would be very helpful >> for troubleshooting. >> >> Best wishes, >> JING ZHANG >> >> Matthias Broecheler <matth...@dataeng.ai> 于2021年8月14日周六 上午3:44写道: >> >>> Hey guys, >>> >>> I have a KeyedProcessFunction that gathers statistics on the events that >>> flow through and emits it periodically (every few seconds) to a SideOutput. >>> However, at the end of stream the last set of statistics don't get >>> emitted. I read on the mailing list that processing time timers that are >>> pending don't get triggered when Flink cleans up a stream, but that event >>> timers do get triggered because a watermark with Long.MAX_VALUE is sent >>> through the stream. >>> Hence, I thought that I could register a "backup" event timer for >>> Long.MAX_VALUE-1 to make sure that my process function gets notified when >>> the stream ends to emit the in-flight statistics. >>> >>> However, now my simple test case (with a data source fromCollection of 4 >>> elements) keeps iterating over the same 4 elements in an infinite loop. >>> >>> I don't know how to make sense of this and would appreciate your help. >>> Is there a better way to set a timer that gets triggered at the end of >>> stream? >>> And for my education: Why does registering an event timer cause an >>> infinite loop over the source elements? >>> >>> Thanks a lot and have a wonderful weekend, >>> Matthias >>> >>