Hi all,

I am running into the issue below in a Kafka Streams application. The Kafka cluster has 3 brokers (v2.5.1), and I could see that 2 of the 3 brokers restarted at the same time the exception below occurred in the Streams application, so I can relate the exception to those broker restarts. What is worrying me, however, is that the Streams application did not process any events after this exception. My questions are:

1. How can I make the Streams application resilient to broker issues? For example, the producer underneath Streams should have connected to another broker instance when the first broker went down; possibly the second broker went down immediately afterwards, which is why it timed out.
2. In general, how does Streams handle broker issues, and when does it decide to connect to another broker instance if one instance appears to be in error?
{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception processing processor thread - analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2 stream - task [0_5] Abort sending since an error caught with a previous record (timestamp 1635596258179) to topic analytics-incoming-feed due to org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:120000 ms has passed since batch creation\nTimeout exception caught when sending record to topic analytics-incoming-feed. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException: task [0_5] Abort sending since an error caught with a previous record (timestamp 1635596258179) to topic analytics-incoming-feed due to org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:120000 ms has passed since batch creation\nTimeout exception caught when sending record to topic analytics-incoming-feed. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. This can also happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}