Thanks for the quick turnaround, Stefan, Piotr.

This issue is rarely reproducible, and I will keep an eye on it.

Searching Stack Overflow, I found
https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash

They say the problem is fixed in version 0.10.2.1 of the Kafka producer, so I wonder which version is used in the FlinkKafkaProducer integration. For earlier versions, the proposed workaround is the following configuration:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);

On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> I think Stefan is right. Quick google search points to this:
> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>
> Please let us know if changing your configuration will solve the problem!
>
> Piotrek
>
> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Hi,
>
> I think in general this means that your producer client does not connect
> to the correct Broker (the leader) but to a broker that is just a follower
> and the follower can not execute that request. However, I am not sure what
> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC)
> has an idea?
>
> Best,
> Stefan
>
> On 04.05.2018 at 15:45, Alexander Smirnov <alexander.smirn...@gmail.com> wrote:
>
> Hi,
>
> what could cause the following exception?
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>
> Thank you,
> Alex
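
A minimal sketch of the producer-side part of the workaround mentioned at the top of this message, in case it helps to see it spelled out. It uses only java.util.Properties and Kafka's plain string config keys ("bootstrap.servers", "retries"), so it compiles without the Kafka client on the classpath. The class name ProducerRetryConfig, the helper producerProps, and the broker address localhost:9092 are hypothetical placeholders. The two consumer settings from the Stack Overflow answer are left out on the assumption that a producer-only sink does not use them. Whether raising retries actually avoids the "not the leader" failure with FlinkKafkaProducer011 is untested here.

```java
import java.util.Properties;

public class ProducerRetryConfig {

    // Builds producer properties with Kafka's string keys.
    // Values are stored as strings so that getProperty() can read them back.
    static Properties producerProps(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // "retries" is the key behind ProducerConfig.RETRIES_CONFIG.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; 10 retries as in the quoted workaround.
        Properties props = producerProps("localhost:9092", 10);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would then be passed as the producer configuration to a FlinkKafkaProducer011 constructor that accepts producer properties.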