Hi,

I’m experiencing the same issue on Flink 1.18.1.

I have a slightly different job graph. I have a single Kafka Source 
(parallelism 6) that is consuming from 2 topics, one topic with 4 partitions 
and one topic with 6 partitions.

Setting the autoWatermarkInterval to 0 didn’t fix my issue.

Did you ever find a solution to this problem please?

On 2024/01/17 16:38:40 "adrianbartnik.mailbox.org via user" wrote:
> Hi everyone,
> 
> we are struggling to understand how Flink handles watermarks from Kafka when 
> reprocessing events using their event time.
> 
> The goal of the Flink job is to consume events from 3 Kafka topics (3 
> partitions each), order them using their event time (using a process 
> function) and write them to a single output topic. We are using Flink 1.15.2 
> and have a BoundedOutOfOrderness of 5 seconds.
> 
> Job Graph:
> 
> Kafka Topic 1 ---- Kafka Source ---- Map Operator ---
> ~80k events                                          |
>                                                      |
> Kafka Topic 2 ---- Kafka Source ---- Map Operator ---- Union Operator ---- KeyBy Operator ---- Process Function ---- Kafka Output
> ~5k events                                           |
>                                                      |
> Kafka Topic 3 ---- Kafka Source ---- Map Operator ---
> ~1k events
> 
> Our Assumptions
> 
>   *   The Union Operator doesn’t make a difference for the watermarks, i.e. 
> it simply propagates the watermarks of the Kafka Sources to the 
> ProcessFunction
>   *   Each Flink Kafka Source emits one watermark that is the minimum among 
> the minimum extracted timestamps across all its partitions (per-partition 
> watermarks)
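
To make the second assumption concrete, here is a minimal plain-Java sketch with no Flink dependency. The class name MinWatermarkSketch and the sample timestamps are made up for illustration. It only models the "minimum across partitions" rule as described above, with a per-partition watermark of "highest timestamp seen minus the allowed out-of-orderness". It is not Flink's actual implementation.

```java
import java.util.List;

// Hypothetical illustration, not Flink code: a source whose output watermark
// is the minimum of its per-partition watermarks.
class MinWatermarkSketch {

    // Per-partition watermark: highest timestamp seen so far minus the
    // allowed out-of-orderness. Returns Long.MIN_VALUE if nothing was seen.
    static long partitionWatermark(List<Long> timestampsSeen, long maxOutOfOrdernessMs) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestampsSeen) {
            maxTs = Math.max(maxTs, ts);
        }
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - maxOutOfOrdernessMs;
    }

    // Combined watermark: the slowest partition holds everything back.
    // Returns Long.MAX_VALUE for an empty list.
    static long combinedWatermark(List<Long> partitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        // Partition 0 is still reading old data, partition 1 is far ahead.
        long p0 = partitionWatermark(List.of(10_000L, 12_000L, 11_000L), 5_000L);
        long p1 = partitionWatermark(List.of(60_000L), 5_000L);
        System.out.println(combinedWatermark(List.of(p0, p1)));
    }
}
```

Under this model the combined watermark stays with the partition that is furthest behind, which is what the assumption expects to see in the job.
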
> 
> Desired Behaviour
> 
>   *   The Flink Kafka Sources advance their watermarks according to their 
> progress in processing the events. The ProcessFunction buffers each event in 
> its local state and only releases them (ordered by observed event time) 
> once it has received a watermark from all its Kafka Sources.
> 
> Actual Behaviour
> 
>   *   When starting the Flink job, it reprocesses the existing data in the 
> Kafka topics. We see that the watermark advances (in the Flink WebUI) way too 
> quickly (especially for the large stream) and the ProcessFunction quickly 
> sees a very recent watermark while Flink is still processing lots of old 
> events from the large topic. This very recent watermark causes the 
> ProcessFunction to mark all other incoming events as late and drop them.
> 
> Other Observations
> 
>   *   If we set the AutoWatermarkInterval to 0, the process function 
> orders the events correctly based on event time (we are generating watermarks 
> based on a TimestampAssigner and a custom WatermarkStrategy). Timestamps 
> are assigned per event.
>   *   If we leave the AutoWatermarkInterval at 200 (default value), the 
> process function considers ~95% of all incoming events as late, regardless 
> of whether we emit watermarks per event or periodically.
>   *   We changed the WatermarkGenerator to periodic and we see the same 
> results.
>   *   We have tried alignment groups; however, in this case the job doesn’t 
> make any progress at all and seems stuck.
> 
> Our Questions
> 
>   *   Why do we see the behavior that we are seeing? Where is our knowledge 
> gap on how the Flink Kafka Source generates its watermarks?
>   *   Are the watermarks for the Flink Kafka Source generated and emitted 
> after each event or periodically? How does this relate to each partition?
>   *   Why does it work if we set the AutoWatermarkInterval to 0? What 
> does this change for Flink in the watermark generation and propagation?
>   *   Why don’t alignment groups work in this context and why does the job 
> seem stuck?
> 
> We are thankful for every input!
> Cheers
> 
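> Source Code of Process Function
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimerService;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> 
> // Event, isEmpty(...) and sideOutputLateEventsProcessFunction are not shown here.
> public class ProcessFunction extends KeyedProcessFunction<String, Event, Event> {
> 
>     // Buffered events, grouped by their event timestamp.
>     private transient MapState<Long, List<Event>> queueState = null;
> 
>     @Override
>     public void open(Configuration config) {
>         TypeInformation<Long> key = TypeInformation.of(new TypeHint<Long>() {});
>         TypeInformation<List<Event>> value = TypeInformation.of(new TypeHint<List<Event>>() {});
>         queueState = getRuntimeContext().getMapState(new MapStateDescriptor<>("events-by-timestamp", key, value));
>     }
> 
>     @Override
>     public void processElement(Event event, KeyedProcessFunction<String, Event, Event>.Context ctx, Collector<Event> out) throws Exception {
> 
>         TimerService timerService = ctx.timerService();
>         if (ctx.timestamp() > timerService.currentWatermark()) {
>             // Not late yet: buffer the event and release it once the watermark passes its timestamp.
>             List<Event> listEvents = queueState.get(ctx.timestamp());
>             if (isEmpty(listEvents)) {
>                 listEvents = new ArrayList<>();
>             }
> 
>             listEvents.add(event);
>             queueState.put(ctx.timestamp(), listEvents);
>             timerService.registerEventTimeTimer(ctx.timestamp());
>         } else {
>             // Event considered late, write to side output to debug
>             ctx.output(sideOutputLateEventsProcessFunction, event);
>         }
>     }
> 
>     @Override
>     public void onTimer(long timestamp, KeyedProcessFunction<String, Event, Event>.OnTimerContext ctx, Collector<Event> out) throws Exception {
>         // Emit everything buffered for this timestamp, then clear it.
>         queueState.get(timestamp).forEach(out::collect);
>         queueState.remove(timestamp);
>     }
> }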
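
For anyone who wants to reason about this logic outside of a running job, here is a rough plain-Java model of the same buffer-and-release idea. It is only a sketch: OrderingBufferSketch and its method names are hypothetical, events are plain strings, and the watermark is passed in by hand instead of coming from Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the process function above, without Flink:
// buffers events by timestamp and releases them in order as the watermark advances.
class OrderingBufferSketch {
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private final List<String> late = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Same check as processElement: buffer if ahead of the watermark, else treat as late.
    void onElement(long timestamp, String event) {
        if (timestamp > currentWatermark) {
            buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
        } else {
            late.add(event);
        }
    }

    // Models the event-time timers: everything with timestamp <= watermark is released,
    // lowest timestamp first. A watermark that does not advance is ignored.
    List<String> onWatermark(long watermark) {
        List<String> released = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return released;
        }
        currentWatermark = watermark;
        while (!buffer.isEmpty() && buffer.firstKey() <= watermark) {
            released.addAll(buffer.pollFirstEntry().getValue());
        }
        return released;
    }

    List<String> lateEvents() {
        return late;
    }

    public static void main(String[] args) {
        OrderingBufferSketch sketch = new OrderingBufferSketch();
        sketch.onElement(3_000L, "c");
        sketch.onElement(1_000L, "a");
        sketch.onElement(2_000L, "b");
        System.out.println(sketch.onWatermark(2_000L)); // events up to t=2000, in order
        sketch.onElement(1_500L, "old");                // arrives behind the watermark
        System.out.println(sketch.lateEvents());
    }
}
```

The model shows why a watermark that runs ahead of the slowest input is so costly here: once it has advanced, every older event that arrives afterwards lands in the late list instead of the buffer.
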
> 
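> Source Code of Watermark Generator
> import java.io.Serializable;
> import org.apache.flink.api.common.eventtime.Watermark;
> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkOutput;
> 
> public class WatermarkEventTimeGenerator implements WatermarkGenerator<RawMessageWithHeaders>, Serializable {
> 
>   private final long maxOutOfOrderness;
> 
>   public WatermarkEventTimeGenerator(long maxOutOfOrderness) {
>     this.maxOutOfOrderness = maxOutOfOrderness;
>   }
> 
>   @Override
>   public void onEvent(RawMessageWithHeaders event, long eventTimestamp, WatermarkOutput output) {
>     // Emit a watermark for every event, lagging its timestamp by maxOutOfOrderness.
>     Watermark lastWatermark = new Watermark(eventTimestamp - maxOutOfOrderness);
>     output.emitWatermark(lastWatermark);
>   }
> 
>   @Override
>   public void onPeriodicEmit(WatermarkOutput output) {
>     // Intentionally empty: watermarks are only emitted per event.
>   }
> }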
> 
> Source Code of Watermark Strategy
> WatermarkStrategy<RawMessageWithHeaders> strategy =
>   WatermarkStrategy.forGenerator(r -> new WatermarkEventTimeGenerator(Duration.ofSeconds(5).toMillis()))
>     .withTimestampAssigner(new EventTimeExtractor())
>     .withIdleness(Duration.ofMillis(100));
> 
> 
> 
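
One thing that may be worth checking in the strategy above is the 100 ms idleness timeout, because an input that is marked idle no longer takes part in the minimum. The plain-Java sketch below only illustrates that rule. IdlenessSketch, the partition names and the numbers are hypothetical, and whether this actually plays a role in the behaviour described in this thread is a guess, not a confirmed diagnosis.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical illustration, not Flink code: the combined watermark is the
// minimum over all partitions that are NOT currently marked idle.
class IdlenessSketch {

    // Returns an empty OptionalLong when every partition is idle.
    static OptionalLong combinedWatermark(Map<String, Long> watermarks, Set<String> idlePartitions) {
        return watermarks.entrySet().stream()
                .filter(e -> !idlePartitions.contains(e.getKey()))
                .mapToLong(Map.Entry::getValue)
                .min();
    }

    public static void main(String[] args) {
        // "slow" is still replaying old data, "fast" is already near the present.
        Map<String, Long> watermarks = Map.of("slow", 7_000L, "fast", 55_000L);

        // Both active: the slow partition holds the watermark back.
        System.out.println(combinedWatermark(watermarks, Set.of()));

        // "slow" flagged idle (for example after a short pause in delivery):
        // the combined watermark follows "fast".
        System.out.println(combinedWatermark(watermarks, Set.of("slow")));
    }
}
```

If a partition with a large backlog is briefly flagged idle in this model, the combined watermark jumps ahead and the backlog that follows would then be behind it.
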
