Hello,
I am running Flink 1.19 on AWS managed Flink, with:

flink-connector-kafka:3.3.0-1.19

After I enable watermark alignment on the KafkaSource, it starts throwing an
uncaught WakeupException.
It happens:
* on every checkpoint, unless I disable offset committing:
   setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
* randomly at other times

The stack trace is the same in both cases:
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more

Watermark alignment stops working after recovery.

Checking the code, I see that

long consumerPosition = consumer.position(tp);

at KafkaPartitionSplitReader.java:127 is the only call to
consumer.position in the whole file that is not wrapped in retryOnWakeup
(there are a few other calls in there).
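
For illustration, the wrapping could look roughly like the sketch below. It is dependency-free on purpose: the WakeupException and FakeConsumer classes are hypothetical stand-ins for the Kafka classes, and the body of retryOnWakeup is an assumption (catch the wakeup once, then retry the same call), not a copy of the connector's code.

```java
import java.util.function.Supplier;

public class RetryOnWakeupSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException (assumption, keeps the sketch self-contained). */
    static class WakeupException extends RuntimeException {}

    /** Hypothetical consumer: a pending wakeup makes the next position() call throw once. */
    static class FakeConsumer {
        private boolean wakeupPending;
        private final long position;

        FakeConsumer(long position) {
            this.position = position;
        }

        void wakeup() {
            wakeupPending = true;
        }

        long position() {
            if (wakeupPending) {
                wakeupPending = false; // the wakeup is consumed by the call that throws
                throw new WakeupException();
            }
            return position;
        }
    }

    /** Assumed shape of the helper: run the call, and retry it once if it was interrupted by a wakeup. */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException we) {
            System.out.println("Caught WakeupException while " + description + ", retrying once.");
            return consumerCall.get();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(42L);
        consumer.wakeup(); // simulate a wakeup issued before the position lookup

        // Unwrapped, consumer.position() would throw here; wrapped, the call is retried.
        long consumerPosition = retryOnWakeup(() -> consumer.position(), "getting the consumer position");
        System.out.println(consumerPosition); // prints 42
    }
}
```

In the real reader the change would only replace the bare consumer.position(tp) call with the wrapped form, the same way the other calls in the file are wrapped.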

I tested wrapping it, and my app now runs without any exception.
I could make a PR; I am waiting for an ASF Self-Service account.
But I don't really understand what the race condition is here, and I am
not able to reproduce it in tests.

Hints and help would be appreciated,

Thanks
