Hey there,

Hope all is well!

I would like to delay my data by 15 minutes before it arrives at my sinks:

stream()
.map()
[....]
.<DELAY_DATA_FOR_15_MINUTES>
.print()

I tried implementing my own ProcessFunction, where Timestamper is a custom abstract class:

public abstract class Timestamper {
    public abstract long executedAt();
}

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    private final String stateName;
    private final Class<T> clazz;

    // TODO: Should we use ListState, which seems to be preferred for serialization,
    //  or ValueState<Queue<T>>, which may be more expensive to serialize?
    private transient ListState<T> state;

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
        state.add(t);
        // Fire on wall-clock time, consistent with the wall-clock check in onTimer.
        ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        Iterable<T> buffered = state.get();
        if (buffered == null) {
            return; // an earlier timer may already have flushed everything
        }

        long now = ctx.timerService().currentProcessingTime();
        List<T> remaining = new ArrayList<>();
        for (T v : buffered) {
            if (v.executedAt() + TIMEOUT <= now) {
                out.collect(v); // delay has elapsed: forward downstream
            } else {
                remaining.add(v); // still waiting: keep it buffered
            }
        }
        state.update(remaining);
    }
}
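To pin down the intended semantics independently of Flink, here is a minimal plain-Java sketch of the buffering done in onTimer (Event and DelayBuffer are hypothetical names used only for illustration). It assumes executedAt is in epoch milliseconds and keeps pending elements in a min-heap ordered by executedAt, so each check only touches elements that are actually due. Unlike the list-based onTimer, it releases elements oldest first rather than in arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event type standing in for a Timestamper subclass.
final class Event {
    final String id;
    final long executedAt; // epoch millis (assumption)

    Event(String id, long executedAt) {
        this.id = id;
        this.executedAt = executedAt;
    }
}

// Plain-Java model of the delay buffer: elements are held until
// executedAt + delay <= now, then released oldest first.
final class DelayBuffer {
    private final long delayMillis;
    // Min-heap by executedAt: the head is always the next element to become due.
    private final PriorityQueue<Event> pending =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.executedAt));

    DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void add(Event e) {
        pending.add(e);
    }

    // Counterpart of onTimer: remove and return every element due at "now".
    List<Event> releaseDue(long now) {
        List<Event> out = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().executedAt + delayMillis <= now) {
            out.add(pending.poll());
        }
        return out;
    }

    int size() {
        return pending.size();
    }
}
```

With a 15-minute delay, an event executed at t = 1,000 ms is released by the first check at or after t = 901,000 ms, and anything later stays buffered.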

Unfortunately, this can only be used on a keyed stream, which is not always what I have.

One possible solution would be to use:

.windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))

and then always take the element with the lowest timestamp, but this seems very bad performance-wise, and the state would be very large.
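To put a rough number on the state size: with a 15-minute window and a 1-second slide, every element falls into 15 × 60 / 1 = 900 overlapping windows. The sketch below models that window assignment in plain Java. SlidingWindowMath is a hypothetical helper modeled on the idea behind SlidingEventTimeWindows, not Flink's own code, and it assumes offset 0 and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding-window assignment: an element belongs to every
// window [start, start + size) whose start is a multiple of the slide.
final class SlidingWindowMath {
    // Returns the start timestamps of all windows that contain "timestamp".
    static List<Long> windowStarts(long timestamp, long sizeMillis, long slideMillis) {
        // Latest window start at or before the timestamp (offset 0, timestamp >= 0).
        long lastStart = timestamp - (timestamp % slideMillis);
        List<Long> starts = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, windowStarts(1_234_567L, 900_000L, 1_000L) returns 900 starts, from 1,234,000 down to 335,000, so each buffered element is effectively stored 900 times.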

Does anyone have a solution for me, or can anyone point me in the right direction?

Best regards,

Dario
