Today I started seeing the following exception across all of the exactly-once 
Kafka sink apps I have deployed:

org.apache.kafka.common.errors.TimeoutException: 
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 60000ms.
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while initializing transactional state in 60000ms.

The apps are all on Flink v1.10.2.

I tried the following workarounds sequentially for a single app, but I 
continued to get the same exception each time:
- changing the sink uid and restoring with non-restored state allowed
- changing the Kafka producer id and restoring with non-restored state allowed
- changing the output Kafka topic to a new one and restoring with 
non-restored state allowed
- deploying from scratch (no previous checkpoint/savepoint)
- doubling the timeout for state initialization from 60s to 120s

My mental model is that we have completely disassociated the Flink app from any 
pending transactions on the Kafka side (by changing the uid, producer id, and 
output topic), so it should be able to start from scratch. The Kafka 
clusters are otherwise healthy and accepting writes from non-exactly-once Flink 
apps and all other Kafka producers.

On the Kafka side, we have the following configs set:

transaction.max.timeout.ms=3600000
transaction.remove.expired.transaction.cleanup.interval.ms=86400000
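
For reference, the client-side settings that interact with these broker values 
can be sketched as below. This is only an illustration, not the apps' actual 
configuration: the class name, method names, and broker address are 
hypothetical. It assumes that the 60000ms in the exception corresponds to the 
producer's max.block.ms (the value doubled in the last workaround above), and 
that the producer's transaction.timeout.ms must not exceed the broker's 
transaction.max.timeout.ms.

```java
import java.util.Properties;

public class ProducerTimeoutSketch {
    // Broker-side cap quoted in the post: transaction.max.timeout.ms=3600000
    static final long BROKER_TRANSACTION_MAX_TIMEOUT_MS = 3_600_000L;

    // Builds the producer properties that would be handed to the Kafka sink.
    // All arguments are illustrative values, not the apps' real settings.
    static Properties producerProperties(String bootstrapServers,
                                         long maxBlockMs,
                                         long transactionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumed to bound how long transactional state initialization may block.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    // Returns true when the requested transaction timeout fits under the broker cap.
    static boolean withinBrokerLimit(Properties props) {
        long requested = Long.parseLong(props.getProperty("transaction.timeout.ms"));
        return requested <= BROKER_TRANSACTION_MAX_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // 120s initialization timeout (doubled from 60s), 1h transaction timeout.
        Properties props = producerProperties("broker-1:9092", 120_000L, 3_600_000L);
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));
        System.out.println("within broker limit: " + withinBrokerLimit(props));
    }
}
```

A sanity check like withinBrokerLimit only rules out a mismatch between the 
client and broker timeouts. It does not explain why initialization itself 
times out.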

I'm considering shortening the cleanup interval so that any hanging 
transactions on the Kafka side get garbage collected sooner. Alternatively, 
I might just wait it out and accept the downtime.

But otherwise, I am out of ideas and unsure how to proceed. Any help would be 
much appreciated.
