Hi Dario,
out of curiosity, could you briefly describe the driving use case? What
is the (logical) constraint that drives the requirement? I'd guess
that it could be related to waiting for some (external) condition, or
maybe to late data? I think there might be better approaches than
(unconditionally) delaying data in the pipeline. On the other hand, if
that really is the best approach, then adding a random key to create a
keyed stream should work in all cases, right?
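A minimal, Flink-free sketch of the random-key idea, assuming the key is drawn once per element and stored alongside it (`Keyed`, `RandomKeys`, and `numKeys` are hypothetical names). The ordering matters because Flink evaluates a key selector more than once per record, so a selector that generated a fresh random number on every call would not be deterministic. The sketch therefore attaches the key first and only then groups by the stored value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical wrapper pairing an element with a key that was drawn once.
final class Keyed<T> {
    final int key;
    final T value;

    Keyed(int key, T value) {
        this.key = key;
        this.value = value;
    }
}

final class RandomKeys {
    // Draw the random key exactly once per element and store it, so any
    // later key lookup (k -> k.key) always returns the same value.
    static <T> Keyed<T> assignKey(T value, int numKeys) {
        return new Keyed<>(ThreadLocalRandom.current().nextInt(numKeys), value);
    }

    // Local stand-in for keyBy(k -> k.key): group elements by stored key.
    static <T> Map<Integer, List<T>> groupByKey(List<Keyed<T>> elements) {
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (Keyed<T> e : elements) {
            groups.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }
}
```

In a Flink job this would roughly correspond to a map() that wraps each element, followed by keyBy() on the stored key (not shown; the generic wrapper may need explicit type information). The number of distinct keys bounds how many parallel subtasks can receive data.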
Jan
On 7/18/21 3:52 PM, Dario Heinisch wrote:
Hey Kiran,
Yeah, I was thinking of another solution: I have one PostgreSQL sink and
one Kafka sink.
So I can just process the data in real time and insert it into the DB.
Then I would select the latest row where created_at >= NOW() -
interval '15 minutes', and for any Kafka consumer I would do:
loop {
    let msg = get_next_kafka_msg();
    // Wait until the message is at least 15 minutes old.
    let diff = msg.created_at + 15min - now();
    if diff > 0 {
        sleep(diff);
    }
    // do something
    // ....
    kafka_commit();
}
Then I would run a cron job to delete obsolete rows from the DB that
are no longer required.
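The delay check in the pseudocode can be sketched in Java as follows, assuming each message carries its creation time. `DelayedConsumer`, `Message`, and `consume` are hypothetical stand-ins; no Kafka client is used, and the commit step is only a comment.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class DelayedConsumer {

    // Hypothetical message type standing in for a Kafka record.
    static final class Message {
        final Instant createdAt;
        final String payload;

        Message(Instant createdAt, String payload) {
            this.createdAt = createdAt;
            this.payload = payload;
        }
    }

    // created_at + delay - now, clamped at zero: how long to wait before
    // a message may be handled.
    static Duration remainingDelay(Instant createdAt, Instant now, Duration delay) {
        Duration diff = Duration.between(now, createdAt.plus(delay));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    // Handles messages in order, sleeping until each one is old enough.
    static List<String> consume(List<Message> messages, Duration delay)
            throws InterruptedException {
        List<String> handled = new ArrayList<>();
        for (Message msg : messages) {
            Duration wait = remainingDelay(msg.createdAt, Instant.now(), delay);
            if (!wait.isZero()) {
                TimeUnit.NANOSECONDS.sleep(wait.toNanos());
            }
            handled.add(msg.payload); // "do something"
            // The Kafka offset commit would happen here in a real consumer.
        }
        return handled;
    }
}
```

If this ran inside a real KafkaConsumer poll loop, sleeping for up to 15 minutes between poll() calls would exceed the default `max.poll.interval.ms` of 5 minutes and get the consumer removed from its group, so that setting would need raising, or the partitions would have to be paused with pause()/resume() while waiting.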
Best regards
Dario
On 18.07.21 15:29, Kiran Japannavar wrote:
Hi Dario,
Did you explore other options? If your use case (apart from delaying
sink writes) can be solved with Spark Streaming, then Spark Streaming
with a 15-minute micro-batch might help.
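One detail worth checking with the micro-batch suggestion: a trigger interval bounds the delay rather than fixing it. In the idealized model below (hypothetical `MicroBatchDelay`; it ignores batch processing time and any scheduler details), a record only waits until the next trigger, so its delay lies anywhere in (0, 15 minutes].

```java
final class MicroBatchDelay {

    // Idealized trigger: fires at every multiple of intervalMillis and picks
    // up everything that arrived since the previous trigger.
    // Assumes arrivalMillis >= 0.
    static long nextTriggerMillis(long arrivalMillis, long intervalMillis) {
        return (arrivalMillis / intervalMillis + 1) * intervalMillis;
    }

    // How long a record arriving at arrivalMillis waits for its batch.
    static long delayMillis(long arrivalMillis, long intervalMillis) {
        return nextTriggerMillis(arrivalMillis, intervalMillis) - arrivalMillis;
    }
}
```

With a 15-minute interval, a record arriving just before a trigger is delayed by about a millisecond, while one arriving just after a trigger waits almost the full 15 minutes.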
On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch
<dario.heini...@gmail.com> wrote:
Hey there,
Hope all is well!
I would like to delay my data by 15 minutes before it arrives at my
sinks:
stream()
    .map()
    [....]
    .<DELAY_DATA_FOR_15_MINUTES>
    .print()
I tried implementing my own ProcessFunction, where Timestamper is a
custom abstract class:
public abstract class Timestamper {
    public abstract long executedAt();
}
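import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    private final String stateName;
    private final Class<T> clazz;

    // TODO: Should we use ListState, as it is preferred for serialization,
    // or ValueState<Queue<T>>, which may impact serialization?
    private transient ListState<T> state;

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        // Keyed state: only available when this function runs on a keyed stream.
        state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> out) throws Exception {
        state.add(t);
        // Processing-time timer at the element's due time, so the timer and the
        // check in onTimer() both use wall-clock time. (An event-time timer on
        // ctx.timestamp() mixed with System.currentTimeMillis() is inconsistent,
        // and ctx.timestamp() is null when no timestamps are assigned.)
        ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        List<T> pending = new ArrayList<>();
        for (T v : state.get()) {
            if (v.executedAt() + TIMEOUT <= timestamp) {
                out.collect(v); // due: emit downstream
            } else {
                pending.add(v); // not yet due: keep buffered
            }
        }
        if (pending.isEmpty()) {
            state.clear();
        } else {
            state.update(pending);
        }
    }
}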
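The buffering logic of DelayedProcessor can be modeled without Flink to see what it does per key: each element becomes due at executedAt + delay and is released once the current time reaches that point. This sketch (`DelayBuffer` is a hypothetical name) swaps the full list rescan in onTimer for a priority queue ordered by due time, so only due elements are touched. It does not remove the keyed-stream requirement: in Flink, timers and keyed state are only available on keyed streams.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, Flink-free model of DelayedProcessor's per-key buffering.
final class DelayBuffer<T> {

    private static final class Entry<V> {
        final long dueAt; // executedAt + delay
        final long seq;   // insertion order, keeps ties in arrival order
        final V value;

        Entry(long dueAt, long seq, V value) {
            this.dueAt = dueAt;
            this.seq = seq;
            this.value = value;
        }
    }

    private final long delayMillis;
    private final PriorityQueue<Entry<T>> queue = new PriorityQueue<>(
            Comparator.<Entry<T>>comparingLong(e -> e.dueAt).thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Like processElement(): buffer the element and return the time at which
    // a timer would have to fire for it.
    long add(T value, long executedAtMillis) {
        long dueAt = executedAtMillis + delayMillis;
        queue.add(new Entry<>(dueAt, nextSeq++, value));
        return dueAt;
    }

    // Like onTimer(): release every element whose due time is <= now,
    // earliest first, and keep the rest buffered.
    List<T> drainDue(long nowMillis) {
        List<T> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().dueAt <= nowMillis) {
            due.add(queue.poll().value);
        }
        return due;
    }

    int size() {
        return queue.size();
    }
}
```

A single non-keyed instance of such a buffer would still need something other than Flink timers to call drainDue() periodically.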
Unfortunately, this can only be used on a keyed stream, which may not
always be the case for me.
One possible solution would be to use
    .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
and then always take the value with the lowest timestamp, but this
seems very bad performance-wise, and the state would be very large.
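To see why the windowAll approach gets expensive, here is a simplified model of how a sliding event-time window assigner enumerates the windows that contain one timestamp (zero offset and non-negative timestamps assumed; `SlidingWindowCount` is a hypothetical helper, not Flink's class). With a size of 15 minutes and a slide of 1 second, every element falls into size / slide = 900 overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowCount {

    // Start of the most recent slide-aligned window containing timestamp
    // (zero offset, non-negative timestamps assumed).
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - (timestamp % slide);
    }

    // Starts of all windows [start, start + size) that contain timestamp.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide);
                start > timestamp - size;
                start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

A window function that buffers the whole window would therefore keep up to 900 copies of each element, and because windowAll is non-keyed, all of that work runs in a single task with parallelism 1.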
Does anyone have a solution for me, or can anyone point me in the right
direction?
Best regards,
Dario