Hi Piotr, using 0.11.0 Kafka version On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> FlinkKafka011Producer uses Kafka 0.11.0.2. > > However I’m not sure if bumping KafkaProducer version solves this issue or > upgrading Kafka. What Kafka version are you using? > > Piotrek > > > On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com> > wrote: > > Thanks for quick turnaround Stefan, Piotr > > This is a rare reproducible issue and I will keep an eye on it > > searching on the Stack Overflow I found > https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash > > They say that the problem is fixed in 0.10.2.1 of kafka producer so I > wonder which version is used in FlinkKafkaProducer integration. For earlier > versions it is proposed to use configuration: > > final Properties props = new Properties();... > props.put(ProducerConfig.RETRIES_CONFIG, 10); > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, > Integer.toString(Integer.MAX_VALUE));props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > 20000); > > > > > On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com> > wrote: > >> Hi, >> >> I think Stefan is right. Quick google search points to this: >> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition >> >> Please let us know if changing your configuration will solve the problem! >> >> Piotrek >> >> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com> >> wrote: >> >> Hi, >> >> I think in general this means that your producer client does not connect >> to the correct Broker (the leader) but to a broker that is just a follower >> and the follower can not execute that request. However, I am not sure what >> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) >> has an idea? >> >> Best, >> Stefan >> >> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov < >> alexander.smirn...@gmail.com>: >> >> Hi, >> >> what could cause the following exception? >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: >> Failed to send data to Kafka: This server is not the leader for that >> topic-partition. >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93) >> at >> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219) >> at >> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) >> at >> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) >> at >> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) >> at >> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162) >> >> >> Thank you, >> Alex >> >> >> >> >