Hello all :)!

I'm having trouble creating a tick service.

Goal: register a TableSource that emits a Row roughly every 200ms in
processing time. The Row would contain only one column "counter" that is
incremented by 1 each Row.

Current attempt: Using TimerService
A TableSource with

public DataStream<String> getDataStream(StreamExecutionEnvironment execEnv) {
    return execEnv
            .fromElements((Long) offset) // default 0L, one element
            .keyBy(new NullByteKeySelector<>())
            .process(new TickKeyedProcessFunction(200L))
            .forceNonParallel();
}

And a KeyedProcessFunction with onTimer doing the heavy-lifting:

public void processElement(Long value, Context context,
Collector<Long> collector) throws IOException {
    // called once
    counter.update(value);
    Long now = System.currentTimeMillis();
    context.timerService().registerProcessingTimeTimer(now);
}

 public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Long> out) throws Exception {
    Long then = timestamp + interval;
    Long current = counter.value();
    current++;
    counter.update(current);
    ctx.timerService().registerProcessingTimeTimer(then);
    out.collect(current);
}

Now, the runtime tells me the Source is in FINISHED status. So obviously
there must be limitations around re-scheduling one key inside onTimer.

Is there a way to use the TimerService to go around that?
Also, how would you implement this tick service by other means?

Cheers
Ben

Reply via email to