Thanks for the quick turnaround, Stefan and Piotr.

This issue is rarely reproducible, so I will keep an eye on it.

Searching Stack Overflow, I found:
https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash

They say the problem is fixed in version 0.10.2.1 of the Kafka producer, so I
wonder which version the FlinkKafkaProducer integration uses. For earlier
versions, the proposed workaround is the following configuration:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
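
For the Flink sink, I think only the retries setting matters: the other two
keys are consumer settings and come from the Kafka Streams context of that
answer. Below is a minimal sketch of how I would build the producer properties.
The class name ProducerRetryProps, the build helper, and the broker address
localhost:9092 are made up for illustration; the keys are the plain string
names behind the ProducerConfig constants, so the snippet compiles without the
Kafka client on the classpath.

```java
import java.util.Properties;

public class ProducerRetryProps {

    // Builds producer properties with retries enabled.
    // "bootstrap.servers" and "retries" are the string values of
    // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG and ProducerConfig.RETRIES_CONFIG.
    static Properties build(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry failed sends instead of failing on the first error;
        // the value 10 used below is the one suggested in the answer.
        props.setProperty("retries", Integer.toString(retries));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address, replace with the real one.
        Properties props = build("localhost:9092", 10);
        System.out.println("retries = " + props.getProperty("retries"));
    }
}
```

The resulting Properties object would then go into the FlinkKafkaProducer011
constructor that takes a topic, a serialization schema, and a producer config.
I have not verified yet that this fixes the exception. Note also that retries
can reorder records unless max.in.flight.requests.per.connection is set to 1.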




On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> I think Stefan is right. Quick google search points to this:
> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>
> Please let us know if changing your configuration will solve the problem!
>
> Piotrek
>
> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
> Hi,
>
> I think in general this means that your producer client does not connect
> to the correct Broker (the leader) but to a broker that is just a follower
> and the follower can not execute that request. However, I am not sure what
> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC)
> has an idea?
>
> Best,
> Stefan
>
> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov <
> alexander.smirn...@gmail.com>:
>
> Hi,
>
> what could cause the following exception?
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed
> to send data to Kafka: This server is not the leader for that
> topic-partition.
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>
>
> Thank you,
> Alex
>
>
>
>
