Hi Dario,

Out of curiosity, could you briefly describe the driving use case? What is the (logical) constraint that drives the requirement? I'd guess it could be related to waiting for some (external) condition, or maybe to late data? I think there might be better approaches than (unconditionally) delaying data in the pipeline. On the other hand, if that really is the best approach, then adding a random key to create a keyed stream should work in all cases, right?
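
A minimal sketch of the random-key idea, in plain Java so it stands on its own. The RandomKeyed wrapper and the buckets parameter are hypothetical names for illustration, not part of Flink. The point it shows is that the key is drawn once and stored with the record, so a later key lookup always returns the same value for that record.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical wrapper (not a Flink class): pairs a record with a key chosen once.
final class RandomKeyed<T> {
    final int key;
    final T value;

    RandomKeyed(int key, T value) {
        this.key = key;
        this.value = value;
    }

    // Picks a key in [0, buckets) when the record is wrapped.
    // 'buckets' is an assumed tuning knob, e.g. the desired parallelism.
    static <T> RandomKeyed<T> wrap(T value, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive");
        }
        return new RandomKeyed<>(ThreadLocalRandom.current().nextInt(buckets), value);
    }
}
```

In a Flink job this would presumably become a map step that wraps each record, followed by a keyBy on the stored key field. Drawing the random number inside the key selector itself is best avoided, since the selector is expected to be deterministic and may be evaluated more than once for the same record.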

 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:

Hey Kiran,

Yeah, I was thinking of another solution, so I have one PostgreSQL sink and one Kafka sink.

So I can just process the data in real time and insert it into the DB. Then I would just select the latest row where created_at <= NOW() - interval '15 minutes', and for any Kafka consumer I would just do:

let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// ....
kafka_commit();

And then run some cron job to delete obsolete rows from the db which are not required anymore.
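
The consumer-side wait above could look roughly like this in Java. This is only a sketch: ConsumerDelay and its method names are made up for illustration, and it assumes each message carries its created_at as an Instant.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for the consumer-side delay; not part of any Kafka client.
final class ConsumerDelay {

    // How long the consumer still has to wait until a message created at
    // 'createdAt' is at least 'delay' old. Never negative.
    static Duration remaining(Instant createdAt, Duration delay, Instant now) {
        Duration diff = Duration.between(now, createdAt.plus(delay));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    // Blocks the calling thread until the message is old enough to process.
    static void awaitRelease(Instant createdAt, Duration delay) throws InterruptedException {
        Duration wait = remaining(createdAt, delay, Instant.now());
        if (!wait.isZero()) {
            Thread.sleep(wait.toMillis());
        }
    }
}
```

As in the pseudocode, the offset would be committed only after the message has been processed. A consumer that sleeps for many minutes between polls may also run into its max.poll.interval.ms limit, so that setting probably needs attention.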

Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:
Hi Dario,

Did you explore other options? If your use case (apart from delaying sink writes) can be solved via Spark Streaming, then maybe Spark Streaming with a micro-batch interval of 15 minutes would help.



On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch <dario.heini...@gmail.com> wrote:

    Hey there,

    Hope all is well!

    I would like to delay my data by 15 minutes before it arrives at my
    sinks:

    stream()
    .map()
    [....]
    .<DELAY_DATA_FOR_15_MINUTES>
    .print()

    I tried implementing my own ProcessFunction, where Timestamper is a
    custom abstract class:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public abstract class Timestamper {
        public abstract long executedAt();
    }

    public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

        private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

        private final String stateName;
        private final Class<T> clazz;

        // TODO: Should we use ListState, as this is preferred for serialization,
        // or ValueState<Queue>, which may impact serialization?
        private transient ListState<T> state;

        public DelayedProcessor(String stateName, Class<T> clazz) {
            this.stateName = stateName;
            this.clazz = clazz;
        }

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
        }

        @Override
        public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
            // Buffer the element and ask to be woken up once its delay has passed.
            state.add(t);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
            // Note: the timer is registered in event time, but expiry is
            // checked against wall-clock time here.
            final long now = System.currentTimeMillis();

            // Emit every element whose delay has elapsed; keep the rest in state.
            List<T> remaining = new ArrayList<>();
            for (T v : state.get()) {
                if (v.executedAt() + TIMEOUT <= now) {
                    out.collect(v);
                } else {
                    remaining.add(v);
                }
            }
            state.update(remaining);
        }
    }

    Unfortunately, this can only be used on a keyed stream, which may not
    always be the case for me.

    One possible solution would be to use:

    .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))

    and then always just take the value with the lowest timestamp, but this
    seems very bad performance-wise, and the state would be very large.
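
To put a rough number on the state concern: with sliding windows, each element is assigned to every window that overlaps it, which is size divided by slide. The small helper below (SlidingWindowCost is a hypothetical name, and it assumes the size is a multiple of the slide) just does that arithmetic.

```java
import java.time.Duration;

// Hypothetical helper; only illustrates the size / slide arithmetic.
final class SlidingWindowCost {

    // Number of overlapping sliding windows each element falls into,
    // assuming 'size' is a whole multiple of 'slide'.
    static long windowsPerElement(Duration size, Duration slide) {
        return size.toMillis() / slide.toMillis();
    }
}
```

For a 15-minute window sliding every second, windowsPerElement(Duration.ofMinutes(15), Duration.ofSeconds(1)) gives 900, so without pre-aggregation each element would be held in roughly 900 windows at once.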

    Does anyone have a solution for me, or can someone point me in the
    right direction?

    Best regards,

    Dario
