OK, I will try this out. Do I need to change anything for max.in.flight.requests.per.connection?
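For reference, a minimal sketch of how the two settings in question relate. As far as I understand the producer documentation, once retries are enabled and more than one request may be in flight per connection, a retried batch can be written after a later batch, so record order within a partition may change. Setting max.in.flight.requests.per.connection to 1 avoids that, at some cost in throughput. Whether ordering matters here depends on the application. The class and method names below are hypothetical, and the plain string keys are used instead of the ProducerConfig constants so the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of Kafka or of this thread) that collects
 * the two producer settings discussed above under their plain string keys.
 */
final class ProducerRetrySettings {

    // Same key that ProducerConfig.RETRIES_CONFIG resolves to.
    static final String RETRIES = "retries";
    // Same key that ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION resolves to.
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";

    /**
     * Builds the producer overrides.
     *
     * @param preserveOrdering if true, limit the producer to one in-flight
     *                         request so a retry cannot overtake a later batch
     */
    static Properties build(boolean preserveOrdering) {
        Properties props = new Properties();
        // Retry produce requests (for example on NotLeaderForPartitionException).
        props.put(RETRIES, Integer.MAX_VALUE);
        if (preserveOrdering) {
            // Assumption: ordering within a partition matters to the application.
            props.put(MAX_IN_FLIGHT, 1);
        }
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides; in a real application they would be merged
        // into the props passed to the Streams configuration.
        System.out.println(build(true));
    }
}
```

If ordering is not a concern, build(false) leaves max.in.flight.requests.per.connection at the client default and only raises the retry count.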
Thanks
Sachin

On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.”, could you try setting the number of retries to something large, like this:
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>
> This will retry the produce requests and should hopefully solve your immediate problem.
>
> Thanks
> Eno
>
> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>
>     Hi,
>     We have encountered another series of errors which I need more help in understanding.
>
>     In the logs we see a message like this:
>     ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>
>     then some milliseconds later
>     ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>     org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>
>     finally we get this
>     ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>     org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
>     Again, it is not clear why in this case we need to shut down the streams thread and eventually the application. Shouldn't we capture this error too?
>
>     Thanks
>     Sachin