Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) on the Streams client side here: in those older versions, Kafka Streams relies on the embedded Producer clients to handle timeouts, which requires users to configure those values correctly.
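For example, on 2.5.x you can raise the embedded producer's timeouts by passing "producer."-prefixed configs through StreamsConfig. A minimal sketch; the application id, broker list, and the concrete timeout values below are placeholders you'd tune for your environment:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ProducerTimeoutConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder

        // Configs with the "producer." prefix are forwarded to the embedded producer.
        // delivery.timeout.ms defaults to 120000 ms, which matches the "120000 ms has
        // passed since batch creation" in your exception; raising it gives the producer
        // more time to ride out broker restarts before it expires batches.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "300000");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "60000");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "120000");
        return props;
    }
}

Note that although the log message points at `max.block.ms`, the "Expiring 12 record(s)" error itself comes from `delivery.timeout.ms` expiring, so that is the first one to raise.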
In newer versions (2.8+) we have made Kafka Streams more robust to server-side disconnects or soft failures that may cause timeouts: https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams. So I'd suggest you upgrade to one of those versions and see if the symptoms go away.
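The main knob KIP-572 adds is `task.timeout.ms` (default 300000 ms): instead of the stream thread dying on a client TimeoutException, the task is retried internally and the error only surfaces once the task has made no progress for that long. A minimal sketch, assuming a 2.8+ client (the value shown is just an example):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TaskTimeoutConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"); // placeholder

        // KIP-572 (2.8+): client TimeoutExceptions no longer kill the thread;
        // the task is retried until it makes no progress for task.timeout.ms.
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, "600000"); // example: 10 minutes
        return props;
    }
}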
Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hi All,
>
> I am getting below issue in streams application. Kafka cluster is a 3
> broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
> at the same time when below exception occurred in streams application so I
> can relate below exception to those brokers restarts. However, what is
> worrying me is the streams application did not process any events after
> below exception. So the question is:
> 1. how can i make the streams application resilient to broker issues e.g.
> the producer underneath streams should have connected to another broker
> instance at the time 1 broker went down, but possible the 2nd broker went
> down immediately that's why it timed out
> 2. In general how does streams handle broker issue and when does it decide
> to connect to another broker instance in case one instance seems to be in
> error?
>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1",
> "message":"Exception processing processor thread -
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> stream - task [0_5] Abort sending since an error caught with a previous
> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:120000 ms has passed since batch creation\n
> Timeout exception caught when sending record to topic analytics-incoming-feed.
> This might happen if the producer cannot send data to the Kafka cluster and
> thus, its internal buffer fills up. This can also happen if the broker is
> slow to respond, if the network connection to the broker was interrupted,
> or if similar circumstances arise. You can increase producer parameter
> `max.block.ms` to increase this timeout.",
> "logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology",
> "thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2",
> "level":"ERROR","level_value":40000,
> "stack_trace":"org.apache.kafka.streams.errors.StreamsException: task [0_5]
> Abort sending since an error caught with a previous record (timestamp
> 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:120000 ms has passed since batch creation\n
> Timeout exception caught when sending record to topic analytics-incoming-feed.
> This might happen if the producer cannot send data to the Kafka cluster and
> thus, its internal buffer fills up. This can also happen if the broker is
> slow to respond, if the network connection to the broker was interrupted,
> or if similar circumstances arise. You can increase producer parameter
> `max.block.ms` to increase this timeout.\n
> \tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n
> \tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n
> \tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n
> \tat datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n
> \tat org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n
> \tat org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n
> \tat org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n
> \tat org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n
> \tat org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n
> \tat org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n
> \tat org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n
> \tat java.base/java.lang.Thread.run(Unknown Source)\n
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 12
> record(s) for analytics-incoming-feed-4:120000 ms has passed since batch
> creation\n"}

--
-- Guozhang