Hi everyone,

We are struggling to understand how Flink handles watermarks from Kafka when 
reprocessing events based on their event time.

The goal of the Flink job is to consume events from 3 Kafka topics (3 
partitions each), order them by event time (using a process function) and 
write them to a single output topic. We are using Flink 1.15.2 with a 
BoundedOutOfOrderness of 5 seconds.

Job Graph:

Kafka Topic 1 (~80k events) ---- Kafka Source ---- Map Operator ---+
                                                                   |
Kafka Topic 2 (~5k events)  ---- Kafka Source ---- Map Operator ---+---- Union Operator ---- KeyBy Operator ---- Process Function ---- Kafka Output
                                                                   |
Kafka Topic 3 (~1k events)  ---- Kafka Source ---- Map Operator ---+

Our Assumptions

  *   The Union Operator doesn't affect the watermarks, i.e. it simply 
      forwards the watermarks of the Kafka Sources to the ProcessFunction.
  *   Each Flink Kafka Source emits a single watermark that is the minimum of 
      the watermarks of all its partitions (per-partition watermarks), each 
      derived from the timestamps extracted from that partition.

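
The two assumptions above can be pictured with a small plain-Java model: a source's watermark is the minimum over its partitions, and the ProcessFunction's watermark is the minimum over its inputs, with idle inputs left out in both cases. This is a simplified sketch for illustration, not Flink's implementation; the class CombinedWatermarkModel and its method are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model (not Flink's implementation) of the
// "minimum over non-idle inputs" rule used when combining watermarks.
// The same rule is applied per source (over partitions) and at the
// ProcessFunction (over its inputs).
final class CombinedWatermarkModel {

    private CombinedWatermarkModel() {}

    /**
     * Returns the minimum watermark over all inputs that are not idle,
     * or an empty result if every input is idle (nothing holds it back).
     */
    static OptionalLong minOverActive(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

For example, partition watermarks of 10 000, 4 000 and 7 000 ms combine to 4 000 ms; if the partition at 4 000 ms were marked idle, the combined watermark would jump to 7 000 ms.
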
Desired Behaviour

  *   The Flink Kafka Sources advance their watermarks according to their 
      progress in processing the events. The ProcessFunction buffers each 
      event in its local state and only releases the buffered events (ordered 
      by observed event time) once it has received a watermark past their 
      timestamps from all of its Kafka Sources.

Actual Behaviour

  *   When the Flink job starts, it reprocesses the existing data in the Kafka 
      topics. In the Flink Web UI we see the watermark advance far too quickly 
      (especially for the large stream): the ProcessFunction soon sees a very 
      recent watermark while Flink is still processing lots of old events 
      from the large topic. Because of this recent watermark, the 
      ProcessFunction marks the remaining incoming events as late and drops 
      them.
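
The lateness check in the ProcessFunction (ctx.timestamp() > currentWatermark()) shows why a watermark that runs ahead of the backlog is so destructive: every event whose timestamp is at or behind the watermark goes to the late side output. A minimal sketch of that rule; LatenessCheck is a hypothetical helper, not Flink API.

```java
// Hypothetical helper reproducing the ProcessFunction's lateness rule:
// an event is on time only if its timestamp is strictly greater than the
// current watermark.
final class LatenessCheck {

    private LatenessCheck() {}

    /** True if the event would be routed to the late side output. */
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    /** Counts how many of the given event timestamps would be considered late. */
    static int countLate(long currentWatermark, long... eventTimestamps) {
        int late = 0;
        for (long ts : eventTimestamps) {
            if (isLate(ts, currentWatermark)) {
                late++;
            }
        }
        return late;
    }
}
```

With a watermark 5 s behind a recent event, backlog events that are hours old are all counted as late, while the recent event is not.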

Other Observations

  *   If we set the AutoWatermarkInterval to 0, the process function orders 
      the events correctly by event time (we generate watermarks with a 
      TimestampAssigner and a custom WatermarkStrategy). Timestamps are 
      assigned per event.
  *   If we leave the AutoWatermarkInterval at 200 ms (the default value), the 
      process function considers ~95% of all incoming events late, regardless 
      of whether we emit watermarks per event or periodically.
  *   We changed the WatermarkGenerator to a periodic one and saw the same 
      results.
  *   We have tried watermark alignment groups; in this case, however, the job 
      doesn't make any progress at all and seems stuck.

Our Questions

  *   Why do we see this behavior? Where is the gap in our understanding of 
      how the Flink Kafka Source generates its watermarks?
  *   Are the watermarks of the Flink Kafka Source generated and emitted after 
      each event or periodically? How does this relate to the individual 
      partitions?
  *   Why does it work if we set the AutoWatermarkInterval to 0? What does 
      this change in how Flink generates and propagates watermarks?
  *   Why don't alignment groups work in this context, and why does the job 
      seem stuck?

We are thankful for every input!
Cheers

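Source Code of Process Function
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Buffers events per key and emits them in event-time order once the watermark passes them.
public class ProcessFunction extends KeyedProcessFunction<String, Event, Event> {

    // Side output for late events; declared here so the class compiles
    // (the tag id "late-events" is an assumption).
    public static final OutputTag<Event> sideOutputLateEventsProcessFunction =
            new OutputTag<Event>("late-events") {};

    // Event timestamp -> events with that timestamp (per key).
    private transient MapState<Long, List<Event>> queueState;

    @Override
    public void open(Configuration config) {
        TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
        TypeInformation<List<Event>> value = TypeInformation.of(new TypeHint<List<Event>>() {});
        queueState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("events-by-timestamp", key, value));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        TimerService timerService = ctx.timerService();
        Long timestamp = ctx.timestamp();
        if (timestamp > timerService.currentWatermark()) {
            List<Event> listEvents = queueState.get(timestamp);
            if (listEvents == null) {
                listEvents = new ArrayList<>();
            }
            listEvents.add(event);
            queueState.put(timestamp, listEvents);
            // Fires once the watermark reaches this timestamp; timers are deduplicated per key and timestamp.
            timerService.registerEventTimeTimer(timestamp);
        } else {
            // Event is behind the watermark: considered late, write to side output to debug.
            ctx.output(sideOutputLateEventsProcessFunction, event);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> events = queueState.get(timestamp);
        if (events != null) {
            events.forEach(out::collect);
        }
        queueState.remove(timestamp);
    }
}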

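Source Code of Watermark Generator
import java.io.Serializable;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class WatermarkEventTimeGenerator
        implements WatermarkGenerator<RawMessageWithHeaders>, Serializable {

    private final long maxOutOfOrderness;

    // Highest event timestamp seen so far by this generator; watermarks never move backwards.
    private long maxTimestamp = Long.MIN_VALUE;

    public WatermarkEventTimeGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public void onEvent(RawMessageWithHeaders event, long eventTimestamp, WatermarkOutput output) {
        // Emit a watermark after every event, trailing the highest timestamp by maxOutOfOrderness.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: watermarks are emitted in onEvent.
    }
}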
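
The bounded-out-of-orderness arithmetic of the generator above can be checked in isolation. This plain-Java model assumes the watermark trails the highest timestamp seen so far by maxOutOfOrderness (similar in spirit to Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness, which emits periodically and subtracts one extra millisecond); BoundedOutOfOrdernessModel is a hypothetical name, not a Flink class.

```java
// Hypothetical plain-Java model of a per-event bounded-out-of-orderness
// generator: watermark = highest timestamp seen so far - maxOutOfOrderness.
final class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Returns the watermark that would be emitted after seeing this event. */
    long onEvent(long eventTimestamp) {
        // Tracking the maximum keeps the watermark from moving backwards
        // when an older event arrives.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - maxOutOfOrderness;
    }
}
```

With a 5 s bound, events at 10 000, 8 000 and 20 000 ms yield watermarks of 5 000, 5 000 and 15 000 ms; an event at 12 000 ms arriving after that would be late for the ProcessFunction.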

Source Code of Watermark Strategy
WatermarkStrategy<RawMessageWithHeaders> strategy =
    WatermarkStrategy
        .<RawMessageWithHeaders>forGenerator(
            context -> new WatermarkEventTimeGenerator(Duration.ofSeconds(5).toMillis()))
        .withTimestampAssigner(new EventTimeExtractor())
        // Mark a partition idle if it delivers no records for 100 ms.
        .withIdleness(Duration.ofMillis(100));
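
The strategy also enables idleness detection with a 100 ms timeout. The following is a simplified model of that rule, assuming a partition counts as idle once it has delivered no records for longer than the timeout, measured in processing (wall-clock) time rather than event time; IdlenessModel is a hypothetical name and this is not Flink's own idleness timer. An idle partition is excluded when the source takes the minimum over its partitions, so it no longer holds back the source's watermark until it delivers records again.

```java
// Hypothetical, simplified model of idleness detection (assumption, not
// Flink's implementation). All times are processing-time milliseconds.
final class IdlenessModel {

    private final long idleTimeoutMillis;
    private long lastRecordMillis;

    IdlenessModel(long idleTimeoutMillis, long startMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastRecordMillis = startMillis;
    }

    /** Called whenever the partition delivers a record. */
    void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
    }

    /** Idle once no record has arrived for longer than the timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastRecordMillis > idleTimeoutMillis;
    }
}
```

With a 100 ms timeout and a record at 0 ms, the partition is still active at 50 ms, idle at 150 ms, and active again after a record at 160 ms.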

