Hi,

We now let the watermark advance only as far as the earliest of the latest
timestamps seen for each event type, minus a fixed lag.
Our test results look correct with this approach.

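import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.joda.time.DateTime; // assuming Joda-Time, as DateTime.parse(...).getMillis() suggests

/*
 * Watermark proceeds at the earliest timestamp among all the event types
 */
public class EventsWatermark implements AssignerWithPeriodicWatermarks<Map<String, Object>> {

    // Lag subtracted from the tracked timestamp, in milliseconds (3 minutes)
    private final long maxTimeLag = 180000;

    // Despite the name, this holds the earliest of the latest timestamps seen per event type
    private long currentMaxTimestamp;
    private Map<String, Long> eventTimestampMap;
    private int eventSize;

    // eventSize is the number of distinct event types expected in the stream
    public EventsWatermark(int eventSize) {
        this.eventSize = eventSize;
        eventTimestampMap = new HashMap<>();
    }

    @Override
    public long extractTimestamp(Map<String, Object> element, long previousElementTimestamp) {
        // Config.timeFormatter is our own formatter for the "occurred_at" field
        long occurredAtLong = DateTime.parse(element.get("occurred_at").toString(), Config.timeFormatter).getMillis();
        String eventType = element.get("event_type").toString();

        // Update the timestamp of this event
        eventTimestampMap.put(eventType, occurredAtLong);

        if (eventSize != eventTimestampMap.size()) {
            // Haven't collected timestamps for all event types yet, so the watermark is not forwarding.
            // currentMaxTimestamp starts at 0, so for positive epoch timestamps it stays at 0 here.
            currentMaxTimestamp = Math.min(occurredAtLong, currentMaxTimestamp);
        } else {
            // Get the earliest timestamp of all events, which is how far the watermark can proceed
            currentMaxTimestamp = Collections.min(eventTimestampMap.values());
        }
        return occurredAtLong;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxTimeLag);
    }
}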
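
In case it helps anyone check the idea without a Flink job, here is a minimal, dependency-free sketch of the same "slowest event type holds the watermark back" logic. The class name MinTimestampTracker and the event types "click" and "view" are made up for illustration and are not part of our job. One deliberate difference from the class above: until every event type has been seen, the sketch returns Long.MIN_VALUE instead of staying at 0 minus the lag.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: tracks the latest timestamp per event type
// and derives a watermark from the earliest of them.
final class MinTimestampTracker {
    private final int expectedTypes;  // number of distinct event types, assumed known up front
    private final long maxTimeLag;    // lag subtracted from the earliest timestamp, in ms
    private final Map<String, Long> latestByType = new HashMap<>();

    MinTimestampTracker(int expectedTypes, long maxTimeLag) {
        this.expectedTypes = expectedTypes;
        this.maxTimeLag = maxTimeLag;
    }

    // Record the most recently seen timestamp for this event type (overwrites the previous one).
    void observe(String eventType, long timestamp) {
        latestByType.put(eventType, timestamp);
    }

    // Held back at Long.MIN_VALUE until every expected type has reported at least once.
    long currentWatermark() {
        if (latestByType.size() < expectedTypes) {
            return Long.MIN_VALUE;
        }
        return Collections.min(latestByType.values()) - maxTimeLag;
    }

    public static void main(String[] args) {
        MinTimestampTracker tracker = new MinTimestampTracker(2, 180_000L);
        tracker.observe("click", 1_000_000L);
        System.out.println(tracker.currentWatermark()); // held back: "view" not seen yet
        tracker.observe("view", 900_000L);
        System.out.println(tracker.currentWatermark()); // 720000 = 900000 - 180000
        tracker.observe("click", 2_000_000L);
        System.out.println(tracker.currentWatermark()); // still 720000: "view" is the slowest type
    }
}
```

Like the class above, the sketch simply overwrites the stored timestamp for a type, so an out-of-order event can pull that type's entry backwards.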

Cheers,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cannot-see-all-events-in-window-apply-for-big-input-tp9945p10028.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.