Hello Pushkar,

I'm assuming you are running the same Kafka version (2.5.1) on the Streams
client side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to configure those
timeout values correctly.
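
For example, the relevant producer timeouts can be overridden through the
Streams config via producerPrefix(). A minimal sketch, assuming 2.5.x clients
(the application id, broker addresses, and the concrete values below are
placeholders, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "analytics-event-normalizer"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
    "broker-1:9092,broker-2:9092,broker-3:9092"); // placeholder

// producerPrefix() forwards a config to the embedded producer clients.
// delivery.timeout.ms is the 120000 ms bound behind "has passed since batch
// creation": the total time a batch may wait, including retries, before the
// producer expires it.
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300000);
// max.block.ms caps how long send() may block when the producer's buffer is full.
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);

Note that delivery.timeout.ms must be at least request.timeout.ms plus
linger.ms, otherwise the producer rejects the config at startup.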

In newer versions (2.8+) we have made Kafka Streams more robust to server-side
disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to one of those versions and see if those symptoms
go away.
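
On those versions Streams retries most client TimeoutExceptions internally,
and the main new knob from KIP-572 is task.timeout.ms, which bounds how long
a single task may keep retrying before the error is finally raised. A one-line
sketch (the 300000 ms shown is just the default):

// KIP-572, available since 2.8: let a task retry timed-out operations for up
// to 5 minutes before Streams surfaces the exception to the application.
props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300000L);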


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hi All,
>
> I am seeing the below issue in a streams application. The Kafka cluster is
> a 3-broker cluster (v2.5.1), and I could see that 2 of the 3 brokers
> restarted at the same time the below exception occurred in the streams
> application, so I can relate the exception to those broker restarts.
> However, what worries me is that the streams application did not process
> any events after the exception. So the questions are:
> 1. How can I make the streams application resilient to broker issues? E.g.
> the producer underneath streams should have connected to another broker
> instance when 1 broker went down, but possibly the 2nd broker went down
> immediately as well, and that's why it timed out.
> 2. In general, how does streams handle broker issues, and when does it
> decide to connect to another broker instance when one instance appears to
> be in error?
>
>
> {"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
> processing processor thread -
>
> analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
> stream - task [0_5] Abort sending since an error caught with a previous
> record (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:120000 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this
>
> timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
> task [0_5] Abort sending since an error caught with a previous record
> (timestamp 1635596258179) to topic analytics-incoming-feed due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:120000 ms has passed since batch
> creation\nTimeout exception caught when sending record to topic
> analytics-incoming-feed. This might happen if the producer cannot send data
> to the Kafka cluster and thus, its internal buffer fills up. This can also
> happen if the broker is slow to respond, if the network connection to the
> broker was interrupted, or if similar circumstances arise. You can increase
> producer parameter `max.block.ms` to increase this timeout.\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
>
> datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
>
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat
>
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat
>
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
> java.base/java.lang.Thread.run(Unknown Source)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
> analytics-incoming-feed-4:120000 ms has passed since batch creation\n"}
>


-- 
-- Guozhang
