Hey there,
Hope all is well!
I would like to delay my data by 15 minutes before it arrives at my sinks:
stream()
    .map()
    [....]
    .<DELAY_DATA_FOR_15_MINUTES>
    .print()
I tried implementing my own ProcessFunction, where Timestamper is a
custom abstract class:
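import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public abstract class Timestamper {
    // Epoch millis; compared against wall-clock time in DelayedProcessor.
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    private final String stateName;
    private final Class<T> clazz;

    // TODO: Should we use ListState, as it is preferred for serialization,
    // or a ValueState<Queue<T>>, which may impact serialization?
    private transient ListState<T> state;

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getListState(
                new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
        state.add(t);
        // executedAt() is compared with wall-clock time, so use a processing-time
        // timer at the element's due time. An event-time timer fires on watermarks,
        // which need not line up with System.currentTimeMillis().
        ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        Iterable<T> buffered = state.get();
        if (buffered == null) {
            return;
        }
        // Emit every element that has been delayed long enough and keep the rest.
        List<T> remaining = new ArrayList<>();
        for (T v : buffered) {
            if (v.executedAt() + TIMEOUT <= timestamp) {
                out.collect(v);
            } else {
                remaining.add(v);
            }
        }
        state.update(remaining);
    }
}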
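To make the release rule in onTimer easier to reason about outside Flink, here is a minimal plain-Java sketch of the same logic: an element is emitted once executedAt + delay <= now. As an assumption on my side, it swaps the full list scan for a java.util.PriorityQueue ordered by executedAt, so each release only inspects due elements (relevant to the ListState vs. Queue TODO above). DelayBuffer is a hypothetical, in-memory helper, not Flink managed state, so it is not fault tolerant on its own.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// Hypothetical helper (not a Flink API): buffers elements and releases each
// one once executedAt + delay has been reached.
class DelayBuffer<T> {
    private final long delayMillis;
    private final ToLongFunction<T> executedAt;
    // The delay is constant, so ordering by executedAt is ordering by due time.
    private final PriorityQueue<T> queue;

    DelayBuffer(long delayMillis, ToLongFunction<T> executedAt) {
        this.delayMillis = delayMillis;
        this.executedAt = executedAt;
        this.queue = new PriorityQueue<>(Comparator.comparingLong(executedAt));
    }

    void add(T element) {
        queue.add(element);
    }

    // Removes and returns, oldest first, every element with executedAt + delay <= now.
    List<T> release(long now) {
        List<T> due = new ArrayList<>();
        while (!queue.isEmpty()
                && executedAt.applyAsLong(queue.peek()) + delayMillis <= now) {
            due.add(queue.poll());
        }
        return due;
    }

    int size() {
        return queue.size();
    }
}

class DelayBufferDemo {
    public static void main(String[] args) {
        long delay = 15 * 60 * 1000L;
        // Elements here are plain timestamps, so executedAt is the identity.
        DelayBuffer<Long> buffer = new DelayBuffer<>(delay, t -> t);
        buffer.add(2_000L);
        buffer.add(1_000L);
        buffer.add(5_000L);
        System.out.println(buffer.release(1_000L + delay)); // [1000]
        System.out.println(buffer.release(2_000L + delay)); // [2000]
        System.out.println(buffer.size()); // 1
    }
}
```

Inside a keyed ProcessFunction the same idea would still need to live in keyed state to survive failures; the sketch only illustrates the ordering and release condition.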
Unfortunately, this can only be used on a keyed stream (keyed state and
timers are only available there), which may not always be the case for me.
One possible solution would be to use:
.windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
and then always just take the value with the lowest timestamp, but this
seems very bad performance-wise: with a 1-second slide, every element is
assigned to 900 overlapping windows, so the state would be very large, and
windowAll runs with a parallelism of 1.
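The cost of the sliding-window approach can be put into numbers with a small plain-Java calculation (not Flink code; SlidingWindowCost and its methods are made-up names for illustration). Each element belongs to size / slide windows, assuming size is a multiple of slide. The 100 events/s load used for the second figure is purely an assumed example.

```java
import java.util.concurrent.TimeUnit;

class SlidingWindowCost {
    // Each element is assigned to every sliding window covering its timestamp:
    // size / slide windows when size is a multiple of slide.
    static long windowsPerElement(long sizeMillis, long slideMillis) {
        return sizeMillis / slideMillis;
    }

    // Total window assignments (state writes) over the lifetime of a batch of elements.
    static long totalAssignments(long elements, long sizeMillis, long slideMillis) {
        return elements * windowsPerElement(sizeMillis, slideMillis);
    }

    public static void main(String[] args) {
        long size = TimeUnit.MINUTES.toMillis(15);
        long slide = TimeUnit.SECONDS.toMillis(1);
        System.out.println(windowsPerElement(size, slide)); // 900
        // Assumed load: 100 events/s for 15 minutes = 90,000 elements.
        System.out.println(totalAssignments(90_000L, size, slide)); // 81000000
    }
}
```

A larger slide reduces the factor (a 10-second slide gives 90 windows per element) but makes the delay coarser.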
Does anyone have a solution for me, or can anyone point me in the right direction?
Best regards,
Dario