Never mind. The Flink docs explain that this kind of windowed query produces an
append-only result, not an updating one, so the query is working as expected.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html#continuous-queries
A better solution is something along these lines, which keeps only rows whose
lastTry is more than 30 minutes old:

String query = "SELECT lastTry, LOCALTIMESTAMP, " +
    "TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) FROM " + rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";

The SELECT list can be modified to return the desired fields.
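
As a rough sketch of that idea, the query string could be built from a field
list and an age threshold. The class and method names below are made up for
illustration and are not part of Flink; "rawTable" stands in for whatever name
the table was registered under, and the values are concatenated as-is, so this
assumes trusted input only.

```java
import java.util.Arrays;
import java.util.List;

public class StaleEventQuery {

    // Hypothetical helper (not a Flink API): builds the age-filter query
    // for a caller-chosen column list and a threshold in minutes.
    public static String build(String table, List<String> fields, int minAgeMinutes) {
        if (fields == null || fields.isEmpty()) {
            throw new IllegalArgumentException("at least one field is required");
        }
        if (minAgeMinutes < 0) {
            throw new IllegalArgumentException("minAgeMinutes must not be negative");
        }
        // Same expression as in the query above: minutes elapsed since lastTry.
        String age = "TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP)";
        return "SELECT " + String.join(", ", fields)
                + " FROM " + table
                + " WHERE (" + age + " > " + minAgeMinutes + ")";
    }

    public static void main(String[] args) {
        // Select the fields from the original question, for rows older than 30 minutes.
        System.out.println(build("rawTable", Arrays.asList("pid", "status", "lastTry"), 30));
    }
}
```

For the 2-days-ago case from the original question, the threshold would be
2 * 24 * 60 = 2880 minutes.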
On 2019/12/19 16:21:28, Cindy McMullen <[email protected]> wrote:
> Hi -
>
> I’m streaming events from Kafka, processing in EventTime. I’d like to
> process only events that are older than some given time (say, 2 days ago),
> at an interval of 5 minutes. I’ve experimented with Flink Dynamic Tables:
>
> String query = "SELECT pid, status, lastTry, " +
>     "TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) AS endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
>
>
> But this ignores events that are older than 5 minutes. Here’s my timestamp
> assigner:
>
> public class TimeLagWatermarkAssigner implements
>         AssignerWithPeriodicWatermarks<RedactionResult> {
>
>     private final long maxTimeLag = 2000; // 2 seconds
>
>     @Override
>     public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>         return rr.getLastTry().getTime();
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(System.currentTimeMillis() - maxTimeLag);
>     }
> }
>
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes
> ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same
> thing using DataStream windowing?
>
> Thanks -
>
> -- Cindy