Hi,

Unfortunately, I cannot share the entire code, but the class looks roughly like this:

public class WfProcessFunction
        extends KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {

    @Override
    public void processElement(Map<String, Object> inputRecord, Context context,
            Collector<Map<String, Object>> collector) throws Exception {
        ...
        // Register a processing-time timer 5 seconds from now.
        context.timerService().registerProcessingTimeTimer(
                context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
        ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Map<String, Object>> out) throws Exception {
        ...
    }
}

Thanks!

On Thu, Mar 17, 2022 at 9:24 PM yu'an huang <h.yuan...@gmail.com> wrote:

> Hi, can you share your code so we can check whether it is written
> correctly?
>
> > On 18 Mar 2022, at 7:54 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
> >
> > Hi,
> >
> > We have a class that extends KeyedProcessFunction and overrides the
> > onTimer() method. During processElement(), we register a timer callback
> > using
> > context.timerService().registerProcessingTimeTimer(<some-future-time>).
> > For a while, we see that the onTimer() method is getting called back and
> > everything works as expected; however, after a while, onTimer() stops
> > getting any callbacks from Flink (the registration of the timer via
> > registerProcessingTimeTimer() is working just fine). Does anyone know
> > what could be wrong here and how we can debug this?
> >
> > Flink version is 1.13.2 (running on AWS KDA)
> >
> > Thanks!
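
A side note on the timer registration in the class above, in case it helps while debugging. Flink keeps at most one timer per key and timestamp, so registering at currentProcessingTime() + 5 seconds for every element can create a separate timer for almost every element of a key. The sketch below shows the timestamp arithmetic on its own, plus rounding down to the full second so that registrations for the same key within one second collapse into a single timer. The class TimerTimestamps and its method names are hypothetical and not part of the original job or of Flink, and this is not offered as the cause of the missing callbacks.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the original class or of Flink. */
final class TimerTimestamps {

    private TimerTimestamps() {}

    /** Same arithmetic as the registration above: now plus the delay in milliseconds. */
    static long fireAt(long nowMillis, long delaySeconds) {
        return nowMillis + TimeUnit.SECONDS.toMillis(delaySeconds);
    }

    /** Rounds a timestamp down to the start of the second it falls in. */
    static long coalesceToSecond(long timestampMillis) {
        return Math.floorDiv(timestampMillis, 1000L) * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long target = coalesceToSecond(fireAt(now, 5));
        System.out.println("now=" + now + " timer=" + target);
    }
}
```

Inside processElement(), nowMillis would be context.timerService().currentProcessingTime(), and the rounded result would be the value passed to registerProcessingTimeTimer(). Note that rounding down means the timer can fire up to one second earlier than the unrounded timestamp.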