Thx for the pointers! I didn't set any of the config parameters you named, so retries defaults to 0.
Another question though: There is no 'retries' config for streams, just 'retry.backoff.ms'. Do I set ProducerConfig.retries in my streams app? Also, I do have to set 'max.in.flight.requests.per.connection' to 1 to still guarantee ordering, right?

Best,
Claudia

-----Original Message-----
From: Matthias J. Sax <matth...@confluent.io>
Sent: Tuesday, May 15, 2018 22:58
To: users@kafka.apache.org
Subject: Re: Exception stopps data processing (Kafka Streams)

Claudia,

A leader change is a retryable error. What is your producer config for `retries`? You might want to increase it so that the producer does not throw the exception immediately but retries a couple of times. You might also want to adjust `retry.backoff.ms`, which sets the time to wait before the producer retries.

-Matthias

On 5/15/18 6:30 AM, Claudia Wegmann wrote:
> Hey there,
>
> I've got a few Kafka Streams services which run smoothly most of the time.
> Sometimes, however, some of them get an exception "Abort sending since an
> error caught with a previous record" (see below for a full example). The
> Stream Service having this exception just stops its work altogether. After
> restarting it, the service starts to process all the messages that piled up
> and all is fine again. Is it possible for the Kafka Streams service to
> recover from such a situation itself?
>
> Thx for the input and best regards,
> Claudia
>
>
> An example stacktrace:
> 15.5.2018 13:13:07 Exception in thread "test-service-a19c940e-beee-486d-8ec0-2d06dc869f88-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_10] Abort sending since an error caught with a previous record (key 25:1001:152401f272ae48658197cbfeda008967 value [B@11e6d177 timestamp 1526382782489) to topic test-service-CountBasedSlidingWindowStore-changelog due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition..
> 15.5.2018 13:13:07 at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> 15.5.2018 13:13:07 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> 15.5.2018 13:13:07 at java.lang.Thread.run(Thread.java:748)
> 15.5.2018 13:13:07 Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
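
On the question at the top of the thread about where to set producer retries in a Streams app: Kafka Streams forwards client settings that are placed in the Streams properties, and a `producer.` prefix scopes a setting to the internal producer. What follows is a minimal sketch of such a configuration, not a confirmed fix for the exception quoted above. It uses plain string keys so that it compiles without the Kafka jars; with the Kafka libraries on the classpath, the same keys should be obtainable via `StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)` and the matching `ProducerConfig` constants. The bootstrap address and the values 10 and 1000 are illustrative assumptions; `test-service` is taken from the thread name in the stack trace.

```java
import java.util.Properties;
import java.util.TreeSet;

public class StreamsRetryConfigSketch {

    // Prefix that scopes a setting to the producer embedded in Kafka Streams
    // (the string behind StreamsConfig.PRODUCER_PREFIX).
    static final String PRODUCER_PREFIX = "producer.";

    static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "test-service");
        // Assumption: broker address is a placeholder, not from the thread.
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Retry retryable errors (such as a leader change) instead of failing
        // on the first attempt. The value 10 is illustrative only.
        props.setProperty(PRODUCER_PREFIX + "retries", "10");
        // Wait between retries so a new leader has time to be elected.
        // The value 1000 ms is illustrative only.
        props.setProperty(PRODUCER_PREFIX + "retry.backoff.ms", "1000");
        // With retries enabled, a single in-flight request per connection
        // prevents a retried batch from being reordered behind a later one.
        props.setProperty(PRODUCER_PREFIX + "max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        // Print the settings in sorted order for inspection.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

The resulting `Properties` object would be handed to the `KafkaStreams` constructor together with the topology. Limiting in-flight requests to 1 trades some throughput for ordering, so the retry count and backoff are worth tuning against how long leader elections take in the cluster at hand.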