Background:
Using Flink v. 1.13.2 on AWS, with job parallelism of 4.

Ingress data from AWS Kinesis are not partitioned by the key that the business 
logic requires. The events are therefore repartitioned by calling keyBy(.) 
with the correct logical key, which produces a KeyedStream.

A BoundedOutOfOrderness watermark strategy with a maxOutOfOrderness of 60 sec 
and the appropriate TimestampAssigner is applied directly after consuming from 
AWS Kinesis.
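
To make the 60 sec bound concrete, here is a minimal plain-Java sketch of how I 
understand a bounded-out-of-orderness watermark to be derived: the highest 
timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms. The 
class and method names (BoundedOutOfOrdernessSketch, onEvent, isLate) are 
hypothetical and only model the arithmetic; this is not the Flink 
implementation, and periodic emission is left out.

```java
/** Plain-Java model of a bounded-out-of-orderness watermark (hypothetical names, no Flink classes). */
final class BoundedOutOfOrdernessSketch {

  private final long maxOutOfOrdernessMillis;
  private long maxTimestampSeen;

  BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
    this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    // Start high enough that the first watermark computation cannot underflow.
    this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
  }

  /** Tracks the highest event timestamp observed so far. */
  void onEvent(long eventTimestampMillis) {
    maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
  }

  /** Watermark = highest timestamp seen - allowed out-of-orderness - 1 ms. */
  long currentWatermark() {
    return maxTimestampSeen - maxOutOfOrdernessMillis - 1;
  }

  /** Mirrors the check in my sort function: an event is only buffered if timestamp > watermark. */
  boolean isLate(long eventTimestampMillis) {
    return eventTimestampMillis <= currentWatermark();
  }

  public static void main(String[] args) {
    BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(60_000L);
    sketch.onEvent(100_000L);
    System.out.println("watermark = " + sketch.currentWatermark());
    System.out.println("event at 39_999 late? " + sketch.isLate(39_999L));
  }
}
```

With a bound of 60 000 ms, an event at 100 000 ms moves the watermark to 
39 999 ms, so anything at or below 39 999 ms counts as late from then on.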

To get the events back in order after repartitioning, a naive sort based on a 
KeyedProcessFunction is used. The implementation is heavily inspired by: 
https://github.com/apache/flink/blob/ce11ce307e7c90ecbc92a42815d9e3cbae707675/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java#L367

This solution is also based on this cookbook: 
https://stackoverflow.com/a/59468155/2906175
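
The idea behind the sort, stripped of all Flink machinery, can be sketched in 
plain Java as follows. This is only a model under my own assumptions: 
WatermarkSortSketch, onEvent and onWatermark are hypothetical names, a single 
key is assumed, and String payloads stand in for DataPointEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of "buffer by timestamp, release on watermark" for one key. */
final class WatermarkSortSketch {

  // Buffered payloads grouped by event timestamp; TreeMap keeps timestamps ascending.
  private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
  private long currentWatermark = Long.MIN_VALUE;

  /** Buffers the event unless it is already at or behind the watermark; returns false if dropped. */
  boolean onEvent(long timestamp, String payload) {
    if (timestamp <= currentWatermark) {
      return false;
    }
    buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(payload);
    return true;
  }

  /** Advances the watermark and releases all buffered events with timestamp <= watermark, oldest first. */
  List<String> onWatermark(long watermark) {
    currentWatermark = Math.max(currentWatermark, watermark);
    List<String> released = new ArrayList<>();
    // headMap is a live view: clearing it removes the released entries from the buffer.
    Map<Long, List<String>> ready = buffer.headMap(currentWatermark, true);
    for (List<String> sameTimestamp : ready.values()) {
      released.addAll(sameTimestamp);
    }
    ready.clear();
    return released;
  }

  public static void main(String[] args) {
    WatermarkSortSketch sorter = new WatermarkSortSketch();
    sorter.onEvent(30L, "c");
    sorter.onEvent(10L, "a");
    sorter.onEvent(20L, "b");
    System.out.println(sorter.onWatermark(30L));
  }
}
```

Events that arrive out of order are held back and only leave the buffer, in 
timestamp order, once the watermark has passed them.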


Problem Description:
1) To evaluate the correctness of the sorting, the event timestamps of 
consecutive events are compared (current.eventTime - previous.eventTime > 0) 
in a simple downstream map function. Result: all event times are in ascending 
order.

2) In the same function, the event time of the current event is also compared 
with the current watermark (context.timerService().currentWatermark()). 
Result: the event timestamps are NOT always ahead of the watermark. For some 
events the event timestamp is lower than the current watermark.
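
To show that the two checks are independent, here is a small plain-Java sketch 
with made-up numbers. OrderingChecksSketch and both method names are 
hypothetical, and the watermark values are invented for illustration; they are 
not measurements from my job.

```java
/** Plain-Java model of the two checks described above (hypothetical names, invented data). */
final class OrderingChecksSketch {

  /** Check 1: counts events whose timestamp is lower than that of the preceding event. */
  static int countOutOfOrder(long[] eventTimestamps) {
    int count = 0;
    for (int i = 1; i < eventTimestamps.length; i++) {
      if (eventTimestamps[i] < eventTimestamps[i - 1]) {
        count++;
      }
    }
    return count;
  }

  /** Check 2: counts events whose timestamp is lower than the watermark seen when they arrive. */
  static int countBehindWatermark(long[] eventTimestamps, long[] watermarkAtArrival) {
    int count = 0;
    for (int i = 0; i < eventTimestamps.length; i++) {
      if (eventTimestamps[i] < watermarkAtArrival[i]) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    long[] timestamps = {10L, 20L, 30L, 40L};
    long[] watermarks = {5L, 15L, 35L, 35L};
    System.out.println("out of order: " + countOutOfOrder(timestamps));
    System.out.println("behind watermark: " + countBehindWatermark(timestamps, watermarks));
  }
}
```

In this invented sequence the timestamps are strictly ascending, yet the third 
event (30) arrives after the watermark has already reached 35. That is the 
combination I am observing.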

I suspect that the watermark is not passed through the KeyedProcessFunction 
the way I expect. Is the watermark forwarded when out.collect(.) is called? 
Or is there something else wrong in my approach?

Sort function:
// Imports for the Flink, Lombok and JDK classes used below
// (DataPointEvent is a project class, import omitted).
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

@Slf4j
public abstract class KeyedAbstractBufferFunction
    extends KeyedProcessFunction<String, DataPointEvent, DataPointEvent> {

  public static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";

  private final TypeSerializer<DataPointEvent> inputSerializer;
  private transient MapState<Long, List<DataPointEvent>> elementQueueState;
  
  // Constructor name must match the class name.
  public KeyedAbstractBufferFunction(TypeSerializer<DataPointEvent> inputSerializer) {
    super();
    this.inputSerializer = inputSerializer;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    elementQueueState =
        getRuntimeContext().getMapState(new MapStateDescriptor<>(
            EVENT_QUEUE_STATE_NAME,
            LongSerializer.INSTANCE,
            new ListSerializer<>(inputSerializer)));
  }

  protected void processElementsOnTimer(Stream<DataPointEvent> elements,
                                        long timestamp,
                                        Context ctx,
                                        Collector<DataPointEvent> out) {
    elements.forEachOrdered(event -> {
      try {
        out.collect(event);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }
  
  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
                      Collector<DataPointEvent> out) throws Exception {
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    var currentWatermark = ctx.timerService().currentWatermark();
    while (!sortedTimestamps.isEmpty()
        && sortedTimestamps.peek() <= currentWatermark) {
      long nextTimestamp = sortedTimestamps.poll();
      try (Stream<DataPointEvent> elements =
               elementQueueState.get(nextTimestamp).stream()) {
        processElementsOnTimer(elements, nextTimestamp, ctx, out);
      }
      elementQueueState.remove(nextTimestamp);
    }
  }

  @Override
  public void processElement(DataPointEvent element, Context ctx,
                             Collector<DataPointEvent> out) throws Exception {
    long timestamp = element.getEventTimestamp();
    if (timestamp > ctx.timerService().currentWatermark()) {
      // we have an event with a valid timestamp, so we buffer it until we
      // receive the proper watermark.
      bufferEvent(element, timestamp, ctx);
    }
  }

  private PriorityQueue<Long> getSortedTimestamps() throws Exception {
    PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
    for (Long timestamp : elementQueueState.keys()) {
      sortedTimestamps.offer(timestamp);
    }
    return sortedTimestamps;
  }

  private void bufferEvent(DataPointEvent event, long currentTime, Context ctx)
      throws Exception {
    List<DataPointEvent> elementsForTimestamp =
        elementQueueState.get(currentTime);
    if (elementsForTimestamp == null) {
      elementsForTimestamp = new ArrayList<>();
      registerTimer(event.getEventTimestamp(), ctx);
    }

    elementsForTimestamp.add(event);
    elementQueueState.put(currentTime, elementsForTimestamp);
  }

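  protected void registerTimer(long timestamp, Context ctx) {
    // Note: the timestamp argument is not used here; the timer is always
    // registered for currentWatermark + 1.
    ctx.timerService().registerEventTimeTimer(
        ctx.timerService().currentWatermark() + 1);
  }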
}

Evaluation Function:
// Imports for the Flink and Lombok classes used below
// (KeyedEvent and DataPointEvent are project classes, imports omitted).
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class KeyedDataPointEventOutOfOrderMetricsFunction extends
  ProcessFunction<KeyedEvent<DataPointEvent>, KeyedEvent<DataPointEvent>> {

  private transient DataPointEvent previous;

  private transient Counter numOutOfOrderKeyedDataPointEvents;
  private transient Counter numEventsAfterWatermark;

  @Override
  public void open(Configuration parameters) {
    numOutOfOrderKeyedDataPointEvents = getRuntimeContext().getMetricGroup()
      .addGroup("kinesisanalytics")
      .addGroup("Function", this.getClass().getName())
      .counter("numOutOfOrderKeyedDataPointEvents");
    numEventsAfterWatermark = getRuntimeContext().getMetricGroup()
      .addGroup("kinesisanalytics")
      .addGroup("Function", this.getClass().getName())
      .counter("numKeyedDataPointEventsAfterWatermark");
  }

  @Override
  public void processElement(KeyedEvent<DataPointEvent> event,
                             ProcessFunction<KeyedEvent<DataPointEvent>,
                                 KeyedEvent<DataPointEvent>>.Context context,
                             Collector<KeyedEvent<DataPointEvent>> collector) {
    var current = event.getPayload();
    if (previous != null &&
      current.getTag().equals(previous.getTag()) &&
      current.getEventTimestamp() < previous.getEventTimestamp()) {
      numOutOfOrderKeyedDataPointEvents.inc();
    }
    var currentWm = context.timerService().currentWatermark();
    if (event.getEventTimestamp() < currentWm) {
      numEventsAfterWatermark.inc();
    }
    previous = current;
    collector.collect(event);
  }
}
