[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195306#comment-17195306 ]
Dawid Wysakowicz commented on FLINK-19167:
------------------------------------------

I second [~aljoscha]; I think the example is correct. What do you return in your {{AssignerWithPeriodicWatermarks#extractTimestamp}}? Do you return {{timestamp}} or {{currentMaxTimestamp}}? The correct behaviour is to return the {{timestamp}}.

The code in {{onTimer}} should trigger. Consider this example. There are 5 events incoming with timestamps t = 1, 2, 4, 8, 7. We want to emit results when the event time reaches 60007. Therefore we need to register a timer for 7 + 60000, which is {{ctx.timestamp() + 60000}}. The last element processed carries timestamp 7, so {{lastModified}} is 7 and the timer registered for 60007 passes the check in {{onTimer}}.

Let us know if it is still unclear.

> Process Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> The section "*Process Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> currently is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     current.lastModified = ctx.timestamp();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     // this will never happen
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> however, it should be:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     // it should be the previous watermark
>     current.lastModified = ctx.timerService().currentWatermark();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be
> `current.lastModified = ctx.timerService().currentWatermark();`;
> otherwise, `timestamp == result.lastModified + 60000` will never be true.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
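
For anyone following along, the arithmetic in the comment on this issue (events at t = 1, 2, 4, 8, 7 and a 60000 ms timeout) can be checked with a small standalone sketch. It does not use Flink at all: {{TimerSketch}}, {{simulate}} and {{TIMEOUT_MS}} are hypothetical names made up for illustration, and the model assumes a single key and that all five events are processed before the watermark reaches the first timer. Under those assumptions it only mimics the bookkeeping of {{processElement}} and {{onTimer}} from the documentation example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the keyed process function (one key only); not Flink code.
class TimerSketch {
    static final long TIMEOUT_MS = 60_000L;

    /** Returns the timestamps of the timers that pass the check in onTimer. */
    static List<Long> simulate(long[] eventTimestamps) {
        long lastModified = Long.MIN_VALUE;      // plays the role of current.lastModified
        TreeSet<Long> timers = new TreeSet<>();  // event-time timers, fired in timestamp order

        // processElement: store the record's own timestamp and register a timer for it
        for (long ts : eventTimestamps) {
            lastModified = ts;
            timers.add(ts + TIMEOUT_MS);
        }

        // Assumption: the watermark later passes every registered timer, so each fires once.
        List<Long> emittedAt = new ArrayList<>();
        for (long timerTs : timers) {
            // onTimer: only the timer that belongs to the latest element matches
            if (timerTs == lastModified + TIMEOUT_MS) {
                emittedAt.add(timerTs);
            }
        }
        return emittedAt;
    }

    public static void main(String[] args) {
        System.out.println(simulate(new long[] {1, 2, 4, 8, 7})); // prints [60007]
    }
}
```

In this model, the timers for 60001, 60002, 60004 and 60008 fire but are treated as outdated, while the one for 60007 matches {{lastModified + 60000}} and emits the result. This is consistent with the comment that the check can succeed when {{ctx.timestamp()}} is stored.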