Hi Dario,

Did you explore other options? If your use case (apart from delaying the sink writes) can be solved with Spark Streaming, then Spark Streaming with a 15-minute micro-batch interval might help.
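If you take that route, it is worth checking what a fixed micro-batch interval means for each record. The sketch below uses a simplified model and is not Spark's actual scheduler. It assumes that batches are aligned to time 0 and that a record is emitted when the batch it arrived in closes. Under those assumptions, a record's wait depends on where in the interval it arrives, ranging from just above 0 up to the full 15 minutes. `MicroBatchWait` and `waitUntilBatchEnd` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

public class MicroBatchWait {

    // Idealized model (an assumption, not Spark's scheduler): batches are aligned to
    // time 0, each interval closes exactly on time, and a record is emitted when the
    // batch it arrived in closes. Processing time is ignored; arrivalMillis >= 0.
    public static long waitUntilBatchEnd(long arrivalMillis, long intervalMillis) {
        long batchEnd = (arrivalMillis / intervalMillis + 1) * intervalMillis;
        return batchEnd - arrivalMillis;
    }

    public static void main(String[] args) {
        long interval = TimeUnit.MINUTES.toMillis(15);
        // A record arriving just after a batch boundary waits almost the full interval.
        System.out.println(waitUntilBatchEnd(TimeUnit.SECONDS.toMillis(1), interval)); // 899000
        // A record arriving shortly before the boundary waits only briefly.
        System.out.println(waitUntilBatchEnd(TimeUnit.MINUTES.toMillis(14), interval)); // 60000
    }
}
```

If every record must be held for the full 15 minutes, batch boundaries alone are not enough. You would also need a per-record release time (arrival + 15 minutes).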
On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch <dario.heini...@gmail.com> wrote:
> Hey there,
>
> Hope all is well!
>
> I would like to delay my data by 15 minutes before it arrives at my sinks:
>
> stream()
>     .map()
>     [....]
>     .<DELAY_DATA_FOR_15_MINUTES>
>     .print()
>
> I tried implementing my own ProcessFunction, where Timestamper is a custom abstract class:
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> public abstract class Timestamper {
>     public abstract long executedAt();
> }
>
> public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {
>
>     private final String stateName;
>     private final Class<T> clazz;
>
>     // TODO: Should we use ListState, as it is preferred for serialization,
>     // or ValueState<Queue<T>>, which may hurt serialization?
>     private ListState<T> state;
>
>     private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);
>
>     public DelayedProcessor(String stateName, Class<T> clazz) {
>         this.stateName = stateName;
>         this.clazz = clazz;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
>     }
>
>     @Override
>     public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
>         this.state.add(t);
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
>         List<T> list = new ArrayList<>();
>         this.state.get().forEach(list::add);
>
>         long now = System.currentTimeMillis();
>
>         // Emit every element whose delay has elapsed; keep the rest.
>         list = list.stream().filter(v -> {
>             if (v.executedAt() + TIMEOUT <= now) {
>                 out.collect(v);
>                 return false;
>             }
>             return true;
>         }).collect(Collectors.toList());
>
>         this.state.update(list);
>     }
> }
>
> Unfortunately, this can only be used on a keyed stream, which may not always be the case for me.
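The buffering itself does not depend on keys. Here is a minimal plain-Java sketch of it, assuming each element carries a millisecond timestamp such as `executedAt()`. Elements sit in a min-heap ordered by release time (timestamp + 15 minutes), and only the head is drained. This avoids copying and filtering the whole list on every timer. `DelayBuffer`, `offer`, `drainReady` and `nextReleaseTime` are hypothetical names, not Flink API. The current time is passed in explicitly, so one clock drives both scheduling and release.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Flink API): holds elements until their release time has passed.
public class DelayBuffer<T> {

    // A buffered element together with the time at which it may be emitted.
    private static final class Entry<T> {
        final long releaseAt;
        final T value;

        Entry(long releaseAt, T value) {
            this.releaseAt = releaseAt;
            this.value = value;
        }
    }

    private final long delayMillis;
    // Min-heap ordered by release time, so the next element due is always at the head.
    private final PriorityQueue<Entry<T>> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.releaseAt));

    public DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Buffers a value whose own timestamp is timestampMillis.
    public void offer(T value, long timestampMillis) {
        queue.add(new Entry<>(timestampMillis + delayMillis, value));
    }

    // Removes and returns every value whose release time is <= nowMillis, earliest first.
    public List<T> drainReady(long nowMillis) {
        List<T> ready = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().releaseAt <= nowMillis) {
            ready.add(queue.poll().value);
        }
        return ready;
    }

    // Earliest pending release time (where the next timer would go), or Long.MAX_VALUE if empty.
    public long nextReleaseTime() {
        return queue.isEmpty() ? Long.MAX_VALUE : queue.peek().releaseAt;
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DelayBuffer<String> buffer = new DelayBuffer<>(TimeUnit.MINUTES.toMillis(15));
        buffer.offer("a", 0L);
        buffer.offer("b", TimeUnit.MINUTES.toMillis(5));

        System.out.println(buffer.drainReady(TimeUnit.MINUTES.toMillis(10))); // []
        System.out.println(buffer.drainReady(TimeUnit.MINUTES.toMillis(15))); // [a]
        System.out.println(buffer.drainReady(TimeUnit.MINUTES.toMillis(20))); // [b]
    }
}
```

Inside Flink, this logic would still need two things: a timer registered at `nextReleaseTime()` and checkpointed state. Timers are only available on keyed streams. If a single parallel instance is acceptable, one workaround is to key by a constant (for example `keyBy(v -> 0)`). Every record then shares one key and one timer service.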
>
> One possible solution would be to use:
>
> .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
>
> and then always just take the value with the lowest timestamp, but this seems very bad performance-wise, and the state would be very large.
>
> Does anyone have a solution for me or can point me in the right direction?
>
> Best regards,
>
> Dario
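Here is a rough number for the state size of the windowAll option, assuming windows aligned to epoch 0. A 15-minute window sliding every second assigns each element to size / slide = (15 × 60 s) / 1 s = 900 overlapping windows. Every record therefore ends up in the state of about 900 windows, and windowAll runs with a parallelism of 1. The hypothetical helper below, `windowStarts`, lists the start times of all windows that contain a given timestamp. It follows the usual sliding-window assignment rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SlidingWindowFanOut {

    // Hypothetical helper: start times of every sliding window containing 'timestamp',
    // assuming windows aligned to time 0 (no offset) and timestamp >= 0.
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = TimeUnit.MINUTES.toMillis(15);
        long slide = TimeUnit.SECONDS.toMillis(1);
        long ts = TimeUnit.HOURS.toMillis(1) + 123; // an arbitrary event timestamp

        System.out.println("windows per element: " + windowStarts(ts, size, slide).size()); // 900
    }
}
```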