This is close:

String query = "SELECT pid, status, lastTry " +
    " FROM " + rawTable +
    " WHERE status='RECOVERABLE'" +
    " GROUP BY HOP(UserActionTime,  INTERVAL '30' SECOND, INTERVAL '5' HOUR), 
pid, status, lastTry";
But I need to have a stream/table that will dynamically update every 30 seconds 
with only events that were not in the last query.
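
One way to picture the "only events not in the previous result" requirement is a buffer that is drained on every tick. The sketch below is plain Java with no Flink dependency. `IncrementalEmitter`, `Event`, and `tick` are hypothetical names, and the 30-second schedule is left to the caller. In a Flink job the same idea would presumably live in keyed state with timers rather than in an in-memory list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Plain-Java sketch: each tick emits matching events that no earlier tick emitted. */
final class IncrementalEmitter {

    /** Hypothetical stand-in for the columns the query selects. */
    static final class Event {
        final String pid;
        final String status;
        final long lastTryMs;

        Event(String pid, String status, long lastTryMs) {
            this.pid = pid;
            this.status = status;
            this.lastTryMs = lastTryMs;
        }
    }

    // Events that passed the status filter but have not been emitted yet.
    private final List<Event> pending = new ArrayList<>();

    /** Mirrors the WHERE clause: only RECOVERABLE events are kept. */
    void add(Event e) {
        if ("RECOVERABLE".equals(e.status)) {
            pending.add(e);
        }
    }

    /**
     * Intended to be called periodically (for example every 30 seconds).
     * Returns pending events whose lastTry is before the cutoff and removes
     * them, so a later tick never returns the same event again.
     */
    List<Event> tick(long cutoffMs) {
        List<Event> fresh = new ArrayList<>();
        Iterator<Event> it = pending.iterator();
        while (it.hasNext()) {
            Event e = it.next();
            if (e.lastTryMs < cutoffMs) {
                fresh.add(e);
                it.remove();
            }
        }
        return fresh;
    }
}
```

Calling `tick` twice with the same cutoff returns the old events once and an empty list the second time. Events newer than the cutoff stay buffered until a later cutoff passes them.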


On 2019/12/19 16:21:28, Cindy McMullen <[email protected]> wrote: 
> Hi -
> 
> I’m streaming events from Kafka, processing in EventTime.  I’d like to 
> process only events that are older than (before) some given time (say, 2 days ago) 
> at an interval of 5 minutes.  I’ve experimented with Flink DynamicTables:
> 
> String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, 
> INTERVAL ‘5' MINUTE) as endT " +> 
>     " FROM " + rawTable +> 
>     " WHERE status=‘RETRY'" +> 
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime,  INTERVAL ‘5' 
> MINUTE)";> 
> 
> 
> But this ignores events that are older than 5 minutes.  Here’s my timestamp 
> assigner:
> public class TimeLagWatermarkAssigner
>     implements AssignerWithPeriodicWatermarks<RedactionResult> {
> 
>   private final long maxTimeLag = 2000; // 2 seconds
> 
>   @Override
>   public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>     return rr.getLastTry().getTime();
>   }
> 
>   @Override
>   public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - maxTimeLag);
>   }
> }
> 
> So, a couple of questions:
> 
> 1. How can I get this query to recognize earlier events (before 5 minutes 
> ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same 
> thing using DataStream windowing?
> 
> Thanks -
> 
> — Cindy
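
A likely reason the older events are ignored: the assigner quoted above takes timestamps from `lastTry` but advances the watermark from the wall clock, so any event whose window ended before `now - maxTimeLag` arrives late, and event-time windows drop late records by default. The sketch below reproduces that arithmetic in plain Java with no Flink dependency. `LatenessSketch`, `windowStart`, and `isLate` are illustrative names, and it assumes zero allowed lateness, no window offset, and non-negative epoch timestamps.

```java
/** Plain-Java sketch of how an event-time tumbling window treats old events. */
final class LatenessSketch {

    /** Start of the tumbling window that contains the timestamp (no offset). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /**
     * An event counts as late once the watermark has reached the last
     * timestamp of its window (window end minus one millisecond).
     */
    static boolean isLate(long eventTimeMs, long watermarkMs, long windowSizeMs) {
        long windowMaxTimestamp = windowStart(eventTimeMs, windowSizeMs) + windowSizeMs - 1;
        return windowMaxTimestamp <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 5 * 60 * 1000L;          // 5-minute tumbling window
        long maxTimeLagMs = 2000L;                   // same lag as the quoted assigner
        long now = System.currentTimeMillis();
        long watermark = now - maxTimeLagMs;         // wall-clock based watermark
        long twoDaysAgo = now - 2 * 24 * 60 * 60 * 1000L;

        // An event stamped two days ago falls in a window that closed long ago.
        System.out.println("two-day-old event late: " + isLate(twoDaysAgo, watermark, windowSizeMs));
        // An event stamped now falls in a window that is still open.
        System.out.println("current event late: " + isLate(now, watermark, windowSizeMs));
    }
}
```

Under these assumptions, a watermark derived from the event timestamps themselves (rather than from `System.currentTimeMillis()`) would keep old windows open while historical data is still being read.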
