Using Flink v. 1.13.2 on AWS, with job parallelism of 4.

Ingress data from AWS Kinesis are not partitioned by the key that the business 
logic requires. For that reason, events are repartitioned into a KeyedStream by 
calling keyBy(.) with the correct logical key.

A BoundedOutOfOrderness watermark strategy with a maxOutOfOrderness of 60 sec 
and the appropriate TimestampAssigner is assigned after consuming from AWS 
Kinesis.
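
As a point of reference, the following Flink-free sketch models how a 
bounded-out-of-orderness watermark trails the highest timestamp seen so far. 
It follows the rule used by Flink's BoundedOutOfOrdernessWatermarks generator 
(watermark = max timestamp - bound - 1) but ignores periodic emission. The 
class and method names are hypothetical and exist only for this illustration.

```java
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; only the maximum seen so far matters. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark that would be emitted for the timestamps seen so far. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(60_000L); // 60 sec bound
        wm.onEvent(100_000L);
        System.out.println(wm.currentWatermark()); // 39999
        wm.onEvent(90_000L); // out of order, but within the bound
        System.out.println(wm.currentWatermark()); // 39999 (never moves backwards)
        wm.onEvent(130_000L);
        System.out.println(wm.currentWatermark()); // 69999
    }
}
```

In this model an event is late only if its timestamp is at or below the 
watermark, i.e. more than 60 sec behind the highest timestamp seen.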

To get the events back in order after repartitioning, a naive sort approach 
based on a KeyedProcessFunction is used. The implementation is heavily inspired by:

This solution is also based on this cookbook:

Problem Description:
1) To evaluate the correctness of the sorting, the event timestamps of 
subsequent events are compared (current.eventTime - previous.eventTime > 0) in 
a simple downstream map function. Result: all event times are in ascending 
order.

2) In parallel with the comparison of subsequent event times, the event time of 
the current event is compared with the current watermark 
(context.timerService().currentWatermark()). Result: watermarks and event 
timestamps are NOT ascending. On some events the event timestamp is lower than 
the current watermark.

I suspect that the passing of the watermark in the KeyedProcessFunction 
might not be as expected. Is the watermark forwarded when out.collect(.) is 
called? Or is there something else wrong in my approach?
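
To make the buffering idea concrete, here is a minimal Flink-free sketch of 
the release rule that the sort function below applies: an event is buffered 
only while its timestamp is above the watermark, and it is released once the 
watermark has reached that timestamp. SortBufferSketch and its methods are 
hypothetical names, events are reduced to bare timestamps, and the sketch 
models only the sorting operator itself, not what a downstream operator 
observes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class SortBufferSketch {
    // Buffered timestamps, grouped and ordered by event time.
    private final TreeMap<Long, List<Long>> buffer = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers the timestamp if it is ahead of the watermark; returns false if it is late. */
    boolean processElement(long eventTimestamp) {
        if (eventTimestamp > currentWatermark) {
            buffer.computeIfAbsent(eventTimestamp, k -> new ArrayList<>()).add(eventTimestamp);
            return true;
        }
        return false;
    }

    /** Advances the watermark and returns every buffered timestamp at or below it, ascending. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() <= currentWatermark) {
            released.addAll(buffer.pollFirstEntry().getValue());
        }
        return released;
    }

    public static void main(String[] args) {
        SortBufferSketch sorter = new SortBufferSketch();
        sorter.processElement(30L);
        sorter.processElement(10L);
        sorter.processElement(20L);
        System.out.println(sorter.advanceWatermark(25L)); // [10, 20]
        System.out.println(sorter.processElement(15L));   // false (late)
        System.out.println(sorter.advanceWatermark(40L)); // [30]
    }
}
```

Within this model, released timestamps are ascending, and each one is at or 
below the watermark that the sorter itself had seen when releasing it.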

Sort function:
public abstract class KeyedAbstractBufferFunction
    extends KeyedProcessFunction<String, DataPointEvent, DataPointEvent> {

  public static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";

  private final TypeSerializer<DataPointEvent> inputSerializer;
  private transient MapState<Long, List<DataPointEvent>> elementQueueState;
  public KeyedAbstractBufferFunction(TypeSerializer<DataPointEvent> inputSerializer) {
    this.inputSerializer = inputSerializer;
  }

  public void open(Configuration parameters) throws Exception {
    elementQueueState = getRuntimeContext().getMapState(
        new MapStateDescriptor<>(EVENT_QUEUE_STATE_NAME,
            LongSerializer.INSTANCE, new ListSerializer<>(inputSerializer)));
  }

  protected void processElementsOnTimer(Stream<DataPointEvent> elements, long timestamp,
                                        Context ctx, Collector<DataPointEvent> out) {
    elements.forEachOrdered(event -> {
      try {
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataPointEvent> out)
      throws Exception {
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    var currentWatermark = ctx.timerService().currentWatermark();
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
      long nextTimestamp = sortedTimestamps.poll();
      try (Stream<DataPointEvent> elements = elementQueueState.get(nextTimestamp).stream()) {
        processElementsOnTimer(elements, nextTimestamp, ctx, out);
      }
    }
  }

  public void processElement(DataPointEvent element, Context ctx, Collector<DataPointEvent> out)
      throws Exception {
    long timestamp = element.getEventTimestamp();
    if (timestamp > ctx.timerService().currentWatermark()) {
      // We have an event with a valid timestamp, so we buffer it until we
      // receive the proper watermark.
      bufferEvent(element, timestamp, ctx);
    }
  }

  private PriorityQueue<Long> getSortedTimestamps() throws Exception {
    PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
    for (Long timestamp : elementQueueState.keys()) {
      sortedTimestamps.offer(timestamp);
    }
    return sortedTimestamps;
  }

  private void bufferEvent(DataPointEvent event, long currentTime, Context ctx)
      throws Exception {
    List<DataPointEvent> elementsForTimestamp = elementQueueState.get(currentTime);
    if (elementsForTimestamp == null) {
      elementsForTimestamp = new ArrayList<>();
      registerTimer(event.getEventTimestamp(), ctx);
    }

    elementsForTimestamp.add(event);
    elementQueueState.put(currentTime, elementsForTimestamp);
  }

  protected void registerTimer(long timestamp, Context ctx) {
+ 1);

Evaluation Function:
public class KeyedDataPointEventOutOfOrderMetricsFunction extends
  ProcessFunction<KeyedEvent<DataPointEvent>, KeyedEvent<DataPointEvent>> {

  private transient DataPointEvent previous;

  private transient Counter numOutOfOrderKeyedDataPointEvents;
  private transient Counter numEventsAfterWatermark;

  public void open(Configuration parameters) {
    numOutOfOrderKeyedDataPointEvents = 
      .addGroup("Function", this.getClass().getName())
    numEventsAfterWatermark = 
      .addGroup("Function", this.getClass().getName())

  public void processElement(KeyedEvent<DataPointEvent> event,
                             ProcessFunction<KeyedEvent<DataPointEvent>, KeyedEvent<DataPointEvent>>.Context context,
                             Collector<KeyedEvent<DataPointEvent>> collector) {
    var current = event.getPayload();
    if (previous != null &&
      current.getTag().equals(previous.getTag()) &&
      current.getEventTimestamp() < previous.getEventTimestamp()) {
      numOutOfOrderKeyedDataPointEvents.inc();
    }
    var currentWm = context.timerService().currentWatermark();
    if (event.getEventTimestamp() < currentWm) {
      numEventsAfterWatermark.inc();
    }
    previous = current;
