Hi, i've created a PR to fix scala and java examples and the error
suggested by Philippe.
Hope it will be helpful!!
Mauro
Il 09/03/2017 10:30, Kostas Kloudas ha scritto:
Hi Philippe,
You are right!
Thanks for reporting it!
We will fix it asap.
Kostas
On Mar 9, 2017, at 8:38 AM, Philippe Caparroy
<philippe.capar...@orange.fr <mailto:philippe.capar...@orange.fr>> wrote:
I think there is an error in the code snippet describing the
ProcessFunction time out example :
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
@Override
|public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception { // get the
state for the key that scheduled the timer CountWithTimestamp result
= state.value(); // check if this is an outdated timer or the latest
timer if (timestamp == result.lastModified) { // emit the state
out.collect(new Tuple2<String, Long>(result.key, result.count)); } }|
|
If, as stated in the example, the CountWithTimeoutFunction should
emit a key/count if no further update occurred during the minute
elapsed since last update, the test should be :
|if (timestamp == result.lastModified + 60000) { |
|// emit the state on timeout |
|out.collect(new Tuple2<String, Long>(result.key, result.count)); |
|}|
||
|As stated in the javadoc of the ProcessFunction : the timestamp arg
of on timer method is the timestamp |of the firing timer.
|