Never mind.  Flink docs state that the query is an append, not an update, so 
the query is working as expected.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html#continuous-queries

A better solution is something along these lines:

String query = "SELECT lastTry, LOCALTIMESTAMP, " +
    "TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) FROM " + rawTable +
    " WHERE TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30";

which can be modified to select the desired fields.
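
For anyone who wants to sanity-check the WHERE condition outside of Flink, here is a minimal plain-Java sketch of the same predicate. It uses only java.time, not the Flink API. The class and method names (StaleRetryFilter, minutesSince, isStale, selectStale) are made up for illustration, and the sketch assumes TIMESTAMPDIFF(MINUTE, a, b) yields the whole minutes elapsed from a to b.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: mirrors the WHERE clause in plain Java.
class StaleRetryFilter {

    // Whole minutes elapsed from lastTry to now (fractions are truncated).
    // Assumption: this matches TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP).
    static long minutesSince(Instant lastTry, Instant now) {
        return ChronoUnit.MINUTES.between(lastTry, now);
    }

    // True when lastTry is strictly more than thresholdMinutes before now,
    // i.e. the "> 30" comparison in the query when thresholdMinutes is 30.
    static boolean isStale(Instant lastTry, Instant now, long thresholdMinutes) {
        return minutesSince(lastTry, now) > thresholdMinutes;
    }

    // Keeps only the timestamps that satisfy isStale, preserving input order.
    static List<Instant> selectStale(List<Instant> lastTries, Instant now, long thresholdMinutes) {
        List<Instant> result = new ArrayList<>();
        for (Instant lastTry : lastTries) {
            if (isStale(lastTry, now, thresholdMinutes)) {
                result.add(lastTry);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2019-12-19T16:00:00Z");
        List<Instant> lastTries = List.of(
            Instant.parse("2019-12-19T15:15:00Z"),  // 45 minutes old
            Instant.parse("2019-12-19T15:30:00Z"),  // exactly 30 minutes old
            Instant.parse("2019-12-19T15:50:00Z")); // 10 minutes old
        System.out.println(selectStale(lastTries, now, 30));
    }
}
```

Note that the comparison is strict, so a row whose lastTry is exactly 30 minutes old is not selected; change the comparison to >= if that boundary should be included.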

On 2019/12/19 16:21:28, Cindy McMullen <[email protected]> wrote: 
> Hi -
>
> I’m streaming events from Kafka, processing in EventTime.  I’d like to
> process only events that are older (before) some given time (say, 2 days ago)
> at an interval of 5 minutes.  I’ve experimented with Flink DynamicTables:
> 
> String query = "SELECT pid, status, lastTry, " +
>     "TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
> 
> 
> But this ignores events that are older than 5 minutes.  Here’s my timestamp
> assigner:
>
> public class TimeLagWatermarkAssigner
>     implements AssignerWithPeriodicWatermarks<RedactionResult> {
>
>   private final long maxTimeLag = 2000; // 2 seconds
>
>   @Override
>   public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>     return rr.getLastTry().getTime();
>   }
>
>   @Override
>   public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - maxTimeLag);
>   }
> }
>
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes
> ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same
> thing using DataStream windowing?
>
> Thanks -
>
> -- Cindy
