Hi All,
My setup is shown in the attached JPEG file.
In my setup, I have a stretch cluster spread across 2 geographically distant data centres. The network latency, measured as ping round-trip time, is about 50 ms.
There are 4 brokers in each DC.
Each data centre runs a Kafka producer application and a KStream application.
My test involves the following steps:
1. Have the producer in DC1 produce records at a rate of about 3000 messages per second.
2. Shut down all brokers in DC2 for about 30 minutes to simulate a site outage.
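A test producer that holds roughly 3000 messages per second can pace its sends against a fixed schedule. The sketch below is a hypothetical harness, not the producer application from this setup: the `LongConsumer` sender stands in for a real `KafkaProducer.send(...)` call, and the class and method names are made up for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/**
 * Hypothetical fixed-rate send loop for a test like the one described above.
 * The sender callback stands in for a real KafkaProducer.send(...) call (assumption).
 */
public class PacedProducerLoop {

    /** Nanoseconds between consecutive sends for a target rate in messages per second. */
    static long intervalNanos(int messagesPerSecond) {
        return TimeUnit.SECONDS.toNanos(1) / messagesPerSecond;
    }

    /**
     * Calls sender once per message, scheduling message i at start + i * interval.
     * If a sleep overshoots, later messages go out without sleeping until the loop
     * is back on schedule, so the average rate tracks the target.
     * Returns the number of messages actually handed to the sender.
     */
    static long run(int messagesPerSecond, long totalMessages, LongConsumer sender) {
        long interval = intervalNanos(messagesPerSecond);
        long start = System.nanoTime();
        long sent = 0;
        for (long i = 0; i < totalMessages; i++) {
            long wait = (start + i * interval) - System.nanoTime();
            if (wait > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(wait);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
                    break;
                }
            }
            sender.accept(i);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        // 3000 msg/s as in the test; only 300 messages (about 0.1 s) are "sent" here.
        long[] delivered = {0};
        long sent = run(3000, 300, i -> delivered[0]++);
        System.out.println(intervalNanos(3000) + " ns between sends, " + sent + " sent");
    }
}
```

Running `main` prints `333333 ns between sends, 300 sent`. Scheduling against an absolute start time, rather than sleeping a fixed interval after each send, keeps sleep overshoot from accumulating into a lower average rate.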

When I restart the brokers in DC2, the following errors appear in the logs of my KStream application. (Note: the Kafka producer application does not seem to suffer from any such errors.)
As expected, the replica lag on the DC2 brokers has grown, but it is gradually decreasing as the DC2 brokers fetch records from the DC1 brokers.
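The size of that lag follows from the test parameters. Assuming the DC1 producer kept writing at about 3000 messages per second for the whole 30-minute outage, the DC2 replicas start roughly 5.4 million records behind (summed over all partitions of the topic), and they only gain on the leaders at the difference between their fetch rate and the ongoing produce rate. The follower fetch rate used below is a made-up illustrative number, not a measurement from this setup.

```java
/**
 * Back-of-the-envelope catch-up estimate for the outage test.
 * Assumes a constant produce rate during and after the outage, and a constant
 * (hypothetical) rate at which the DC2 followers fetch from the DC1 leaders.
 */
public class OutageBacklog {

    /** Records written while DC2 was down: rate (msg/s) x outage length (s). */
    static long backlogRecords(long messagesPerSecond, long outageMinutes) {
        return messagesPerSecond * outageMinutes * 60;
    }

    /**
     * Seconds for the followers to close the backlog while new records keep arriving.
     * The gap shrinks at (fetchRate - produceRate) records per second.
     */
    static double catchUpSeconds(long backlog, double fetchRate, double produceRate) {
        if (fetchRate <= produceRate) {
            throw new IllegalArgumentException("followers never catch up if fetchRate <= produceRate");
        }
        return backlog / (fetchRate - produceRate);
    }

    public static void main(String[] args) {
        long backlog = backlogRecords(3000, 30);              // 5,400,000 records
        double seconds = catchUpSeconds(backlog, 6000, 3000); // hypothetical 6000 records/s fetch rate
        System.out.println(backlog + " records behind, ~" + seconds + " s to catch up");
    }
}
```

With these assumed numbers, `main` prints `5400000 records behind, ~1800.0 s to catch up`, i.e. about as long as the outage itself. Halving the headroom (`fetchRate - produceRate`) doubles the estimate.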
However, the KStream application shuts down and cannot be started again until replication has caught up for all partitions of the topic it consumes from.
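The restart condition described above, every partition's DC2 replica having caught up with its leader, can be expressed as a per-partition offset comparison. This is a hypothetical sketch: the `Offsets` snapshot type, the partition keys, and where the offsets come from (for example broker metrics or admin tooling) are assumptions. Note that Kafka itself decides in-sync membership by time (`replica.lag.time.max.ms`) rather than by a strict zero-offset check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical check for "replication has caught up on every partition".
 * Offsets are supplied by the caller; how they are collected is out of scope.
 */
public class ReplicaCatchUp {

    /** Snapshot of one partition: leader log end offset vs. a follower's log end offset. */
    static final class Offsets {
        final long leaderLogEndOffset;
        final long followerLogEndOffset;

        Offsets(long leaderLogEndOffset, long followerLogEndOffset) {
            this.leaderLogEndOffset = leaderLogEndOffset;
            this.followerLogEndOffset = followerLogEndOffset;
        }

        /** Records the follower still has to fetch (never negative). */
        long lag() {
            return Math.max(0, leaderLogEndOffset - followerLogEndOffset);
        }
    }

    /** Sum of follower lag over all partitions in the snapshot. */
    static long totalLag(Map<String, Offsets> byPartition) {
        return byPartition.values().stream().mapToLong(Offsets::lag).sum();
    }

    /** True only when no partition has any remaining lag. */
    static boolean allCaughtUp(Map<String, Offsets> byPartition) {
        return byPartition.values().stream().allMatch(o -> o.lag() == 0);
    }

    public static void main(String[] args) {
        // Hypothetical partition names and offsets, for illustration only.
        Map<String, Offsets> snapshot = new LinkedHashMap<>();
        snapshot.put("input-topic-0", new Offsets(120_000, 120_000)); // caught up
        snapshot.put("input-topic-1", new Offsets(118_500, 97_000));  // still 21,500 behind
        System.out.println("total lag = " + totalLag(snapshot) + ", caught up = " + allCaughtUp(snapshot));
    }
}
```

For the sample snapshot, `main` prints `total lag = 21500, caught up = false`; a single lagging partition is enough to keep the overall check false.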
The KStream application logs the following errors and shuts down soon after:

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:707)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:693)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:640)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
        at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2022-04-21T16:27:08.469 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2022-04-21T16:27:08.470 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  o.a.k.s.p.internals.StreamTask - MSG=stream-thread [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] task [0_143] Suspended running
2022-04-21T16:27:08.475 [mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer clientId=mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
