Hello, I am on Flink 1.19 (AWS managed Flink) with flink-connector-kafka:3.3.0-1.19.
After I enabled watermark alignment on the KafkaSource, it started throwing an uncaught WakeupException. It happens:

* On every checkpoint, unless I disable offset committing with setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
* Randomly

The stack trace is the same in both cases:

java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more

Watermark alignment stops working after recovery.

Checking the code, I see that

    long consumerPosition = consumer.position(tp);

at KafkaPartitionSplitReader.java:127 is the only call to consumer.position in the whole file that is not wrapped in retryOnWakeup (there are a few other calls in there). I tested wrapping it, and my app now runs without any exception.

I could make a PR; I am waiting for an ASF Self-Service account. But I don't really understand what the race condition is here, and I have not been able to reproduce it in tests.

Hints and help would be appreciated.

Thanks
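To make the wrapping idea concrete, here is a minimal, dependency-free sketch of the retry-once-on-wakeup pattern. It is only an illustration, not the connector's actual code: the WakeupException class is a stand-in for org.apache.kafka.common.errors.WakeupException, FakeConsumer is a hypothetical test double, and the helper is merely modelled on the retryOnWakeup name mentioned above. It relies on one documented behaviour of KafkaConsumer.wakeup(): if no thread is blocked in the consumer when wakeup() is called, the next call that can throw WakeupException raises it instead. That would fit a wakeup arriving shortly before consumer.position(tp), though I have not confirmed that this is the actual race.

```java
import java.util.function.Supplier;

public class RetryOnWakeupSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException (keeps the sketch dependency-free). */
    static class WakeupException extends RuntimeException {}

    /** Hypothetical fake consumer: a pending wakeup makes the next position() call throw once. */
    static class FakeConsumer {
        private boolean wakeupPending;

        void wakeup() {
            wakeupPending = true;
        }

        long position() {
            if (wakeupPending) {
                wakeupPending = false; // the wakeup is consumed by the call that throws
                throw new WakeupException();
            }
            return 42L; // arbitrary offset for the demo
        }
    }

    /** Runs the call and retries it exactly once if a wakeup interrupts the first attempt. */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException we) {
            System.out.println("Caught WakeupException during " + description + ", retrying once");
            return consumerCall.get();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.wakeup(); // simulate a wakeup that arrives before the position lookup
        long position = retryOnWakeup(consumer::position, "getting consumer position");
        System.out.println("position = " + position);
    }
}
```

In the real reader the wrapped call would presumably look like retryOnWakeup(() -> consumer.position(tp), ...), mirroring the other consumer.position calls in the same file. Note that a single retry only absorbs one pending wakeup; a second wakeup landing between the two attempts would still propagate.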