This is close:
String query = "SELECT pid, status, lastTry " +
    " FROM " + rawTable +
    " WHERE status='RECOVERABLE'" +
    " GROUP BY HOP(UserActionTime, INTERVAL '30' SECOND, INTERVAL '5' HOUR)," +
    " pid, status, lastTry";
But I need a stream or table that updates every 30 seconds and contains only
the events that were not in the previous query's result.
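The following explains why the HOP query keeps returning the same rows. In a hopping window, each row belongs to size / slide windows, which here is 5 h / 30 s = 600. Every event can therefore reappear in up to 600 consecutive results instead of only once. The plain-Java sketch below has no Flink dependency, and `hopWindowStarts` is a hypothetical helper. It enumerates those windows under two assumptions: zero window offset and non-negative epoch-millisecond timestamps. It follows the usual sliding-window assignment rule and does not quote Flink's source.

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindows {

    // Returns the start times (ms) of every sliding window of length sizeMs,
    // advancing by slideMs, that contains timestampMs.
    // Assumes zero offset and non-negative timestamps.
    static List<Long> hopWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long slide = 30_000L;                 // INTERVAL '30' SECOND
        long size = 5L * 60 * 60 * 1000;      // INTERVAL '5' HOUR
        long eventTime = 1_576_772_488_000L;  // arbitrary example timestamp
        List<Long> starts = hopWindowStarts(eventTime, size, slide);
        System.out.println("windows per event: " + starts.size());
    }
}
```

A tumbling window of 30 seconds would, under the same arithmetic, put each row in exactly one window.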
On 2019/12/19 16:21:28, Cindy McMullen <[email protected]> wrote:
> Hi -
>
> I’m streaming events from Kafka and processing them in event time. Every 5
> minutes, I’d like to process only the events that are older than some given
> time (say, 2 days ago). I’ve experimented with Flink Dynamic Tables:
>
> String query = "SELECT pid, status, lastTry, " +
>     " TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
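For comparison, a tumbling window places each row in exactly one window, and `TUMBLE_END` reports that window's exclusive upper bound. The plain-Java sketch below shows that arithmetic. It assumes zero offset and non-negative epoch-millisecond timestamps. `tumbleStart` and `tumbleEnd` are hypothetical names, not Flink API.

```java
public class TumbleWindows {

    // Start of the tumbling window of length sizeMs that contains timestampMs
    // (zero offset, non-negative timestamps assumed).
    static long tumbleStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window, the boundary TUMBLE_END reports.
    static long tumbleEnd(long timestampMs, long sizeMs) {
        return tumbleStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5L * 60 * 1000;       // INTERVAL '5' MINUTE
        long eventTime = 7L * 60 * 1000;  // 00:07:00 after the epoch
        // An event at 00:07 lands in the window [00:05, 00:10).
        System.out.println(tumbleStart(eventTime, size) + " .. " + tumbleEnd(eventTime, size));
    }
}
```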
>
>
> But this ignores events that are older than 5 minutes. Here’s my timestamp
> assigner:
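> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> public class TimeLagWatermarkAssigner implements
>         AssignerWithPeriodicWatermarks<RedactionResult> {
>
>     private final long maxTimeLag = 2000; // 2 seconds
>
>     // Event time is taken from the record's lastTry timestamp.
>     @Override
>     public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>         return rr.getLastTry().getTime();
>     }
>
>     // The watermark trails wall-clock time by maxTimeLag; it does not
>     // depend on the event timestamps extracted above.
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(System.currentTimeMillis() - maxTimeLag);
>     }
> }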
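The following is a rough model of why older events are dropped. It rests on two assumptions worth double-checking against the Flink documentation:

- An event-time window discards a row once the watermark has passed the last millisecond of that row's window, plus any allowed lateness. Allowed lateness is taken to be zero here.
- The watermark is the one produced by `getCurrentWatermark()` above, i.e. wall-clock time minus 2 seconds.

Under that model, any row whose `lastTry` falls in a 5-minute window that closed before the current wall-clock time counts as late, which matches the symptom described. `isLate` is a hypothetical helper written in plain Java, not part of Flink's API.

```java
public class LatenessCheck {

    // Returns true if a row with this event time would be discarded, assuming
    // a tumbling window of windowSizeMs (zero offset, non-negative timestamps)
    // and the rule "window max timestamp + allowed lateness <= watermark".
    static boolean isLate(long eventTimeMs, long windowSizeMs,
                          long watermarkMs, long allowedLatenessMs) {
        long windowEnd = eventTimeMs - (eventTimeMs % windowSizeMs) + windowSizeMs;
        long windowMaxTimestamp = windowEnd - 1; // last millisecond inside the window
        return windowMaxTimestamp + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowSize = 5L * 60 * 1000;   // INTERVAL '5' MINUTE
        long now = 1_576_772_488_000L;      // hypothetical fixed "current" time
        long watermark = now - 2_000L;      // same rule as TimeLagWatermarkAssigner
        long twoDaysAgo = now - 2L * 24 * 60 * 60 * 1000;
        long tenSecondsAgo = now - 10_000L;

        System.out.println("2 days old late? " + isLate(twoDaysAgo, windowSize, watermark, 0L));
        System.out.println("10 s old late?  " + isLate(tenSecondsAgo, windowSize, watermark, 0L));
    }
}
```

In this model, only the watermark and the allowed lateness decide the outcome. The wording of the query itself does not.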
> So, a couple of questions:
>
> 1. How can I get this query to include earlier events (older than 5
> minutes)?
> 2. Is a Dynamic Table a good solution, or could I accomplish the same
> thing with DataStream windowing?
>
> Thanks -
>
> — Cindy