[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195154#comment-17195154 ]
tinny cat commented on FLINK-19167:
-----------------------------------

The watermark did increase, and the timer did fire. My point is that the logic in the onTimer() method is wrong: this condition will never hold.
{code:java}
if (timestamp == result.lastModified + 60000) {
    // emit the state on timeout
    out.collect(new Tuple2<String, Long>(result.key, result.count));
}
{code}
The reason is that this line is incorrect:
{code:java}
current.lastModified = ctx.timestamp();
{code}
It should be:
{code:java}
current.lastModified = ctx.timerService().currentWatermark();
{code}

> Proccess Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> Section "*Process Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> currently reads:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     current.lastModified = ctx.timestamp();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     // this will never happen
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> However, it should be:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     // it should be the previous watermark
>     current.lastModified = ctx.timerService().currentWatermark();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be
> `current.lastModified = ctx.timerService().currentWatermark();`;
> otherwise, `timestamp == result.lastModified + 60000` will never be true.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)