Hi all,

When I run with high parallelism and use a ProcessFunction to call registerEventTimeTimer, the timer never fires. After debugging, I found that the watermark isn't being updated because I have a keyBy right after assignTimestampsAndWatermarks. And if I move assignTimestampsAndWatermarks to right after the keyBy, an exception is thrown.
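To make the symptom concrete, here is a minimal, hypothetical model in plain Scala. It is not Flink's API, and `KeyedOperatorModel`, `onWatermark` and `fired` are made-up names for illustration. It assumes the usual event-time rules: a downstream operator's watermark is the minimum of the latest watermarks on all of its input channels, and an event-time timer fires only once that combined watermark reaches the timer's timestamp. Because keyBy connects every upstream subtask to every keyed subtask, one upstream instance that never emits a watermark keeps the minimum at Long.MinValue, and the timer stays pending.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Flink code): one keyed operator instance
// receiving watermarks from `numChannels` upstream parallel subtasks.
final class KeyedOperatorModel(numChannels: Int) {
  // Latest watermark seen per input channel; Long.MinValue means "none yet".
  private val channelWatermarks: Array[Long] = Array.fill(numChannels)(Long.MinValue)

  // Pending event-time timers; reversed ordering so `head` is the earliest.
  private val timers = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)

  // Timestamps of timers that have fired, in firing order.
  val fired: mutable.ArrayBuffer[Long] = mutable.ArrayBuffer.empty[Long]

  // The operator's event-time clock: the minimum over all input channels.
  def currentWatermark: Long = channelWatermarks.min

  def registerEventTimeTimer(timestamp: Long): Unit = timers.enqueue(timestamp)

  // A watermark on one channel only moves the clock if it raises the minimum;
  // every pending timer with timestamp <= the combined watermark then fires.
  def onWatermark(channel: Int, watermark: Long): Unit = {
    channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
    val combined = currentWatermark
    while (timers.nonEmpty && timers.head <= combined) {
      fired += timers.dequeue()
    }
  }
}
```

For example, with four channels and a timer registered at 100, watermarks of 200 on channels 0 to 2 fire nothing; the timer fires only once channel 3 also reports a watermark of at least 100.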
The version with assignTimestampsAndWatermarks after keyBy (marked with <<<<<) and the resulting stack trace:

    val contractFlow = enrichedFlow
      .keyBy(f => f.fiveTupleKey)
      .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <<<<<
      .process(new FlowContractStitcherProcess)
      .name("contractStitcher")

    at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
    at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)

Any idea how to solve this? How do I get the watermark to update after the keyBy?

Would I hit a scaling issue with a large number of timers if I use registerProcessingTimeTimer instead? I'm using event time throughout the pipeline; could mixing processing-time timers with event time cause problems down the line?

-- 
Fritz