Hi All,

If I have high parallelism and use processFunction to registerEventTimeTimer, 
the timer never gets fired.
After debugging, I found out the watermark isn't updated because I have keyBy 
right after assignTimestampsAndWatermarks.
And if I set assignTimestampsAndWatermarks right after the keyBy, an exception 
is thrown.

 val contractFlow = enrichedFlow
      .keyBy(f => f.fiveTupleKey)
      .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <<<<<
      .process(new FlowContractStitcherProcess)
      .name("contractStitcher")

at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
        at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
        at 
FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
        at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at 
org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)


Any idea how to solve my problem ? How do I update the watermark after keyBy ?

Would I hit scaling issue if on large number of timer if I use 
registerProcessingTimeTimer instead ? I'm using event time throughout the 
pipeline, would mixing processing timer with event time might cause problem 
down the line ?

--
Fritz

Reply via email to