Hi, we let the watermark proceed at the earliest timestamp among all event types. Our test results look correct.
/* * Watermark proceeds at the earliest timestamp among all the event types * */ public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> { private final long maxTimeLag = 180000; private long currentMaxTimestamp; private Map<String, Long> eventTimestampMap; private int eventSize; public EventsWatermark(int eventSize){ this.eventSize = eventSize; eventTimestampMap = new HashMap<>(); } @Override public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) { long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(), Config.timeFormatter).getMillis(); String eventType = element.get("event_type").toString(); // Update the timestamp of this event eventTimestampMap.put(eventType, occurredAtLong); // Haven't collected all timestamps of events, so watermark is not forwarding if(eventSize != eventTimestampMap.size()){ currentMaxTimestamp = Math.min(occurredAtLong, currentMaxTimestamp); } // Get the smallest timestamp of all events which should be the watermark that can proceed else{ // Get the earliest timestamp of all events currentMaxTimestamp = Collections.min(eventTimestampMap.values()); } return occurredAtLong; } @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxTimeLag); } } Cheers, Sendoh -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945p10028.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.