Could it be that you hit https://issues.apache.org/jira/browse/KAFKA-6634 ?
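If that is the issue, one interim workaround (a rough sketch, not a fix; it assumes
the default 60 second producer transaction timeout and that the broker-side
transaction.max.timeout.ms, 15 minutes by default, allows the increase) is to raise
the transaction timeout through the Streams producer prefix so it covers the restore
window. The application id matches your logs; the bootstrap servers are a placeholder:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosRestoreWorkaroundConfig {

    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-1");      // from the logs above
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // Default transaction.timeout.ms is 60000 (1 minute); raise it so a slow
        // changelog restore does not let the coordinator roll the transaction back.
        // Must stay <= the broker's transaction.max.timeout.ms (default 900000).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                  300000);
        return props;
    }
}

This only raises the ceiling; the restore still has to finish before the first commit
can go through.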
-Matthias

On 7/9/18 7:58 PM, David Chu wrote:
> I have a Kafka Streams application which is currently failing to start
> due to the following ProducerFencedException:
>
> "Caused by: org.apache.kafka.common.errors.ProducerFencedException: task
> [0_57] Abort sending since producer got fenced with a previous record
> (key ABCD value [B@4debf146 timestamp 1531159392586) to topic
> my-stream-1-store-changelog due to Producer attempted an operation with
> an old epoch. Either there is a newer producer with the same
> transactionalId, or the producer's transaction has been expired by the
> broker."
>
> My stream application has exactly-once processing enabled and also has
> a state store with logging enabled. The application had been running
> for some time but was recently shut down, and now when I try to start
> it back up it always fails due to ProducerFencedExceptions like the one
> shown above. From what I can tell, these exceptions occur because the
> producer transactions are timing out, causing their transactionalId to
> become invalid. I believe the transactions are timing out because the
> recovery of the state store takes longer than the 1 minute default
> transaction timeout. My reasoning is that when I look at the Kafka
> broker logs I see the following sequence of events:
>
> 1. The Kafka Streams application is started and I see the following
> logs appear on the Kafka broker indicating the producer transactions
> have been initialized:
>
> "[2018-07-10T01:34:21,112Z] [INFO ] [kafka-request-handler-0]
> [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818]
> Initialized transactionalId my-stream-1-0_37 with producerId 6011 and
> producer epoch 33 on partition __transaction_state-41"
>
> 2. When I go back to the Kafka Streams application logs I can see that
> the stream threads are still recovering their state stores from the
> changelog topic, based on the following log messages:
>
> "[2018-07-10T01:34:23,164Z] [INFO ]
> [my-stream-1-755e7bc7-831d-4d3f-8d4c-2d2641095afa-StreamThread-5]
> [c.a.a.s.k.s.StateRestorationMonitor] Starting restoration of topic
> [my-stream-1-store-changelog] partition [27] for state store [store]
> with starting offset [0] and ending offset [2834487]"
>
> 3. Over a minute goes by, state store restoration is still taking
> place, and then I see the following log messages appear on the Kafka
> broker:
>
> "[2018-07-10T01:36:29,542Z] [INFO ] [kafka-request-handler-4]
> [k.c.t.TransactionCoordinator] [TransactionCoordinator id=79213818]
> Completed rollback ongoing transaction of transactionalId:
> my-stream-1-0_37 due to timeout"
>
> "[2018-07-10T01:36:48,387Z] [ERROR] [kafka-request-handler-5]
> [kafka.server.ReplicaManager] [ReplicaManager broker=79213818] Error
> processing append operation on partition my-stream-1-store-changelog-37
> org.apache.kafka.common.errors.ProducerFencedException: Producer's
> epoch is no longer valid. There is probably another producer with a
> newer epoch. 33 (request epoch), 34 (server epoch)"
>
> 4. Soon after that the Kafka Streams application transitions into the
> ERROR state and does not recover.
>
> So from what I can tell, the producer transactions are timing out
> because the state store recovery process takes over a minute to
> complete, and while the recovery is taking place the stream threads are
> not committing their transactions.
> If this is the case, I wonder if it would make sense to not begin the
> producer transactions until after the state store recovery has
> completed? This would help prevent long state store recoveries from
> causing the transactions to time out.
>
> Thanks,
> David
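As an aside: the StateRestorationMonitor lines in the quoted logs look like a custom
StateRestoreListener. Timing each restore against the transaction timeout makes it easy
to confirm that restoration is what pushes past the 60 second limit. Below is a rough
sketch of such a listener; the class name and output wording are illustrative, not the
original application's code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Hypothetical restore-duration logger; measures wall-clock time per restored partition.
public class RestoreDurationListener implements StateRestoreListener {

    private final Map<TopicPartition, Long> startTimes = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        startTimes.put(partition, System.currentTimeMillis());
        System.out.printf("Restoring store %s, %s: offsets %d..%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Per-batch progress could be logged here if needed.
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        long elapsedMs = System.currentTimeMillis() - startTimes.remove(partition);
        System.out.printf("Restored %d records for store %s, %s in %d ms "
                + "(compare against transaction.timeout.ms)%n",
                totalRestored, storeName, partition, elapsedMs);
    }
}

Register it once per instance via
KafkaStreams#setGlobalStateRestoreListener(new RestoreDurationListener()) before
calling start().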