Hi,

the problem might be that your source does not send a watermark this
timestamp MAX_LONG after the last record has been sent.
So your operators never compute the last window.

Best, Fabian

2017-05-24 19:00 GMT+02:00 Sendoh <unicorn.bana...@gmail.com>:

> Hi Flink users,
>
> We have a unit test to test event time window aggregation, but when the job
> finishes, the last event is not output because the Flink job finishes
> before
> the watermark proceeds, as there is no next event.
>
> Does anyone have similar issue and have a solution?
>
> The code is like:
> env.fromElements(TestData.events("2017-05-20T19:34:17.097Z", "997"),
>                 TestData.events("2017-05-20T20:34:17.097Z", "998"),
>                 TestData.events("2017-05-20T20:38:17.097Z", "999"));
>
>
> DataStream<JsonNode> testResult = source.assignTimestampsAndWatermarks(new
> EventWatermark())
>                 .keyBy(new KeyByID())
>                 .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>                 .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
>                 .allowedLateness(Time.minutes(Long.MAX_VALUE))
>                 .fold(null, new AggFoldFunction());
>
>         Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);
>
>         int count = 0;
>         while (javaObj.hasNext()) {
>             JsonNode current = javaObj.next();
>             System.out.println(current);
>             count++;
>         }
>         Assert.assertEquals(3, count);
>
> The watermark is simply as:
> public class EventWatermark implements
> AssignerWithPeriodicWatermarks<JsonNode> {
>
>     private final long maxTimeLag = 5000;
>
>     private long currentMaxTimestamp;
>     public transient static DateTimeFormatter parseFromTimeFormatter =
> ISODateTimeFormat.dateTimeParser();
>
>     @Override
>     public long extractTimestamp(JsonNode element, long
> previousElementTimestamp) {
>         long occurredAtLong;
>         try {
>             occurredAtLong =
> DateTime.parse(element.get("metadata").get("occurred_at").asText(),
> parseFromTimeFormatter).getMillis();
>         }
>         catch(IllegalArgumentException ie) {
>             throw new IllegalArgumentException(element.asText());
>         }
>
>         if(occurredAtLong > currentMaxTimestamp){
>             currentMaxTimestamp = occurredAtLong;
>             }
>         return occurredAtLong;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>
>         return new Watermark(currentMaxTimestamp - maxTimeLag);
>
>     }
> }
>
> Best,
>
> Sendoh
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Last-event-in-
> event-time-window-is-not-output-tp13305.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to