[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196015#comment-17196015
 ] 

tinny cat commented on FLINK-19167:
-----------------------------------

The following is my test output code:
{code:java}
@Override
  public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Tuple2<String, Long>> out) throws Exception {
    Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
    CountWithTimestamp result = state.value();
    System.out.println("timestamp: " + timestamp + ", " + "lastModified: " + 
result.lastModified);
    if (timestamp == result.lastModified + 60000) {
      out.collect(new Tuple2<>(key.f0, result.count));
    }
  }
{code}
Here are the three pieces of data I sent:
 7, 8, 60007
 Then the output of flink is as follows:
{code:java}
timestamp: 60007, lastModified: 60007
{code}
[~dwysakowicz]


> Proccess Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> Section "*Porccess Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  current is:
> {code:java}
> // Some comments here
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time 
> timestamp
>         current.lastModified = ctx.timestamp();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         // this will be never happened
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> however, it should be: 
> {code:java}
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time 
> timestamp
>         // it should be the previous watermark
>         current.lastModified = ctx.timerService().currentWatermark();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified = 
> ctx.timerService().currentWatermark();`  otherwise, `timestamp == 
> result.lastModified + 60000` will be never happend



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to