Hi -

I’m streaming events from Kafka and processing them in event time.  I’d like to 
process only events that are older than some given time (say, 2 days ago), at 
an interval of 5 minutes.  I’ve experimented with Flink dynamic tables:

String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
    " FROM " + rawTable +
    " WHERE status='RETRY'" +
    " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";


But this ignores events that are older than 5 minutes.  Here’s my timestamp 
assigner:
public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {

  private final long maxTimeLag = 2000; // 2 seconds

  @Override
  public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
    return rr.getLastTry().getTime();
  }

  @Override
  public Watermark getCurrentWatermark() {
    return new Watermark(System.currentTimeMillis() - maxTimeLag);
  }
}
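
To make the symptom concrete, here is a minimal sketch with no Flink dependency. It assumes that a tumbling window is treated as late (and its events dropped) once the watermark has passed the window's last timestamp, with no allowed lateness. The names `LatenessSketch`, `windowEnd`, `watermark` and `isLate` are hypothetical and only mirror the arithmetic of the assigner above; this is not Flink's actual implementation.

```java
// Hypothetical illustration only: mirrors the wall-clock watermark above
// and a 5-minute tumbling window, without using any Flink classes.
public class LatenessSketch {

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5-minute tumbling window
    static final long MAX_TIME_LAG_MS = 2000L;         // same 2-second lag as the assigner

    // Exclusive end of the tumbling window that contains the event timestamp.
    static long windowEnd(long eventTs) {
        return eventTs - (eventTs % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    // Watermark derived from the wall clock, as in getCurrentWatermark().
    static long watermark(long nowMs) {
        return nowMs - MAX_TIME_LAG_MS;
    }

    // Assumption: an event is late when the watermark has already reached
    // the last timestamp of its window (window end minus one millisecond).
    static boolean isLate(long eventTs, long nowMs) {
        return windowEnd(eventTs) - 1 <= watermark(nowMs);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long twoDaysAgo = now - 2L * 24 * 60 * 60 * 1000;
        long oneSecondAgo = now - 1000L;

        System.out.println("2-day-old event late?    " + isLate(twoDaysAgo, now));
        System.out.println("1-second-old event late? " + isLate(oneSecondAgo, now));
    }
}
```

Under these assumptions, any event whose timestamp is more than 5 minutes plus the 2-second lag behind the wall clock falls into a window that the watermark has already passed, which would match the behaviour I am seeing.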

So, a couple of questions:

1. How can I get this query to recognize earlier events (those with timestamps 
more than 5 minutes in the past)?
2. Is a dynamic table a good solution here, or could I accomplish the same 
thing using DataStream windowing?

Thanks -

— Cindy
