Background: Using Flink 1.13.2 on AWS, with a job parallelism of 4. The ingress data from AWS Kinesis are not partitioned by the key that the business logic requires. For that reason, the events are repartitioned into a KeyedStream by calling keyBy(...) with the correct logical key.
A BoundedOutOfOrderness watermark strategy with a maxOutOfOrderness of 60 seconds and the appropriate TimestampAssigner is assigned right after consuming from AWS Kinesis. To get the events back in order after repartitioning, a naive sort approach based on a KeyedProcessFunction is used. The implementation is heavily inspired by the CepOperator: https://github.com/apache/flink/blob/ce11ce307e7c90ecbc92a42815d9e3cbae707675/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java#L367 and is also based on this cookbook: https://stackoverflow.com/a/59468155/2906175

Problem Description:

1) To evaluate the correctness of the sorting, the event timestamps of subsequent events are compared (current.eventTime - previous.eventTime > 0) in a simple downstream function (the evaluation function below). Result: all event timestamps are in ascending order.

2) In parallel to the comparison of subsequent event timestamps, the event timestamp of the current event is compared with the current watermark (context.timerService().currentWatermark()). Result: watermarks and event timestamps are NOT ascending; for some events the event timestamp is lower than the current watermark.

I suspect that the watermark is not propagated through the KeyedProcessFunction as expected. Is the watermark forwarded when out.collect(...) is called? Or is something else wrong in my approach?
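For reference, the stream is set up roughly as in the sketch below. This is simplified: kinesisSource stands for the stream returned by the Kinesis consumer, and the getTag key selector is a placeholder for the actual logical key extraction.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Simplified sketch of the source and repartitioning setup.
WatermarkStrategy<DataPointEvent> watermarkStrategy = WatermarkStrategy
        .<DataPointEvent>forBoundedOutOfOrderness(Duration.ofSeconds(60))
        .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimestamp());

// kinesisSource: DataStream<DataPointEvent> obtained from the Kinesis consumer (not shown).
DataStream<DataPointEvent> withTimestamps =
        kinesisSource.assignTimestampsAndWatermarks(watermarkStrategy);

// Repartition by the correct logical key; getTag() is used here as a placeholder key selector.
KeyedStream<DataPointEvent, String> keyed = withTimestamps.keyBy(DataPointEvent::getTag);
```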
Sort function:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

@Slf4j
public abstract class KeyedAbstractBufferFunction
        extends KeyedProcessFunction<String, DataPointEvent, DataPointEvent> {

    public static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";

    private final TypeSerializer<DataPointEvent> inputSerializer;
    private transient MapState<Long, List<DataPointEvent>> elementQueueState;

    public KeyedAbstractBufferFunction(TypeSerializer<DataPointEvent> inputSerializer) {
        super();
        this.inputSerializer = inputSerializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        elementQueueState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>(
                        EVENT_QUEUE_STATE_NAME,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(inputSerializer)));
    }

    protected void processElementsOnTimer(Stream<DataPointEvent> elements, long timestamp,
                                          Context ctx, Collector<DataPointEvent> out) {
        elements.forEachOrdered(event -> {
            try {
                out.collect(event);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataPointEvent> out)
            throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        var currentWatermark = ctx.timerService().currentWatermark();

        // Emit all buffered elements whose timestamps are no longer ahead of the current watermark,
        // in ascending timestamp order.
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
            long nextTimestamp = sortedTimestamps.poll();
            try (Stream<DataPointEvent> elements = elementQueueState.get(nextTimestamp).stream()) {
                processElementsOnTimer(elements, nextTimestamp, ctx, out);
            }
            elementQueueState.remove(nextTimestamp);
        }
    }

    @Override
    public void processElement(DataPointEvent element, Context ctx, Collector<DataPointEvent> out)
            throws Exception {
        long timestamp = element.getEventTimestamp();
        if (timestamp > ctx.timerService().currentWatermark()) {
            // We have an event with a valid timestamp, so we buffer it until we receive the proper watermark.
            bufferEvent(element, timestamp, ctx);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
        for (Long timestamp : elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }

    private void bufferEvent(DataPointEvent event, long currentTime, Context ctx) throws Exception {
        List<DataPointEvent> elementsForTimestamp = elementQueueState.get(currentTime);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<>();
            registerTimer(event.getEventTimestamp(), ctx);
        }
        elementsForTimestamp.add(event);
        elementQueueState.put(currentTime, elementsForTimestamp);
    }

    protected void registerTimer(long timestamp, Context ctx) {
        // Registers a timer for the next watermark advance; the passed element timestamp is not used.
        ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
    }
}
```

Evaluation Function:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class KeyedDataPointEventOutOfOrderMetricsFunction
        extends ProcessFunction<KeyedEvent<DataPointEvent>, KeyedEvent<DataPointEvent>> {

    private transient DataPointEvent previous;
    private transient Counter numOutOfOrderKeyedDataPointEvents;
    private transient Counter numEventsAfterWatermark;

    @Override
    public void open(Configuration parameters) {
        numOutOfOrderKeyedDataPointEvents = getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Function", this.getClass().getName())
                .counter("numOutOfOrderKeyedDataPointEvents");
        numEventsAfterWatermark = getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Function", this.getClass().getName())
                .counter("numKeyedDataPointEventsAfterWatermark");
    }

    @Override
    public void processElement(KeyedEvent<DataPointEvent> event,
                               ProcessFunction<KeyedEvent<DataPointEvent>, KeyedEvent<DataPointEvent>>.Context context,
                               Collector<KeyedEvent<DataPointEvent>> collector) {
        var current = event.getPayload();

        // 1) Count events whose timestamp is lower than the previous event's timestamp for the same tag.
        if (previous != null
                && current.getTag().equals(previous.getTag())
                && current.getEventTimestamp() < previous.getEventTimestamp()) {
            numOutOfOrderKeyedDataPointEvents.inc();
        }

        // 2) Count events whose timestamp is already behind the current watermark.
        var currentWm = context.timerService().currentWatermark();
        if (event.getEventTimestamp() < currentWm) {
            numEventsAfterWatermark.inc();
        }

        previous = current;
        collector.collect(event);
    }
}
```
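For completeness, the two functions above are attached roughly as sketched below. Again simplified: the anonymous subclass, the serializer construction via env.getConfig(), and the wrapping of the sorted events into KeyedEvent are placeholders for code that is not shown here.

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch of how the sort and evaluation functions are wired together.
// env is the StreamExecutionEnvironment; keyed is the KeyedStream from the setup sketch above.
TypeSerializer<DataPointEvent> serializer =
        TypeInformation.of(DataPointEvent.class).createSerializer(env.getConfig());

// KeyedAbstractBufferFunction is abstract, so a concrete (here anonymous) subclass is instantiated.
DataStream<DataPointEvent> sorted = keyed
        .process(new KeyedAbstractBufferFunction(serializer) {});

// The sorted events are wrapped into KeyedEvent (constructor assumed) before the metrics function.
DataStream<KeyedEvent<DataPointEvent>> evaluated = sorted
        .map(KeyedEvent::new)
        .returns(TypeInformation.of(new TypeHint<KeyedEvent<DataPointEvent>>() {}))
        .process(new KeyedDataPointEventOutOfOrderMetricsFunction());
```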