Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread Binil Benjamin
Yes, restarting the app with a clean state does seem to fix the issue, but I think I may have found a bug in Flink. Here's how we can replicate it: - Create a simple application with KeyedProcessFunction (with onTimer()) - Send a few records with the same key. In processElement(), register a timer

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread yu'an huang
After fixing your negative timestamp bug, can the timer be triggered? > On 23 Mar 2022, at 2:39 AM, Binil Benjamin wrote: > > Here are some more findings as I was debugging this. I peeked into the > snapshot to see the current values in "_timer_state/processing_user-timers" > and here is ho

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread Binil Benjamin
Here are some more findings as I was debugging this. I peeked into the snapshot to see the current values in "_timer_state/processing_user-timers" and here is how they look: Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace} Timer{timestamp=-9223372036854715808, key=(FF

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi, Parallelism is currently set to 9 and it appears to be occurring for all subtasks. We did put logs to see the various timestamps. The following logs are from the last 5 days. - logs from processElement() - logged immediately after timer registration: "message": "FunctionName=WfProcessFun

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Yun Gao
Hi Binil, I think the code itself also looks good to me. May I have a double confirmation on the details of the issue: 1. What is the parallelism of this operator, and does the issues occurs for all the subtasks? 2. Have we already added some logs in the processElement and onTimer to print the

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi, Unfortunately, I cannot share the entire code, but the class roughly looks like this: public class WfProcessFunction extends KeyedProcessFunction, Map, Map> { @Override public void processElement(Map inputRecord, Context context, Collector> collector) throws Exception {

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread yu'an huang
Hi, can you share your code so we can check whether it is written correctly. > On 18 Mar 2022, at 7:54 AM, Binil Benjamin wrote: > > Hi, > > We have a class that extends KeyedProcessFunction and overrides onTimer() > method. During processElement(), we register a timer callback using > cont