Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.
Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), register a
timer
After fixing your negative timestamp bug, can the timer be triggered?
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin wrote:
>
> Here are some more findings as I was debugging this. I peeked into the
> snapshot to see the current values in "_timer_state/processing_user-timers"
> and here is ho
Here are some more findings as I was debugging this. I peeked into the
snapshot to see the current values in
"_timer_state/processing_user-timers" and here is how they look:
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FF
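A note on the value in the dump above: -9223372036854715808 is exactly Long.MIN_VALUE + 60000. That would fit a 60-second delay added to a base timestamp of Long.MIN_VALUE, which is for example what a watermark reads before any watermark has arrived. The thread does not confirm where the base value came from, so treat that as an assumption. The plain-Java sketch below checks the arithmetic and shows one possible guard. The class name, the helper names, and the fallback-to-current-time policy are hypothetical and are not part of Flink.

```java
public class TimerTimestampCheck {
    // Timestamp copied from the snapshot dump in this thread.
    static final long OBSERVED = -9223372036854715808L;

    // Distance of a timestamp from Long.MIN_VALUE, in milliseconds.
    static long offsetFromMinValue(long timestamp) {
        return timestamp - Long.MIN_VALUE;
    }

    // Hypothetical guard: if the base timestamp is still Long.MIN_VALUE
    // (assumed here to mean "no usable time yet"), fall back to a
    // caller-supplied current time before adding the delay.
    static long timerTimestamp(long base, long delayMs, long fallbackNow) {
        long effectiveBase = (base == Long.MIN_VALUE) ? fallbackNow : base;
        return effectiveBase + delayMs;
    }

    public static void main(String[] args) {
        // Offset of the observed timer from Long.MIN_VALUE.
        System.out.println(offsetFromMinValue(OBSERVED));
        // Guarded computation using the wall clock as the fallback.
        System.out.println(
            timerTimestamp(Long.MIN_VALUE, 60_000L, System.currentTimeMillis()) > 0);
    }
}
```

If the offset printed is 60000, the negative timers were most likely computed from an uninitialized base time and were not corrupted in state. Logging the base value at registration time would confirm or rule this out.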
Hi,
Parallelism is currently set to 9 and it appears to be occurring for all
subtasks.
We did add logs to capture the various timestamps. The following logs are from
the last 5 days.
- logs from processElement() - logged immediately after timer registration:
"message": "FunctionName=WfProcessFun
Hi Binil,
The code itself looks good to me. Could you confirm a few details of the
issue:
1. What is the parallelism of this operator, and does the issue occur for all
the subtasks?
2. Have we already added some logs in processElement and onTimer to print
the
Hi,
Unfortunately, I cannot share the entire code, but the class roughly looks
like this:
public class WfProcessFunction extends KeyedProcessFunction, Map, Map> {
@Override
public void processElement(Map inputRecord,
Context context, Collector> collector) throws
Exception {
Hi, can you share your code so we can check whether it is written correctly?
> On 18 Mar 2022, at 7:54 AM, Binil Benjamin wrote:
>
> Hi,
>
> We have a class that extends KeyedProcessFunction and overrides onTimer()
> method. During processElement(), we register a timer callback using
> cont