Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks
like this:

public class WfProcessFunction
        extends KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {

    @Override
    public void processElement(Map<String, Object> inputRecord, Context context,
            Collector<Map<String, Object>> collector) throws Exception {
        ...

        context.timerService().registerProcessingTimeTimer(
                context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
        ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Map<String, Object>> out) throws Exception {
        ...
    }
}
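
For reference, the timestamp passed to registerProcessingTimeTimer() in the
class is simply the current processing time plus a fixed five-second delay.
Below is a minimal standalone sketch of that arithmetic with no Flink
dependency. The class name TimerDelaySketch and the method fireTime() are
hypothetical names used only for illustration; they are not part of the
actual job or of Flink.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the real job or of Flink.
final class TimerDelaySketch {

    // The five-second delay, written the same way as in the class above.
    static final long DELAY_MS = 5 * TimeUnit.SECONDS.toMillis(1L);

    // Returns the absolute timestamp (in ms) that would be handed to
    // registerProcessingTimeTimer(), given the current processing time.
    static long fireTime(long currentProcessingTimeMs) {
        return currentProcessingTimeMs + DELAY_MS;
    }

    public static void main(String[] args) {
        // System.currentTimeMillis() stands in for currentProcessingTime() here.
        long now = System.currentTimeMillis();
        System.out.println("now=" + now + " fireAt=" + fireTime(now));
    }
}
```

Logging this computed value in processElement() and the timestamp argument
received in onTimer() may be one way to compare which timers were registered
with which ones actually fired.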

Thanks!

On Thu, Mar 17, 2022 at 9:24 PM yu'an huang <h.yuan...@gmail.com> wrote:

> Hi, can you share your code so we can check whether it is written
> correctly?
>
>
>
> > On 18 Mar 2022, at 7:54 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
> >
> > Hi,
> >
> > We have a class that extends KeyedProcessFunction and overrides the
> > onTimer() method. In processElement(), we register a timer callback using
> > context.timerService().registerProcessingTimeTimer(<some-future-time>).
> > For a while, onTimer() is called back and everything works as expected;
> > however, after some time, onTimer() stops receiving callbacks from Flink
> > (registering the timer via registerProcessingTimeTimer() still works
> > fine). Does anyone know what could be wrong here and how we can debug
> > this?
> >
> > Flink version is 1.13.2 (running on AWS KDA)
> >
> > Thanks!
>
>
>
