[ 
https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976253#comment-15976253
 ] 

Gyula Fora commented on FLINK-6264:
-----------------------------------

I am not sure what causes the exception, probably [~tzulitai] is right.

In any case I think we should recover from this problem without crashing the 
job because this seems to be something occuring relatively frequently.
This probably means we have to retry fetching the metadata (for the affected 
partitions or all) a couple of times with some backoff maybe to give some time 
for Kafka to recover as well (if that causes the problem.)

In many cases we have noticed that after a problematic leader change/broker 
death it takes some time (seconds or minutes) until Kafka goes back in a state 
that will operate normally, we should try to bridge these gaps without crashing 
because that's much worse.

> Kafka consumer fails if can't find leader for partition
> -------------------------------------------------------
>
>                 Key: FLINK-6264
>                 URL: https://issues.apache.org/jira/browse/FLINK-6264
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0
>            Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were 
> restarted:
> java.lang.RuntimeException: Unable to find a leader for partitions: 
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, 
> KafkaPartitionHandle=[mytopic,10], offset=-1]
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>       at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to