OK.
I will try this out.

Do I need to change anything for
max.in.flight.requests.per.connection?

Thanks
Sachin


On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Sachin,
>
> For this particular error,
> “org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
> is not the leader for that topic-partition.”, could you try setting the
> number of retries to something large, like this:
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>
> This will retry the produce requests and should hopefully solve your
> immediate problem.
>
> Thanks
> Eno
>
>
> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>
>     Hi,
>     We have encountered another series of errors that I need more help
>     understanding.
>
>     In the logs we see a message like this:
>     ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>
>     then some milliseconds later
>     ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>     org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>
>     finally we get this
>     ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>     org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
>
>     Again it is not clear why in this case we need to shut down the streams
>     thread and eventually the application. Shouldn't we capture this error
>     too?
>
>     Thanks
>     Sachin
>
>
>
>
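
On the max.in.flight.requests.per.connection question raised at the top of this thread: with producer retries enabled and more than one request in flight per connection, a failed batch that is retried can be written after a later batch, so records may be reordered within a partition. Below is a minimal sketch of the suggested retries setting combined with a single in-flight request, assuming ordering matters for the topics involved. The application id and bootstrap address are placeholders, the class and method names are hypothetical, and plain string keys stand in for the StreamsConfig/ProducerConfig constants so the snippet compiles without the Kafka jars. It is also assumed here that Streams forwards these producer-level keys to its internal producer.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings discussed in the thread.
class RetryConfigSketch {

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", applicationId);
        // Same key as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ProducerConfig.RETRIES_CONFIG: retry failed produce requests.
        props.put("retries", Integer.MAX_VALUE);
        // Assumption: ordering matters, so allow only one unacknowledged request
        // per connection. This trades some throughput for ordering.
        props.put("max.in.flight.requests.per.connection", 1);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = build("demo-app", "localhost:9092");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If ordering is not a concern for the application, the in-flight setting can be left at its default and only the retries setting changed.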
