Claudia,

A leader change is a retryable error. What is your producer config for
`retries`? You might want to increase it so that the producer does not
throw the exception immediately but retries a couple of times. You might
also want to adjust `retry.backoff.ms`, which sets how long the producer
waits before it retries.
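
For example, here is a minimal sketch of the relevant Streams properties.
Kafka Streams passes producer settings on to its internal producer. The
`producer.` prefix (the string that
`StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)` builds)
scopes them to the producer only. I used plain string keys so the snippet
compiles without Kafka on the classpath. The broker address and the values
10 and 1000 are placeholders for illustration, not recommendations. The
`application.id` is only inferred from your changelog topic name.

```java
import java.util.Properties;

public class StreamsRetryConfig {

    // Hypothetical helper: builds the Properties you would pass to
    // new KafkaStreams(topology, props). Plain string keys are used so this
    // compiles without Kafka; with Kafka on the classpath,
    // StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG) gives "producer.retries".
    static Properties buildConfig() {
        Properties props = new Properties();
        // Assumed from the changelog topic name "test-service-...-changelog".
        props.put("application.id", "test-service");
        // Placeholder broker address (assumption).
        props.put("bootstrap.servers", "localhost:9092");
        // Let the internal producer retry retryable errors such as
        // NotLeaderForPartitionException instead of failing on the first attempt.
        // Illustrative value.
        props.put("producer.retries", "10");
        // Wait between retries so a leader election has time to finish.
        // Illustrative value (1 second).
        props.put("producer.retry.backoff.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        // Print the effective settings in a stable (sorted) order.
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Hand the resulting Properties to the KafkaStreams constructor as usual.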

-Matthias

On 5/15/18 6:30 AM, Claudia Wegmann wrote:
> Hey there,
> 
> I've got a few Kafka Streams services which run smoothly most of the time. 
> Sometimes, however, some of them get an exception "Abort sending since an 
> error caught with a previous record" (see below for a full example). The 
> Stream Service having this exception just stops its work altogether. After 
> restarting it, the service starts to process all the messages that piled up 
> and all is fine again. Is it possible for the Kafka Streams service to 
> recover from such a situation itself?
> 
> Thx for the input and best regards,
> Claudia
> 
> 
> An example stack trace:
> 15.5.2018 13:13:07Exception in thread 
> "test-service-a19c940e-beee-486d-8ec0-2d06dc869f88-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: task [0_10] Abort sending 
> since an error caught with a previous record (key 
> 25:1001:152401f272ae48658197cbfeda008967 value [B@11e6d177 timestamp 
> 1526382782489) to topic test-service-CountBasedSlidingWindowStore-changelog 
> due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
> server is not the leader for that topic-partition..
> 15.5.2018 13:13:07     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> 15.5.2018 13:13:07     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> 15.5.2018 13:13:07     at java.lang.Thread.run(Thread.java:748)
> 15.5.2018 13:13:07Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> 
> 