Hi,

the problem might be that your source does not emit a watermark with timestamp Long.MAX_VALUE after the last record has been sent. Without that final watermark, event time never advances past the end of the last window, so your operators never compute it.
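To illustrate the mechanism, here is a minimal plain-Java sketch with no Flink dependency. The class FinalWatermarkSketch and its methods are hypothetical names made up for this example, and the window logic is a simplified model rather than Flink's actual implementation. It assumes one-minute tumbling windows, in-order events, and a watermark that trails the maximum timestamp by a fixed lag, as in your EventWatermark. The timestamps are offsets in milliseconds that roughly mirror the spacing of your three test events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of an event-time window operator.
class FinalWatermarkSketch {
    static final long WINDOW_SIZE = 60_000L; // one-minute tumbling windows

    // Window start timestamp -> number of buffered events.
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    // Start timestamps of the windows that have fired, in firing order.
    final List<Long> fired = new ArrayList<>();

    void onEvent(long timestamp) {
        long start = timestamp - (timestamp % WINDOW_SIZE);
        open.merge(start, 1, Integer::sum);
    }

    void onWatermark(long watermark) {
        // A window fires once the watermark reaches its last timestamp (end - 1).
        while (!open.isEmpty() && open.firstKey() + WINDOW_SIZE - 1 <= watermark) {
            fired.add(open.pollFirstEntry().getKey());
        }
    }

    // Feeds the events, emitting a lagging watermark after each one, and
    // optionally a final Long.MAX_VALUE watermark at the end of the input.
    static List<Long> run(long[] timestamps, long lag, boolean emitFinalWatermark) {
        FinalWatermarkSketch op = new FinalWatermarkSketch();
        long maxTimestamp = Long.MIN_VALUE + lag; // avoids underflow before the first event
        for (long ts : timestamps) {
            op.onEvent(ts);
            maxTimestamp = Math.max(maxTimestamp, ts);
            op.onWatermark(maxTimestamp - lag);
        }
        if (emitFinalWatermark) {
            op.onWatermark(Long.MAX_VALUE); // signals that no more events will arrive
        }
        return op.fired;
    }
}
```

Calling run(new long[]{0L, 3_600_000L, 3_840_000L}, 5_000L, false) fires only the first two windows, because the watermark stops 5 seconds behind the last event and never reaches the end of its window. With the last argument set to true, all three windows fire. In a custom Flink source, the corresponding step would presumably be a call such as ctx.emitWatermark(new Watermark(Long.MAX_VALUE)) after the last record.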
Best, Fabian

2017-05-24 19:00 GMT+02:00 Sendoh <unicorn.bana...@gmail.com>:

> Hi Flink users,
>
> We have a unit test to test event time window aggregation, but when the job
> finishes, the last event is not output because the Flink job finishes before
> the watermark proceeds, as there is no next event.
>
> Does anyone have similar issue and have a solution?
>
> The code is like:
> env.fromElements(TestData.events("2017-05-20T19:34:17.097Z", "997"),
>         TestData.events("2017-05-20T20:34:17.097Z", "998"),
>         TestData.events("2017-05-20T20:38:17.097Z", "999"));
>
> DataStream<JsonNode> testResult = source.assignTimestampsAndWatermarks(new EventWatermark())
>         .keyBy(new KeyByID())
>         .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>         .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
>         .allowedLateness(Time.minutes(Long.MAX_VALUE))
>         .fold(null, new AggFoldFunction());
>
> Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);
>
> int count = 0;
> while (javaObj.hasNext()) {
>     JsonNode current = javaObj.next();
>     System.out.println(current);
>     count++;
> }
> Assert.assertEquals(3, count);
>
> The watermark is simply as:
> public class EventWatermark implements AssignerWithPeriodicWatermarks<JsonNode> {
>
>     private final long maxTimeLag = 5000;
>
>     private long currentMaxTimestamp;
>     public transient static DateTimeFormatter parseFromTimeFormatter =
>             ISODateTimeFormat.dateTimeParser();
>
>     @Override
>     public long extractTimestamp(JsonNode element, long previousElementTimestamp) {
>         long occurredAtLong;
>         try {
>             occurredAtLong = DateTime.parse(
>                     element.get("metadata").get("occurred_at").asText(),
>                     parseFromTimeFormatter).getMillis();
>         }
>         catch(IllegalArgumentException ie) {
>             throw new IllegalArgumentException(element.asText());
>         }
>
>         if(occurredAtLong > currentMaxTimestamp){
>             currentMaxTimestamp = occurredAtLong;
>         }
>         return occurredAtLong;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxTimeLag);
>     }
> }
>
> Best,
>
> Sendoh
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Last-event-in-event-time-window-is-not-output-tp13305.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.