Hi, I’m experiencing the same issue on Flink 1.18.1.
I have a slightly different job graph: a single Kafka source (parallelism 6) that consumes from 2 topics, one with 4 partitions and one with 6 partitions. Setting autoWatermarkInterval to 0 didn’t fix my issue. Did you ever find a solution to this problem?

On 2024/01/17 16:38:40 "adrianbartnik.mailbox.org via user" wrote:
> Hi everyone,
>
> we are struggling to understand how Flink handles watermarks from Kafka when
> reprocessing events based on their event time.
>
> The goal of the Flink job is to consume events from 3 Kafka topics (3
> partitions each), order them by event time (using a process function) and
> write them to a single output topic. We are using Flink 1.15.2 and have a
> BoundedOutOfOrderness of 5 seconds.
>
> Job Graph:
>
> Kafka Topic 1 (~80k events) ---- Kafka Source ---- Map Operator ---+
>                                                                    |
> Kafka Topic 2 (~5k events)  ---- Kafka Source ---- Map Operator ---+---- Union Operator ---- KeyBy Operator ---- Process Function ---- Kafka Output
>                                                                    |
> Kafka Topic 3 (~1k events)  ---- Kafka Source ---- Map Operator ---+
>
> Our Assumptions
>
> * The Union operator doesn’t make a difference for the watermarks, i.e. it
>   simply propagates the watermarks of the Kafka sources to the
>   ProcessFunction.
> * Each Flink Kafka source emits one watermark, which is the minimum of the
>   watermarks extracted for each of its partitions (per-partition watermarks).
>
> Desired Behaviour
>
> * The Flink Kafka sources advance their watermarks according to their
>   progress in processing the events. The ProcessFunction buffers each event
>   in its local state and only releases events (ordered by observed event
>   time) once it has received a watermark from all its Kafka sources.
>
> Actual Behaviour
>
> * When the Flink job starts, it reprocesses the existing data in the Kafka
>   topics.
>   We see that the watermark advances (in the Flink Web UI) way too
>   quickly (especially for the large stream), and the ProcessFunction soon
>   sees a very recent watermark while Flink is still processing lots of old
>   events from the large topic. This very recent watermark causes the
>   ProcessFunction to mark all other incoming events as late and drop them.
>
> Other Observations
>
> * If we set the autoWatermarkInterval to 0, the process function orders
>   the events correctly based on event time (we generate watermarks with a
>   TimestampAssigner and a custom watermark strategy). Timestamps are
>   assigned per event.
> * If we leave the autoWatermarkInterval at 200 ms (the default), the
>   process function considers ~95% of all incoming events late, regardless
>   of whether we assign timestamps per event or emit them periodically.
> * We changed the watermark generator to a periodic one and see the same
>   results.
> * We have tried alignment groups; however, in that case the job doesn’t
>   make any progress at all and seems stuck.
>
> Our Questions
>
> * Why do we see this behavior? Where is our knowledge gap on how the
>   Flink Kafka source generates its watermarks?
> * Are the watermarks of the Flink Kafka source generated and emitted
>   after each event or periodically? How does this relate to each partition?
> * Why does it work if we set the autoWatermarkInterval to 0? What does
>   this change in Flink’s watermark generation and propagation?
> * Why don’t alignment groups work in this context, and why does the job
>   seem stuck?
>
> We are thankful for any input!
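The assumptions above about how watermarks are combined (per partition inside a source, and across the inputs of the Union) can be explored with a small plain-Java model. This is a sketch under stated assumptions, not Flink's code: `CombinedWatermarkSketch` and its methods are hypothetical. The model assumes three rules that match the Flink documentation on watermarks: an operator with several inputs uses the minimum of their watermarks, an input marked idle (for example via `withIdleness`) is left out of that minimum, and the combined watermark never moves backwards.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of combining watermarks from several inputs
 * (partitions of one source, or channels feeding a union). Not Flink's API.
 */
public class CombinedWatermarkSketch {

    /** Per-input state: highest watermark seen and whether the input is idle. */
    private static final class InputState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final Map<String, InputState> inputs = new LinkedHashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    /** A watermark from an input; receiving one also makes the input active again. */
    public void onWatermark(String input, long watermark) {
        InputState state = inputs.computeIfAbsent(input, k -> new InputState());
        state.watermark = Math.max(state.watermark, watermark);
        state.idle = false;
        advance();
    }

    /** Marks an input idle so it no longer holds back the combined watermark. */
    public void markIdle(String input) {
        inputs.computeIfAbsent(input, k -> new InputState()).idle = true;
        advance();
    }

    public long currentWatermark() {
        return lastEmitted;
    }

    /** Minimum over active inputs; the combined watermark only ever increases. */
    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (InputState state : inputs.values()) {
            if (!state.idle) {
                anyActive = true;
                min = Math.min(min, state.watermark);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch union = new CombinedWatermarkSketch();
        union.onWatermark("topic1", 1_000L);   // large topic, still far behind
        union.onWatermark("topic2", 90_000L);
        union.onWatermark("topic3", 95_000L);
        System.out.println(union.currentWatermark()); // 1000

        union.markIdle("topic1");                      // large topic considered idle
        System.out.println(union.currentWatermark()); // 90000

        union.onWatermark("topic1", 2_000L);           // large topic resumes
        System.out.println(union.currentWatermark()); // 90000
    }
}
```

In this model, once the large topic is marked idle the combined watermark jumps from 1000 to 90000, and it stays there after the large topic resumes with a watermark of 2000, so a ProcessFunction like the one quoted here would treat that topic's remaining older events as late.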
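> Cheers
>
> Source Code of Process Function
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> // Buffers events per key and releases them in event-time order once the
> // watermark passes their timestamp.
> public class ProcessFunction extends KeyedProcessFunction<String, Event, Event> {
>
>     // Side output for late events (the tag name "late-events" is illustrative).
>     public static final OutputTag<Event> sideOutputLateEventsProcessFunction =
>             new OutputTag<Event>("late-events") {};
>
>     // Buffered events, grouped by event timestamp.
>     private transient MapState<Long, List<Event>> queueState = null;
>
>     @Override
>     public void open(Configuration config) {
>         TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
>         TypeInformation<List<Event>> value =
>                 TypeInformation.of(new TypeHint<List<Event>>() {});
>         queueState = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("events-by-timestamp", key, value));
>     }
>
>     @Override
>     public void processElement(Event event,
>             KeyedProcessFunction<String, Event, Event>.Context ctx,
>             Collector<Event> out) throws Exception {
>         TimerService timerService = ctx.timerService();
>         if (ctx.timestamp() > timerService.currentWatermark()) {
>             // On time: buffer the event and register a timer for its timestamp.
>             List<Event> listEvents = queueState.get(ctx.timestamp());
>             if (listEvents == null) {
>                 listEvents = new ArrayList<>();
>             }
>             listEvents.add(event);
>             queueState.put(ctx.timestamp(), listEvents);
>             timerService.registerEventTimeTimer(ctx.timestamp());
>         } else {
>             // Event considered late, write to side output to debug
>             ctx.output(sideOutputLateEventsProcessFunction, event);
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp,
>             KeyedProcessFunction<String, Event, Event>.OnTimerContext ctx,
>             Collector<Event> out) throws Exception {
>         // The watermark has passed this timestamp: emit and drop the buffered events.
>         queueState.get(timestamp).forEach(out::collect);
>         queueState.remove(timestamp);
>     }
> }
>
> Source Code of Watermark Generator
>
> import java.io.Serializable;
>
> import org.apache.flink.api.common.eventtime.Watermark;
> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkOutput;
>
> public class WatermarkEventTimeGenerator
>         implements WatermarkGenerator<RawMessageWithHeaders>, Serializable {
>
>     private final long maxOutOfOrderness;
>
>     public WatermarkEventTimeGenerator(long maxOutOfOrderness) {
>         this.maxOutOfOrderness = maxOutOfOrderness;
>     }
>
>     @Override
>     public void onEvent(RawMessageWithHeaders event, long eventTimestamp,
>             WatermarkOutput output) {
>         // Emits a watermark for every event, derived from that event's timestamp only.
>         Watermark lastWatermark = new Watermark(eventTimestamp - maxOutOfOrderness);
>         output.emitWatermark(lastWatermark);
>     }
>
>     @Override
>     public void onPeriodicEmit(WatermarkOutput output) {
>         // Intentionally empty: all watermarks are emitted in onEvent.
>     }
> }
>
> Source Code of Watermark Strategy
>
> import java.time.Duration;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>
> WatermarkStrategy<RawMessageWithHeaders> strategy =
>         WatermarkStrategy.<RawMessageWithHeaders>forGenerator(
>                         r -> new WatermarkEventTimeGenerator(Duration.ofSeconds(5).toMillis()))
>                 .withTimestampAssigner(new EventTimeExtractor())
>                 // A partition without records for 100 ms is considered idle.
>                 .withIdleness(Duration.ofMillis(100));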
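For comparison with the generator above, which emits a watermark from `onEvent` for every record: Flink's built-in `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` uses a generator that only tracks the highest timestamp in `onEvent` and emits `maxTimestamp - outOfOrderness - 1` from `onPeriodicEmit`, i.e. once per autoWatermarkInterval. The sketch below mimics that pattern in plain Java so it runs without Flink; `MaxTrackingGeneratorSketch` is a hypothetical name, and a `LongConsumer` stands in for Flink's `WatermarkOutput`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/**
 * Plain-Java sketch of a bounded-out-of-orderness generator that tracks the
 * maximum timestamp. Hypothetical class; LongConsumer replaces WatermarkOutput.
 */
public class MaxTrackingGeneratorSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public MaxTrackingGeneratorSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start so that the lowest emitted watermark is exactly Long.MIN_VALUE (no overflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Per record: only remember the highest timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Periodically (in Flink: every autoWatermarkInterval): emit based on the maximum. */
    public void onPeriodicEmit(LongConsumer output) {
        output.accept(maxTimestamp - outOfOrdernessMillis - 1);
    }

    public static void main(String[] args) {
        MaxTrackingGeneratorSketch generator =
                new MaxTrackingGeneratorSketch(Duration.ofSeconds(5));
        long maxOutOfOrderness = Duration.ofSeconds(5).toMillis();
        List<Long> maxTracking = new ArrayList<>();
        List<Long> perEvent = new ArrayList<>();

        // 12_000 arrives after 20_000, i.e. out of order.
        for (long ts : new long[] {10_000L, 20_000L, 12_000L}) {
            generator.onEvent(ts);
            generator.onPeriodicEmit(w -> maxTracking.add(w));
            perEvent.add(ts - maxOutOfOrderness); // formula used by the quoted generator
        }
        System.out.println(maxTracking); // [4999, 14999, 14999]
        System.out.println(perEvent);    // [5000, 15000, 7000]
    }
}
```

The per-event formula drops back to 7000 on the out-of-order record, while the max-tracking version never decreases and only reaches the outside world at the periodic emit.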