Hi -
I’m streaming events from Kafka and processing them in event time. I’d like to
process only events older than a given time (say, 2 days ago), at 5-minute
intervals. I’ve experimented with Flink dynamic tables:
String query = "SELECT pid, status, lastTry, " +
    "TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) AS endT" +
    " FROM " + rawTable +
    " WHERE status = 'RETRY'" +
    " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
But this ignores events that are older than 5 minutes. Here’s my timestamp
assigner:
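import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimeLagWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<RedactionResult> {

    private final long maxTimeLag = 2000; // 2 seconds

    // Event time is taken from rr.getLastTry().
    @Override
    public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
        return rr.getLastTry().getTime();
    }

    // The watermark trails the wall clock by maxTimeLag.
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}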
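For reference, here is a rough sketch in plain Java (no Flink dependency) of how a wall-clock watermark like the one above interacts with a 5-minute tumbling window. It assumes the default behavior, where an element is dropped once the watermark has passed the end of its window (allowed lateness of 0). The class and method names are hypothetical and only for illustration; they are not Flink API.

```java
final class LatenessSketch {
    static final long MAX_TIME_LAG_MS = 2_000L;         // same lag as the assigner above
    static final long WINDOW_SIZE_MS = 5 * 60 * 1_000L; // 5-minute tumbling window

    // Start of the tumbling window (no offset) that contains the timestamp.
    static long windowStart(long eventTs) {
        return eventTs - Math.floorMod(eventTs, WINDOW_SIZE_MS);
    }

    // Watermark as produced by a "wall clock minus lag" assigner.
    static long watermark(long nowMs) {
        return nowMs - MAX_TIME_LAG_MS;
    }

    // An element counts as late when the watermark has already reached
    // the last timestamp of its window (window end - 1).
    static boolean isLate(long eventTs, long nowMs) {
        long windowMaxTs = windowStart(eventTs) + WINDOW_SIZE_MS - 1;
        return windowMaxTs <= watermark(nowMs);
    }

    public static void main(String[] args) {
        long now = 1_500_000_000_000L;                // fixed "current time" for the example
        long twoDaysMs = 2L * 24 * 60 * 60 * 1_000L;

        System.out.println(isLate(now - 1_000L, now));    // false: window is still open
        System.out.println(isLate(now - twoDaysMs, now)); // true: window closed long ago
    }
}
```

Under these assumptions, an event stamped 2 days ago always lands in a window that the watermark has already passed, which would be consistent with the older events not showing up in the query result.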
So, a couple of questions:
1. How can I get this query to include earlier events (those more than 5
minutes old)?
2. Is a dynamic table a good solution here, or could I accomplish the same thing
with DataStream windowing?
Thanks -
— Cindy