Hello all! Please disregard my last message: I got it working with Thread.sleep() and a stateful source function <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#stateful-source-functions>.
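
For anyone finding this thread later, the shape of that approach is roughly as follows. This is a minimal sketch, not the actual job code: TickSource, its Consumer<Long> output and the maxTicks bound are hypothetical stand-ins so that it runs without Flink on the classpath. In a real job the loop would presumably live in SourceFunction.run(), emit through the SourceContext while holding the checkpoint lock, and the counter would be snapshotted through the state interfaces described at the link above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for a stateful tick source (hypothetical; not Flink's SourceFunction). */
final class TickSource {
    private final long intervalMs;
    private long counter;                    // the state a checkpoint would snapshot
    private volatile boolean running = true; // flipped by cancel(), possibly from another thread

    TickSource(long offset, long intervalMs) {
        this.counter = offset;
        this.intervalMs = intervalMs;
    }

    /** Emits one incremented counter value per interval until cancelled or maxTicks is reached. */
    void run(Consumer<Long> out, int maxTicks) throws InterruptedException {
        for (int i = 0; running && i < maxTicks; i++) {
            Thread.sleep(intervalMs);
            // Assumption: this monitor plays the role of the checkpoint lock, so that
            // updating the counter and emitting it happen atomically w.r.t. a snapshot.
            synchronized (this) {
                counter++;
                out.accept(counter);
            }
        }
    }

    /** Asks the emit loop to stop after the current iteration. */
    void cancel() {
        running = false;
    }

    /** Returns the value that would be written into a checkpoint. */
    synchronized long snapshotState() {
        return counter;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Long> ticks = new ArrayList<>();
        TickSource source = new TickSource(0L, 200L);
        source.run(ticks::add, 3);
        System.out.println(ticks); // prints [1, 2, 3]
    }
}
```

The maxTicks bound only exists so the sketch terminates; an actual source would loop until cancel() is called.
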
But just out of curiosity, can processing-time timers be rescheduled inside the onTimer method?

On Mon, Jan 20, 2020 at 7:04 PM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:

> Hello all :)!
>
> I'm having trouble creating a tick service.
>
> Goal: register a TableSource that emits a Row roughly every 200ms in
> processing time. The Row would contain only one column "counter" that is
> incremented by 1 each Row.
>
> Current attempt: Using TimerService
> A TableSource with
>
> public DataStream<String> getDataStream(StreamExecutionEnvironment execEnv) {
>   return execEnv
>     .fromElements((Long) offset) // default 0L, one element
>     .keyBy(new NullByteKeySelector<>())
>     .process(new TickKeyedProcessFunction(200L))
>     .forceNonParallel();
> }
>
> And a KeyedProcessFunction with onTimer doing the heavy-lifting:
>
> public void processElement(Long value, Context context, Collector<Long>
> collector) throws IOException {
>   // called once
>   counter.update(value);
>   Long now = System.currentTimeMillis();
>   context.timerService().registerProcessingTimeTimer(now);
> }
>
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
> throws Exception {
>   Long then = timestamp + interval;
>   Long current = counter.value();
>   current++;
>   counter.update(current);
>   ctx.timerService().registerProcessingTimeTimer(then);
>   out.collect(current);
> }
>
> Now, the runtime tells me the Source is in FINISHED status. So obviously
> there must be limitations around re-scheduling one key inside onTimer.
>
> Is there a way to use the TimerService to go around that?
> Also, how would you implement this tick service by other means?
>
> Cheers
> Ben
>

--
Benoît Paris
Explainable Machine Learning Engineer
Tel: +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml