[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195306#comment-17195306
 ] 

Dawid Wysakowicz commented on FLINK-19167:
------------------------------------------

I second [~aljoscha] I think the example is correct.
What do you return in your {{AssignerWithPeriodicWatermarks#extractTimestamp}} 
? Do you return {{timestamp}} or {{currentMaxTimestamp}}? The correct behaviour 
would be to return the {{timestamp}}.

The code in the {{onTime}} should trigger. Consider this example. There are 5 
events incoming with timestamps t = 1, 2, 4, 8, 7. We want to emit results when 
the event time reaches 60007. Therefore we need to register a timer for 7 + 
60000, which is {{ctx.timestamp() + 60000}}. Let us know if it is still unclear.

> Proccess Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time 
> timestamp
>         current.lastModified = ctx.timestamp();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         // this will be never happened
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> however, it should be: 
> {code:java}
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time 
> timestamp
>         // it should be the previous watermark
>         current.lastModified = ctx.timerService().currentWatermark();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 60000` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to