Today I started seeing the following exception across all of the exactly-once Kafka sink apps I have deployed:
org.apache.kafka.common.errors.TimeoutException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

The apps are all on Flink v1.10.2.

I tried the following workarounds sequentially on a single app, but I still get the same exception:
- changing the sink uid and restoring with --allowNonRestoredState
- changing the Kafka producer id and restoring with --allowNonRestoredState
- changing the output Kafka topic to a new one and restoring with --allowNonRestoredState
- deploying from scratch (no previous checkpoint/savepoint)
- doubling the timeout for transactional state initialization from 60s to 120s

My mental model is that these steps completely disassociate the Flink app from any pending transactions on the Kafka side (by changing the uid, producer id, and output topic), so it should be able to recover from scratch. The Kafka clusters are otherwise healthy and are accepting writes from non-exactly-once Flink apps and from all other Kafka producers.

On the Kafka side, we have the following broker configs set:
transaction.max.timeout.ms=3600000
transaction.remove.expired.transaction.cleanup.interval.ms=86400000

I'm considering shortening the cleanup interval so that any hanging transactions on the Kafka side get garbage-collected sooner. Or I might just wait it out and accept the downtime. Otherwise I'm out of ideas and unsure how to proceed. For context, I've included a rough sketch of how the sinks are wired up at the end of this message. Any help would be much appreciated.
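In case it helps, here is roughly how one of the sinks is set up. This is a minimal sketch only; the topic name, uid, bootstrap address, and property values below are placeholders rather than our exact production config.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {

    public static void attachSink(DataStream<String> stream) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder address

        // Producer-side transaction timeout; must stay <= the broker's
        // transaction.max.timeout.ms (3600000 on our clusters).
        producerProps.setProperty("transaction.timeout.ms", "900000");

        // initTransactions() blocks for up to max.block.ms (Kafka default 60000 ms);
        // shown here doubled to 120s, mirroring the last workaround listed above.
        producerProps.setProperty("max.block.ms", "120000");

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "output-topic-v2", // the new output topic we switched to
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stream.addSink(sink)
                .uid("kafka-sink-v2") // the sink uid we changed before restoring
                .name("exactly-once Kafka sink");
    }
}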