As the error message suggests, you can increase `max.block.ms` for this
case: if a broker is down, it may take some time for the producer to
fail over to a different broker. Before the producer can fail over, the
brokers must elect a new partition leader, and only afterward can they
inform the producer about the new leader it must use to write into the
partition. Increasing `max.block.ms` gives the producer (and thus the
brokers) more time to complete the fail-over.
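To make the config change concrete, here is a minimal sketch of how the
producer timeout could be raised through the Kafka Streams properties. The
`producer.` prefix is how Kafka Streams routes a setting to its internal
producer; the class name and the 5-minute values are only illustrative
assumptions, not recommendations. As far as I can tell, the "120000 ms has
passed since batch creation" in your log matches the default of
`delivery.timeout.ms`, so it may be worth raising that setting as well.

```java
import java.util.Properties;

// Hypothetical helper: collects producer timeout overrides for a Kafka Streams app.
class ProducerTimeoutOverrides {

    static Properties build() {
        Properties props = new Properties();
        // How long the producer may block, e.g. while waiting for metadata
        // or for buffer space (default is 60000 ms). Value is illustrative.
        props.setProperty("producer.max.block.ms", "300000");
        // Upper bound for delivering a batch, including retries
        // (default is 120000 ms). Value is illustrative.
        props.setProperty("producer.delivery.timeout.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before merging them
        // into the rest of the application's Streams configuration.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be merged into the same `Properties` object that already
holds `application.id` and `bootstrap.servers` before it is passed to
`KafkaStreams`.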
If the fail-over fails (i.e., times out), the producer raises the
exception and Kafka Streams is forced to stop processing to avoid data
loss: the producer has buffered some data that would be lost if we
committed offsets after the error.
In general, Kafka Streams tries to handle as many broker/client errors
as possible (and newer versions handle more cases than older versions).
But there are always some cases that cannot be handled by Kafka Streams.
Of course, changing client configs (producer/consumer and Kafka Streams)
can help to make the application more robust.
Thus, it comes down to monitoring Kafka Streams:
In newer versions of Kafka Streams, you can register an uncaught
exception handler that allows you to restart failed threads. In older
versions, you can also register a callback, but it only informs you that
a thread died. In older versions you would need to `close()` the
KafkaStreams instance, create a new instance, and `start()` it to
recover a dead thread.
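As a rough sketch of what such a handler could decide: the code below walks
the cause chain of the exception and asks for a replacement thread if it finds
a timeout, and for a client shutdown otherwise. `ThreadResponse` and
`ThreadFailurePolicy` are hypothetical names that only mirror the response
values of the real handler so the example runs without the Kafka jars; the
commented lines show how it might be wired in, assuming a version that has
`StreamsUncaughtExceptionHandler` (2.8 or later, as far as I know). Whether
replacing the thread on every timeout is the right policy depends on your
application.

```java
// Hypothetical local stand-in for the REPLACE_THREAD / SHUTDOWN_CLIENT values of
// Kafka Streams' StreamThreadExceptionResponse, so this sketch runs on its own.
enum ThreadResponse { REPLACE_THREAD, SHUTDOWN_CLIENT }

class ThreadFailurePolicy {

    // Inspects the exception and its causes (bounded depth) and returns
    // REPLACE_THREAD if any of them is a TimeoutException, else SHUTDOWN_CLIENT.
    static ThreadResponse decide(Throwable error) {
        Throwable t = error;
        for (int depth = 0; t != null && depth < 16; depth++) {
            if (t.getClass().getSimpleName().equals("TimeoutException")) {
                return ThreadResponse.REPLACE_THREAD;
            }
            t = t.getCause();
        }
        return ThreadResponse.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        // Stand-in for a StreamsException that wraps a producer timeout.
        Throwable wrapped = new RuntimeException("Abort sending",
                new java.util.concurrent.TimeoutException("Expiring record(s)"));
        System.out.println(decide(wrapped));                          // REPLACE_THREAD
        System.out.println(decide(new IllegalStateException("bug"))); // SHUTDOWN_CLIENT
    }
}

// Possible wiring in the application (needs the kafka-streams dependency):
//
// streams.setUncaughtExceptionHandler(ex ->
//     ThreadFailurePolicy.decide(ex) == ThreadResponse.REPLACE_THREAD
//         ? StreamThreadExceptionResponse.REPLACE_THREAD
//         : StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
```

The handler has to be registered before `start()` is called.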
Hope this helps,
-Matthias
On 10/31/21 5:52 AM, Pushkar Deole wrote:
Hi All,
I am getting the issue below in a streams application. The Kafka cluster is a 3
broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
at the same time the exception below occurred in the streams application, so I
can relate the exception to those broker restarts. However, what is
worrying me is that the streams application did not process any events after
the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance at the time 1 broker went down, but possibly the 2nd broker went
down immediately, which is why it timed out.
2. In general, how does streams handle broker issues, and when does it decide
to connect to another broker instance in case one instance seems to be in
error?
{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}