Hi Sachin,

For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.”, could you try setting the number of producer 
retries to a large value, like this:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
...
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

This will make the producer retry failed produce requests (NotLeaderForPartitionException is a 
retriable error, typically seen while a new partition leader is being elected) and should 
hopefully solve your immediate problem.
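
In case it helps, here is a self-contained sketch of that configuration wired into a small streams app. The application id, broker address, and topic names below are just placeholders for illustration, not taken from your setup:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RetryConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder application id and broker list -- replace with your own values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Retry produce requests (e.g. while a new partition leader is being elected)
        // instead of failing the task on the first NotLeaderForPartitionException.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        // Hypothetical topology: copy records from one topic to another.
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}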

Thanks
Eno


On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:

    Hi,
    We have encountered another series of errors that I need more help in understanding.
    
    In the logs we see a message like this:
    ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
    
    Then, some milliseconds later:
    ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
    org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
    
    Finally we get this:
    ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    
    
    Again, it is not clear why in this case we need to shut down the streams
    thread and eventually the application. Shouldn't we capture this error too?
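
    For reference, the only hook we are aware of is registering an uncaught-exception handler on the
    KafkaStreams instance so that the thread death is at least logged. A minimal sketch, where
    `streams` is our already-constructed KafkaStreams instance and `log` is a placeholder logger:

    // Log when a stream thread dies with an uncaught exception (the thread still dies).
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Stream thread " + t.getName() + " died", e);
        }
    });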
    
    Thanks
    Sachin
    

