I have a Kafka Streams application which is currently failing to start due to the following ProducerFencedException:
"Caused by: org.apache.kafka.common.errors.ProducerFencedException: task [0_57] Abort sending since producer got fenced with a previous record (key ABCD value [B@4debf146 timestamp 1531159392586) to topic my-stream-1-store-changelog due to Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."

My Streams application has exactly-once processing enabled and a state store with logging enabled. The application had been running for some time but was recently shut down, and now every attempt to start it back up fails with ProducerFencedExceptions like the one above. From what I can tell, these exceptions occur because the producer transactions are timing out, which invalidates their transactionalId. I believe the transactions are timing out because recovery of the state store takes longer than the default one-minute transaction timeout. My reasoning is based on the following sequence of events in the broker logs:

1. The Kafka Streams application is started, and the broker logs show the producer transactions being initialized:

"[2018-07-10T01:34:21,112Z] [INFO ] [kafka-request-handler-0] [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818] Initialized transactionalId my-stream-1-0_37 with producerId 6011 and producer epoch 33 on partition __transaction_state-41"

2.
When I go back to the Kafka Streams application logs, I can see that the stream threads are still restoring their state stores from the changelog topic:

"[2018-07-10T01:34:23,164Z] [INFO ] [my-stream-1-755e7bc7-831d-4d3f-8d4c-2d2641095afa-StreamThread-5] [c.a.a.s.k.s.StateRestorationMonitor] Starting restoration of topic [my-stream-1-store-changelog] partition [27] for state store [store] with starting offset [0] and ending offset [2834487]"

3. Over a minute goes by while restoration is still in progress, and then the following messages appear in the broker logs:

"[2018-07-10T01:36:29,542Z] [INFO ] [kafka-request-handler-4] [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818] Completed rollback ongoing transaction of transactionalId: my-stream-1-0_37 due to timeout"

"[2018-07-10T01:36:48,387Z] [ERROR] [kafka-request-handler-5] [kafka.server.ReplicaManager] [ReplicaManager broker=79213818] Error processing append operation on partition my-stream-1-store-changelog-37 org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 33 (request epoch), 34 (server epoch)"

4. Soon after that, the Kafka Streams application transitions into the ERROR state and does not recover.

So it appears the producer transactions are timing out because state store recovery takes over a minute to complete, and while recovery is in progress the stream threads do not commit their transactions. If that is the case, would it make sense to not begin the producer transactions until after state store recovery has completed? That would help prevent long state store recoveries from causing the transactions to time out.

Thanks,
David
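For reference, the interim workaround I am experimenting with is raising the embedded producers' transaction timeout above the one-minute default, so an open transaction survives a long restore. This is only a sketch: the application id matches my logs, the bootstrap servers value is a placeholder, and 900000 ms is an assumption on my part (I believe the broker caps client timeouts at transaction.max.timeout.ms, which defaults to 15 minutes, so a larger value would be rejected). The "producer." prefix forwards a setting to the producers Streams creates internally.

```java
import java.util.Properties;

public class StreamsTxnConfig {

    // Build the Streams configuration with exactly-once enabled and a
    // longer producer transaction timeout. Values here are illustrative.
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-stream-1");          // from the logs above
        props.put("bootstrap.servers", "localhost:9092");    // placeholder
        props.put("processing.guarantee", "exactly_once");
        // Forwarded to the embedded producers via the "producer." prefix.
        // Default is 60000 ms; 900000 ms matches the broker-side default cap.
        props.put("producer.transaction.timeout.ms", "900000");
        return props;
    }
}
```

This does not fix the underlying issue (transactions are still opened before restoration finishes); it just buys the restore more time.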
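To put a rough number on why the default timeout is too short here: the restoration log above shows a single partition with an ending offset of 2,834,487, so finishing that restore inside the default 60-second transaction timeout would require a sustained rate of roughly 47,000 records per second for that partition alone. A back-of-the-envelope check (assuming, as I believe is the case, that the transaction is opened before restoration starts and must stay open until it finishes):

```java
public class RestoreRateCheck {

    // Records per second the restore would need to sustain for the open
    // transaction not to be rolled back by the broker on timeout.
    static double requiredRestoreRate(long records, long timeoutMs) {
        return records / (timeoutMs / 1000.0);
    }

    public static void main(String[] args) {
        long endingOffset = 2_834_487L;     // from the restoration log above
        long defaultTxnTimeoutMs = 60_000L; // default transaction.timeout.ms
        System.out.printf("required restore rate: %.2f records/s%n",
                requiredRestoreRate(endingOffset, defaultTxnTimeoutMs));
    }
}
```

In my case the broker rolled the transaction back a little over two minutes after restoration began, which is consistent with the actual restore rate being well below that figure.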