Hi Dario,

Have you explored other options? If your use case (apart from delaying the
sink writes) can be solved with Spark Streaming, then Spark Streaming with a
15-minute micro-batch might help.
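
One caveat: a micro-batch trigger does not hold every record for a fixed 15
minutes. A record only waits until the batch it arrived in is processed. Here
is a rough plain-Java sketch of that arithmetic. It assumes, for illustration
only, that triggers fire at multiples of the interval since the epoch (real
trigger times depend on when the query starts), and MicroBatchDelay is a
hypothetical helper, not a Spark API:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: models epoch-aligned micro-batch triggers (an assumption).
class MicroBatchDelay {
    // Batch interval matching the 15-minute micro-batch suggested above.
    static final long BATCH_MILLIS = TimeUnit.MINUTES.toMillis(15);

    // End of the batch that contains arrivalMillis, i.e. when the record
    // would be handed to the sink (ignoring processing time).
    static long releaseTime(long arrivalMillis) {
        return (arrivalMillis / BATCH_MILLIS + 1) * BATCH_MILLIS;
    }

    // How long the record actually waits: between 1 ms and 15 minutes.
    static long delay(long arrivalMillis) {
        return releaseTime(arrivalMillis) - arrivalMillis;
    }

    public static void main(String[] args) {
        long justAfterTrigger = BATCH_MILLIS + 1;       // arrives 1 ms into a batch
        long justBeforeTrigger = 2 * BATCH_MILLIS - 1;  // arrives 1 ms before the next trigger
        System.out.println(delay(justAfterTrigger));    // 899999
        System.out.println(delay(justBeforeTrigger));   // 1
    }
}
```

So if every record must be delayed by exactly 15 minutes, a 15-minute
micro-batch alone will not guarantee that.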



On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch <dario.heini...@gmail.com>
wrote:

> Hey there,
>
> Hope all is well!
>
> I would like to delay my data by 15 minutes before it arrives at my
> sinks:
>
> stream()
> .map()
> [....]
> .<DELAY_DATA_FOR_15_MINUTES>
> .print()
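
For reference, the behaviour asked for here (release each record 15 minutes
after its timestamp) can be illustrated in a single JVM with the JDK's
java.util.concurrent.DelayQueue. This is only a sketch of the semantics, not a
Flink operator: it is in-memory, not checkpointed and not parallel, and
DelayedRecord is a hypothetical type:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical record type: releasable once its wall-clock release time has passed.
class DelayedRecord implements Delayed {
    final String payload;
    final long releaseAtMillis;

    DelayedRecord(String payload, long timestampMillis, long delayMillis) {
        this.payload = payload;
        this.releaseAtMillis = timestampMillis + delayMillis;
    }

    // Remaining delay; DelayQueue only hands out records for which this is <= 0.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(releaseAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Orders the queue so the record that becomes due first is at the head.
    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedRecord) {
            return Long.compare(releaseAtMillis, ((DelayedRecord) other).releaseAtMillis);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        long delay = 200; // stands in for TimeUnit.MINUTES.toMillis(15) so the demo ends quickly
        DelayQueue<DelayedRecord> queue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        queue.put(new DelayedRecord("second", now + 50, delay));
        queue.put(new DelayedRecord("first", now, delay));
        // take() blocks until the head record's delay has elapsed.
        System.out.println(queue.take().payload); // first
        System.out.println(queue.take().payload); // second
    }
}
```
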
>
> I tried implementing my own ProcessFunction, where Timestamper is a
> custom abstract class:
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> public abstract class Timestamper {
>      // Wall-clock time (epoch millis) at which the record was produced.
>      public abstract long executedAt();
> }
>
> public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {
>
>      private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);
>
>      private final String stateName;
>      private final Class<T> clazz;
>
>      // TODO: Should we use ListState, which is preferred for serialization,
>      //  or ValueState<Queue<T>>, which may impact serialization?
>      private transient ListState<T> state;
>
>      public DelayedProcessor(String stateName, Class<T> clazz) {
>          this.stateName = stateName;
>          this.clazz = clazz;
>      }
>
>      @Override
>      public void open(Configuration parameters) {
>          state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
>      }
>
>      @Override
>      public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
>          state.add(t);
>          // Processing-time timer, because onTimer compares against wall-clock time.
>          // (Both timers and this ListState require a keyed stream.)
>          ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);
>      }
>
>      @Override
>      public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
>          final long now = ctx.timerService().currentProcessingTime();
>
>          // Emit every buffered element whose delay has elapsed; keep the rest.
>          List<T> pending = new ArrayList<>();
>          for (T v : state.get()) {
>              if (v.executedAt() + TIMEOUT <= now) {
>                  out.collect(v);
>              } else {
>                  pending.add(v);
>              }
>          }
>          state.update(pending);
>      }
> }
>
> Unfortunately, this only works on a keyed stream, and my streams are not
> always keyed.
>
> One possible solution would be to use:
>
> .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
>
> and then always take the value with the lowest timestamp, but this seems
> very bad performance-wise (with a 1-second slide, every element is assigned
> to 900 overlapping windows), and the state would be very large.
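
A min-heap ordered by executedAt gives "the value with the lowest timestamp"
directly, without assigning each element to many overlapping windows: keep one
buffer and, on each tick, pop entries while the head is due. Below is a
plain-Java sketch with the clock passed in so it can be tested
deterministically. DelayBuffer and drainDue are hypothetical names, and in
Flink the buffer would still need to live in checkpointed state to survive
failures:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory delay buffer (not fault tolerant).
class DelayBuffer<T> {
    // A payload plus the wall-clock time it was produced.
    static final class Event<V> {
        final V value;
        final long executedAt;

        Event(V value, long executedAt) {
            this.value = value;
            this.executedAt = executedAt;
        }
    }

    private final long delayMillis;
    // Min-heap: the head is always the event with the lowest executedAt.
    private final PriorityQueue<Event<T>> heap =
            new PriorityQueue<>(Comparator.comparingLong((Event<T> e) -> e.executedAt));

    DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void add(T value, long executedAt) {
        heap.add(new Event<>(value, executedAt));
    }

    // Returns every event whose delay has elapsed at `now`, oldest first.
    // Stops at the first event that is not yet due, so pending events are not rescanned.
    List<T> drainDue(long now) {
        List<T> due = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().executedAt + delayMillis <= now) {
            due.add(heap.poll().value);
        }
        return due;
    }

    int pending() {
        return heap.size();
    }

    public static void main(String[] args) {
        long delay = TimeUnit.MINUTES.toMillis(15);
        DelayBuffer<String> buffer = new DelayBuffer<>(delay);
        buffer.add("b", 2_000);
        buffer.add("a", 1_000);
        buffer.add("c", 3_000);
        System.out.println(buffer.drainDue(1_000 + delay)); // [a]
        System.out.println(buffer.drainDue(3_000 + delay)); // [b, c]
        System.out.println(buffer.pending());               // 0
    }
}
```

Unlike rescanning a whole list on every timer, drainDue only touches the
elements it releases, plus one peek at the first element that is not yet due.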
>
> Does anyone have a solution, or can anyone point me in the right direction?
>
> Best regards,
>
> Dario
>
>
