[ https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196123#comment-17196123 ]
Dawid Wysakowicz commented on FLINK-19167:
------------------------------------------

I think the problem is that the example is not just simplified: it imposes assumptions on how watermarks are generated (assumptions I think are actually incorrect). I think what [~tinny] describes, that the condition is never met, might actually be true.

Let's assume a perfect watermark, that is, one where we generate a Watermark whose value is the timestamp of each incoming record. If I am not mistaken, the contract is that the element is emitted first and the Watermark second. In that scenario, suppose records arrive with timestamps t = 1, 2, 61000. We should emit a result with count 2 at time t = 60002. However, the record that would move the watermark past 60002 first updates lastModified to 61000, so the check in onTimer fails for both pending timers (60001 and 60002).

The example "works" only if the watermark progresses because of events from other keys. E.g. if you have values with just a single key, it won't work.

> Proccess Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> The section "*Process Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> currently reads:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     current.lastModified = ctx.timestamp();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     // this condition is never true
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> However, it should be:
> {code:java}
> @Override
> public void processElement(
>         Tuple2<String, String> value,
>         Context ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // retrieve the current count
>     CountWithTimestamp current = state.value();
>     if (current == null) {
>         current = new CountWithTimestamp();
>         current.key = value.f0;
>     }
>     // update the state's count
>     current.count++;
>     // set the state's timestamp to the record's assigned event time timestamp
>     // it should be the previous watermark
>     current.lastModified = ctx.timerService().currentWatermark();
>     // write the state back
>     state.update(current);
>     // schedule the next timer 60 seconds from the current event time
>     ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
> }
>
> @Override
> public void onTimer(
>         long timestamp,
>         OnTimerContext ctx,
>         Collector<Tuple2<String, Long>> out) throws Exception {
>     // get the state for the key that scheduled the timer
>     CountWithTimestamp result = state.value();
>     // check if this is an outdated timer or the latest timer
>     if (timestamp == result.lastModified + 60000) {
>         // emit the state on timeout
>         out.collect(new Tuple2<String, Long>(result.key, result.count));
>     }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be
> `current.lastModified = ctx.timerService().currentWatermark();`; otherwise,
> `timestamp == result.lastModified + 60000` never holds.
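To make the ordering argument in the comment concrete, here is a minimal, Flink-free sketch that replays the scenario with records at t = 1, 2, 61000. It is a hypothetical simulation, not Flink code: the class `TimerSimulation`, its `run`/`onTimer` helpers and the timer map are made up for illustration. It copies the counting and timer check of the documented example and assumes a perfect watermark (one per record, equal to the record's timestamp, delivered after the record). A timer fires once the watermark reaches its timestamp, and no final end-of-input watermark is sent.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: replays the documented processElement/onTimer logic against a
// "perfect" watermark (one per record, equal to its timestamp, delivered after the
// record). The timer handling is a simplified assumption, not Flink's implementation.
class TimerSimulation {

    static final long TIMEOUT = 60000;

    // Same fields as CountWithTimestamp in the documentation example.
    static final class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // Returns the emitted (key, count) pairs, rendered as "key=count".
    static List<String> run(String[] keys, long[] timestamps) {
        Map<String, CountWithTimestamp> state = new HashMap<>();
        // Pending event-time timers: timestamp -> keys. Each (key, timestamp) pair is
        // stored once, so registering the same timer twice has no extra effect.
        TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
        List<String> out = new ArrayList<>();

        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            long eventTime = timestamps[i];

            // processElement, as in the documented example
            CountWithTimestamp current = state.get(key);
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = key;
                state.put(key, current);
            }
            current.count++;
            current.lastModified = eventTime; // stands in for ctx.timestamp()
            timers.computeIfAbsent(current.lastModified + TIMEOUT, t -> new TreeSet<>()).add(key);

            // The perfect watermark for this record arrives only after the record,
            // then every timer with timestamp <= watermark fires in timestamp order.
            long watermark = eventTime;
            while (!timers.isEmpty() && timers.firstKey() <= watermark) {
                Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
                for (String timerKey : due.getValue()) {
                    onTimer(due.getKey(), state.get(timerKey), out);
                }
            }
        }
        return out;
    }

    // onTimer, as in the documented example: emit only if this is the latest timer.
    static void onTimer(long timestamp, CountWithTimestamp result, List<String> out) {
        if (timestamp == result.lastModified + TIMEOUT) {
            out.add(result.key + "=" + result.count);
        }
    }

    public static void main(String[] args) {
        // Single key: the record at 61000 overwrites lastModified before the
        // watermark passes 60002, so neither pending timer emits anything.
        System.out.println(run(new String[] {"a", "a", "a"}, new long[] {1, 2, 61000}));
        // Two keys: a record for "b" advances the watermark, so "a" is emitted with count 2.
        System.out.println(run(new String[] {"a", "a", "b"}, new long[] {1, 2, 61000}));
    }
}
{code}

Running `main` prints `[]` for the single-key input and `[a=2]` when a record for another key advances the watermark. This matches the comment: the example only emits when the watermark is pushed forward by events from other keys.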