[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tinny cat updated FLINK-19167:
------------------------------
    Description: 
Section "*Process Function Example*" of 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
 currently is:
{code:java}
// Some comments here
@Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        // this will never happen
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
{code}
However, it should be:
{code:java}
@Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        // it should be the previous watermark
        current.lastModified = ctx.timerService().currentWatermark();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
{code}

  was:
Section "*Process Function Example*" of 
[https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
 currently is:
{code:java}
// Some comments here
@Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        // this will never happen
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
{code}
However, it should be:
{code:java}
@Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        // it should be the previous watermark
        current.lastModified = ctx.timerService().currentWatermark();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
{code}


> Process Function Example does not work
> --------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> Section "*Process Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  currently is:
> {code:java}
> // Some comments here
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time timestamp
>         current.lastModified = ctx.timestamp();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         // this will never happen
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> However, it should be:
> {code:java}
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time timestamp
>         // it should be the previous watermark
>         current.lastModified = ctx.timerService().currentWatermark();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 
> 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to