I have a Kafka Streams application which is currently failing to start due to 
the following ProducerFencedException:

"Caused by: org.apache.kafka.common.errors.ProducerFencedException: task [0_57] 
Abort sending since producer got fenced with a previous record (key ABCD value 
[B@4debf146 timestamp 1531159392586) to topic my-stream-1-store-changelog due 
to Producer attempted an operation with an old epoch. Either there is a newer 
producer with the same transactionalId, or the producer's transaction has been 
expired by the broker.”

My stream application has exactly-once processing enabled and also has a state 
store with a logging enabled.  The application had been running for some time 
but was recently shutdown and now when I try to start it back up, it always 
fails due to ProducerFencedExceptions like the one shown above.  From what I 
can tell, these exceptions are occurring because the producer transactions are 
timing out causing their transactionId to become invalid.  I believe the 
producer transactions are timing out due to the recovery of the state store 
taking longer than the 1 minute default transaction timeout period.  My 
reasoning for this is that when I look at the Kafka Broker logs I see the 
following sequence of events:

1. The Kafka Streams application is started and I see the following logs appear 
in the Kafka Broker indicating the producer transactions have been initialized:

"[2018-07-10T01:34:21,112Z]  [INFO ]  [kafka-request-handler-0]  
[k.c.t.TransactionCoordinator]  [TransactionCoordinator id=79213818] 
Initialized transactionalId my-stream-1-0_37 with producerId 6011 and producer 
epoch 33 on partition __transaction_state-41”

2. When I go back to the Kafka Streams application logs I can see that the 
stream threads are still recovering their state stores from the changelog topic 
due to the following log messages:

"[2018-07-10T01:34:23,164Z]  [INFO ]  
[my-stream-1-755e7bc7-831d-4d3f-8d4c-2d2641095afa-StreamThread-5]  
[c.a.a.s.k.s.StateRestorationMonitor]  Starting restoration of topic 
[my-stream-1-store-changelog] partition [27] for state store [store] with 
starting offset [0] and ending offset [2834487]"

3. Over a minute goes by and state store restoration is still taking place and 
then I see the following log messages appear in the Kafka Broker:

"[2018-07-10T01:36:29,542Z]  [INFO ]  [kafka-request-handler-4]  
[k.c.t.TransactionCoordinator]  [TransactionCoordinator id=79213818] Completed 
rollback ongoing transaction of transactionalId: my-stream-1-0_37 due to 
timeout”

"[2018-07-10T01:36:48,387Z]  [ERROR]  [kafka-request-handler-5]  
[kafka.server.ReplicaManager]  [ReplicaManager broker=79213818] Error 
processing append operation on partition my-stream-1-store-changelog-37
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 33 
(request epoch), 34 (server epoch)”

4. Soon after that the Kafka Streams application transitions into the ERROR 
state and does not recover. 

So from what I can tell it appears that the producer transactions are timing 
out because the state store recovery process is taking over a minute to 
complete, and while the recovery is taking place the stream threads are not 
committing their transactions.  If this is the case, I wonder if it would make 
sense to not begin the producer transactions until after the state store 
recovery has completed?  This would help to prevent long state store recoveries 
from potentially causing the transactions to time out.

Thanks,
David



Reply via email to