Hello all!

Please disregard my last message; I solved it with Thread.sleep() and Stateful
Source Functions
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#stateful-source-functions>.
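
For anyone finding this thread later, here is a minimal sketch of the loop I
mean. It is plain Java with no Flink dependency, so it only illustrates the
pattern and is not the actual source. The names TickSource, lock and
snapshotState are hypothetical stand-ins: in a real job, run() and cancel()
would be the SourceFunction methods, the lock would be the checkpoint lock,
and the counter would be saved and restored through Flink's checkpointing
hooks.

```java
import java.util.function.LongConsumer;

/**
 * Flink-free stand-in for a stateful tick source (hypothetical class).
 * Emits an incrementing counter, sleeping between emissions.
 */
class TickSource {
    private final long intervalMillis;
    // Stands in for the checkpoint lock: emitting and updating the counter
    // happen atomically with respect to a snapshot.
    private final Object lock = new Object();
    private volatile boolean running = true;
    private long counter; // the state a checkpoint would have to capture

    TickSource(long intervalMillis, long initialCounter) {
        this.intervalMillis = intervalMillis;
        this.counter = initialCounter;
    }

    /** Emits counter+1, counter+2, ... until cancel() is called. */
    void run(LongConsumer out) {
        while (running) {
            synchronized (lock) {
                counter++;
                out.accept(counter);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop emitting.
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    /** Asks the emit loop to stop after the current iteration. */
    void cancel() {
        running = false;
    }

    /** Returns the value a checkpoint would store. */
    long snapshotState() {
        synchronized (lock) {
            return counter;
        }
    }
}
```

With an interval of 200 and an initial counter of 0, the consumer passed to
run() receives 1, 2, 3, ... roughly every 200 ms until cancel() is called.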

But just out of curiosity, can processing-time Timers get rescheduled
inside the onTimer method?

On Mon, Jan 20, 2020 at 7:04 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all :)!
>
> I'm having trouble creating a tick service.
>
> Goal: register a TableSource that emits a Row roughly every 200ms in
> processing time. The Row would contain only one column "counter" that is
> incremented by 1 each Row.
>
> Current attempt: Using TimerService
> A TableSource with
>
> public DataStream<Long> getDataStream(StreamExecutionEnvironment execEnv) {
>     return execEnv
>             .fromElements((Long) offset) // default 0L, one element
>             .keyBy(new NullByteKeySelector<>())
>             .process(new TickKeyedProcessFunction(200L))
>             .forceNonParallel();
> }
>
> And a KeyedProcessFunction with onTimer doing the heavy-lifting:
>
> public void processElement(Long value, Context context, Collector<Long> collector)
>         throws IOException {
>     // called once
>     counter.update(value);
>     Long now = System.currentTimeMillis();
>     context.timerService().registerProcessingTimeTimer(now);
> }
>
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
>         throws Exception {
>     Long then = timestamp + interval;
>     Long current = counter.value();
>     current++;
>     counter.update(current);
>     ctx.timerService().registerProcessingTimeTimer(then);
>     out.collect(current);
> }
>
> Now, the runtime tells me the Source is in FINISHED status. So obviously
> there must be limitations around re-scheduling one key inside onTimer.
>
> Is there a way to use the TimerService to go around that?
> Also, how would you implement this tick service by other means?
>
> Cheers
> Ben
>
>
>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml
