Hi everyone, we are struggling to understand how Flink handles watermarks from Kafka when reprocessing events using their event time.
The goal of the Flink job is to consume events from 3 Kafka topics (3 partitions each), order them by event time (using a process function) and write them to a single output topic. We are using Flink 1.15.2 with a BoundedOutOfOrderness of 5 seconds.

Job Graph:

    Kafka Topic 1 (~80k events) -> Kafka Source -> Map Operator --\
    Kafka Topic 2 (~5k events)  -> Kafka Source -> Map Operator ---+--> Union Operator -> KeyBy Operator -> Process Function -> Kafka Output
    Kafka Topic 3 (~1k events)  -> Kafka Source -> Map Operator --/

Our Assumptions

* The Union operator doesn't make a difference for the watermarks, i.e. it simply propagates the watermarks of the Kafka Sources to the ProcessFunction.
* Each Flink Kafka Source emits one watermark, which is the minimum of the per-partition watermarks across all its partitions (per-partition watermarking).

Desired Behaviour

* The Flink Kafka Sources advance their watermarks according to their progress in processing the events. The ProcessFunction buffers each event in its local state and only releases the events (ordered by observed event time) once it has received a watermark from all its Kafka Sources.

Actual Behaviour

* When the Flink job starts, it reprocesses the existing data in the Kafka topics. We see (in the Flink Web UI) that the watermark advances way too quickly, especially for the large stream: the ProcessFunction soon sees a very recent watermark while Flink is still processing lots of old events from the large topic. This very recent watermark causes the ProcessFunction to mark all other incoming events as late and drop them.

Other Observations

* If we set the AutoWatermarkInterval to 0, the process function orders the events correctly by event time (we generate watermarks using a TimestampAssigner and a custom watermark strategy). Timestamps are assigned per event.
* If we leave the AutoWatermarkInterval at 200 ms (the default value), the process function considers ~95% of all incoming events as late, regardless of whether we emit watermarks per event or periodically.
* We changed the watermark generator to a periodic one and see the same results.
* We have tried watermark alignment groups; however, in that case the job doesn't make any progress at all and seems stuck.

Our Questions

* Why do we see this behaviour? Where is our knowledge gap regarding how the Flink Kafka Source generates its watermarks?
* Are the watermarks of the Flink Kafka Source generated and emitted after each event or periodically? How does this relate to the individual partitions?
* Why does it work if we set the AutoWatermarkInterval to 0? What does this change in Flink's watermark generation and propagation?
* Why don't alignment groups work in this context, and why does the job seem stuck?

We are thankful for every input!

Cheers

Source Code of Process Function

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimerService;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class ProcessFunction extends KeyedProcessFunction<String, Event, Event> {

        // Assumed definition (not shown in the original code); the tag id "late-events" is hypothetical.
        public static final OutputTag<Event> sideOutputLateEventsProcessFunction =
                new OutputTag<Event>("late-events") {};

        // Buffered events, grouped by their event-time timestamp.
        private transient MapState<Long, List<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
            TypeInformation<List<Event>> value = TypeInformation.of(new TypeHint<List<Event>>() {});
            queueState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("events-by-timestamp", key, value));
        }

        @Override
        public void processElement(Event event,
                                   KeyedProcessFunction<String, Event, Event>.Context ctx,
                                   Collector<Event> out) throws Exception {
            TimerService timerService = ctx.timerService();
            if (ctx.timestamp() > timerService.currentWatermark()) {
                // Buffer the event until the watermark passes its timestamp.
                // MapState.get returns null when there is no entry for this timestamp yet.
                List<Event> listEvents = queueState.get(ctx.timestamp());
                if (listEvents == null) {
                    listEvents = new ArrayList<>();
                }
                listEvents.add(event);
                queueState.put(ctx.timestamp(), listEvents);
                timerService.registerEventTimeTimer(ctx.timestamp());
            } else {
                // Event considered late, write to side output to debug
                ctx.output(sideOutputLateEventsProcessFunction, event);
            }
        }

        @Override
        public void onTimer(long timestamp,
                            KeyedProcessFunction<String, Event, Event>.OnTimerContext ctx,
                            Collector<Event> out) throws Exception {
            // The watermark has reached `timestamp`: release the events buffered for it.
            List<Event> events = queueState.get(timestamp);
            if (events != null) {
                events.forEach(out::collect);
            }
            queueState.remove(timestamp);
        }
    }

Source Code of Watermark Generator

    import java.io.Serializable;

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    public class WatermarkEventTimeGenerator implements WatermarkGenerator<RawMessageWithHeaders>, Serializable {

        private final long maxOutOfOrderness;

        public WatermarkEventTimeGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        @Override
        public void onEvent(RawMessageWithHeaders event, long eventTimestamp, WatermarkOutput output) {
            // Emits a watermark for every event, derived from this event's timestamp only.
            Watermark lastWatermark = new Watermark(eventTimestamp - maxOutOfOrderness);
            output.emitWatermark(lastWatermark);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Intentionally empty: nothing is emitted periodically.
        }
    }

Source Code of Watermark Strategy

    WatermarkStrategy<RawMessageWithHeaders> strategy = WatermarkStrategy
            .forGenerator(r -> new WatermarkEventTimeGenerator(Duration.ofSeconds(5).toMillis()))
            .withTimestampAssigner(new EventTimeExtractor())
            .withIdleness(Duration.ofMillis(100));
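The WatermarkEventTimeGenerator above derives each watermark from the timestamp of the current event only. For comparison, here is a minimal, Flink-free Java sketch of the bounded-out-of-orderness pattern, assuming the same 5-second bound: the generator remembers the largest timestamp seen so far and computes the watermark from that maximum on each periodic tick. Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness works along these lines, but the classes below (BoundedLagModel, BoundedLagDemo) are hypothetical and only mirror the onEvent/onPeriodicEmit shape of WatermarkGenerator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of a bounded-out-of-orderness watermark generator.
// The method names only mirror Flink's WatermarkGenerator (onEvent / onPeriodicEmit).
class BoundedLagModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    // Every watermark "emitted" by onPeriodicEmit, kept for inspection.
    final List<Long> emitted = new ArrayList<>();

    BoundedLagModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that the initial watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    // Per event: only remember the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the largest timestamp by the bound (minus 1, so an event
    // exactly maxOutOfOrderness behind the maximum is still above the watermark).
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    // Per timer tick (in Flink, driven by the auto-watermark interval).
    void onPeriodicEmit() {
        emitted.add(currentWatermark());
    }
}

class BoundedLagDemo {
    public static void main(String[] args) {
        long bound = 5_000L;
        long[] timestamps = {10_000L, 12_000L, 11_000L}; // the third event arrives out of order
        BoundedLagModel model = new BoundedLagModel(bound);
        for (long ts : timestamps) {
            // Same arithmetic as WatermarkEventTimeGenerator.onEvent: depends on this event only.
            System.out.println("per-event: " + (ts - bound));
            model.onEvent(ts);
        }
        model.onPeriodicEmit();
        System.out.println("periodic:  " + model.emitted.get(0));
    }
}
```

Running BoundedLagDemo prints the per-event values 5000, 7000 and 6000, followed by a periodic value of 6999: the per-event formula moves backwards for the out-of-order event, while the max-based value cannot decrease. How Flink itself treats a watermark lower than one already emitted is not modeled here.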
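The assumption that a Kafka Source forwards the minimum of its per-partition watermarks, together with the withIdleness(Duration.ofMillis(100)) setting in the strategy, can be explored with a toy model. The sketch below is a simplified, hypothetical model and not Flink's implementation: each partition reports a watermark, the combined watermark is the minimum over partitions that are not flagged idle, and a partition flagged idle (which withIdleness is documented to do for inputs that carry no events for the configured duration) stops holding the minimum back. The model also keeps its output non-decreasing. CombinedWatermark, markIdle, advance and CombinedWatermarkDemo are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified model (not Flink's implementation): one output watermark
// derived from per-partition watermarks, ignoring partitions flagged as idle.
class CombinedWatermark {
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Map<Integer, Boolean> idle = new HashMap<>();
    private long lastOutput = Long.MIN_VALUE;

    // A partition reports progress: keep its own watermark monotonic and mark it active.
    void update(int partition, long watermark) {
        watermarks.merge(partition, watermark, Long::max);
        idle.put(partition, false);
    }

    // A partition that produced nothing for the idleness timeout is flagged idle.
    void markIdle(int partition) {
        idle.put(partition, true);
    }

    // Minimum over active partitions, never lower than a value already output;
    // empty when every partition is idle.
    OptionalLong advance() {
        OptionalLong min = watermarks.entrySet().stream()
                .filter(e -> !idle.getOrDefault(e.getKey(), false))
                .mapToLong(Map.Entry::getValue)
                .min();
        if (min.isPresent()) {
            lastOutput = Math.max(lastOutput, min.getAsLong());
            return OptionalLong.of(lastOutput);
        }
        return OptionalLong.empty();
    }
}

class CombinedWatermarkDemo {
    public static void main(String[] args) {
        CombinedWatermark source = new CombinedWatermark();
        source.update(0, 1_000L);   // slow partition, still replaying old data
        source.update(1, 50_000L);
        source.update(2, 60_000L);
        System.out.println(source.advance().getAsLong()); // held back by partition 0
        source.markIdle(0);
        System.out.println(source.advance().getAsLong()); // partition 0 no longer counts
        source.update(0, 2_000L);                          // partition 0 reports again
        System.out.println(source.advance().getAsLong()); // output does not go back
    }
}
```

In this toy run the combined value jumps from 1000 to 50000 as soon as partition 0 is flagged idle and stays there when partition 0 reports 2000 again, so in this model any later event from that partition with a timestamp at or below 50000 sits behind the watermark.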
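The buffer-and-release idea behind the ProcessFunction above (keep events keyed by timestamp, divert anything at or below the current watermark, and release buffered timestamps in order once the watermark reaches them) can be tried out without a Flink cluster. The sketch below is a hypothetical single-key model: SortingBuffer, onElement, onWatermark and SortingBufferDemo are illustrative names, and a TreeMap stands in for the MapState plus the event-time timers (a Flink event-time timer for timestamp t fires once the watermark reaches t).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of the buffering ProcessFunction.
class SortingBuffer {
    // Sorted by timestamp, so releasing in key order yields event-time order.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    // Stand-in for the side output of late events.
    final List<String> late = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Mirrors processElement: buffer if newer than the watermark, otherwise divert as late.
    void onElement(long timestamp, String event) {
        if (timestamp > currentWatermark) {
            buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        } else {
            late.add(event);
        }
    }

    // Mirrors timers firing: release every buffered timestamp <= watermark, in order.
    List<String> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<String> released = new ArrayList<>();
        Map<Long, List<String>> due = buffer.headMap(currentWatermark, true);
        due.values().forEach(released::addAll);
        due.clear(); // clearing the view removes these entries from the backing TreeMap
        return released;
    }
}

class SortingBufferDemo {
    public static void main(String[] args) {
        SortingBuffer buffer = new SortingBuffer();
        buffer.onElement(3_000L, "c");
        buffer.onElement(1_000L, "a");
        buffer.onElement(2_000L, "b");
        System.out.println(buffer.onWatermark(2_000L));  // [a, b]
        buffer.onElement(1_500L, "old");                 // at or below the watermark: late
        System.out.println(buffer.late);                 // [old]
        System.out.println(buffer.onWatermark(10_000L)); // [c]
    }
}
```

Because release in this model is driven only by watermarks, a single watermark that runs far ahead would send every later element with a smaller timestamp to the late list.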