Yes, https://issues.apache.org/jira/browse/KAFKA-6634 seems to explain the issue I’m seeing; however, I’m running Kafka and Kafka Streams on version 1.1.0, so I wonder why this issue is still occurring.
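In the meantime, one possible workaround might be to raise the producer transaction timeout above the worst-case changelog restore time. A minimal sketch against the 1.1.0 client APIs (the application id matches the logs below, but the bootstrap servers and the 10-minute value are placeholder assumptions; the broker-side transaction.max.timeout.ms, 15 minutes by default, caps how high this can be set):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsEosProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // The producer default transaction.timeout.ms is 60000 (1 minute).
        // Raise it above the expected state store restoration time so the
        // transaction coordinator does not roll the transaction back
        // mid-restore; 10 minutes here is an arbitrary example value.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 600_000);
        return props;
    }
}
```

Of course this only widens the window; it does not change the underlying ordering problem of the transactions being begun before restoration completes.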
-David

> On Jul 10, 2018, at 9:38 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Can it be that you hit https://issues.apache.org/jira/browse/KAFKA-6634?
>
> -Matthias
>
> On 7/9/18 7:58 PM, David Chu wrote:
>> I have a Kafka Streams application which is currently failing to start due to the following ProducerFencedException:
>>
>> "Caused by: org.apache.kafka.common.errors.ProducerFencedException: task [0_57] Abort sending since producer got fenced with a previous record (key ABCD value [B@4debf146 timestamp 1531159392586) to topic my-stream-1-store-changelog due to Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."
>>
>> My stream application has exactly-once processing enabled and also has a state store with logging enabled. The application had been running for some time but was recently shut down, and now when I try to start it back up it always fails with ProducerFencedExceptions like the one shown above. From what I can tell, these exceptions occur because the producer transactions are timing out, causing their transactionalId to become invalid. I believe the transactions are timing out because recovery of the state store takes longer than the 1-minute default transaction timeout. My reasoning is that the Kafka broker logs show the following sequence of events:
>>
>> 1. The Kafka Streams application is started, and the following broker logs indicate that the producer transactions have been initialized:
>>
>> "[2018-07-10T01:34:21,112Z] [INFO ] [kafka-request-handler-0] [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818] Initialized transactionalId my-stream-1-0_37 with producerId 6011 and producer epoch 33 on partition __transaction_state-41"
>>
>> 2.
>> When I go back to the Kafka Streams application logs, I can see from messages like the following that the stream threads are still restoring their state stores from the changelog topic:
>>
>> "[2018-07-10T01:34:23,164Z] [INFO ] [my-stream-1-755e7bc7-831d-4d3f-8d4c-2d2641095afa-StreamThread-5] [c.a.a.s.k.s.StateRestorationMonitor] Starting restoration of topic [my-stream-1-store-changelog] partition [27] for state store [store] with starting offset [0] and ending offset [2834487]"
>>
>> 3. Over a minute goes by while state store restoration is still taking place, and then the following log messages appear in the Kafka broker:
>>
>> "[2018-07-10T01:36:29,542Z] [INFO ] [kafka-request-handler-4] [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818] Completed rollback ongoing transaction of transactionalId: my-stream-1-0_37 due to timeout"
>>
>> "[2018-07-10T01:36:48,387Z] [ERROR] [kafka-request-handler-5] [kafka.server.ReplicaManager] [ReplicaManager broker=79213818] Error processing append operation on partition my-stream-1-store-changelog-37 org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 33 (request epoch), 34 (server epoch)"
>>
>> 4. Soon after that, the Kafka Streams application transitions into the ERROR state and does not recover.
>>
>> So from what I can tell, the producer transactions are timing out because state store recovery takes over a minute to complete, and while recovery is taking place the stream threads are not committing their transactions. If that is the case, I wonder if it would make sense to not begin the producer transactions until after state store recovery has completed? This would help prevent long state store recoveries from causing the transactions to time out.
>>
>> Thanks,
>> David