[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976253#comment-15976253 ]
Gyula Fora commented on FLINK-6264:
-----------------------------------

I am not sure what causes the exception; probably [~tzulitai] is right. In any case, I think we should recover from this problem without crashing the job, because it seems to occur relatively frequently.

This probably means we have to retry fetching the metadata (for the affected partitions, or for all of them) a couple of times, perhaps with some backoff, to give Kafka time to recover as well (if that is what causes the problem).

In many cases we have noticed that after a problematic leader change or broker death, it takes some time (seconds or minutes) until Kafka returns to a state in which it operates normally. We should try to bridge these gaps without crashing, because crashing is much worse.

> Kafka consumer fails if can't find leader for partition
> -------------------------------------------------------
>
>                 Key: FLINK-6264
>                 URL: https://issues.apache.org/jira/browse/FLINK-6264
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0
>            Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed or were
> restarted:
>
> java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, KafkaPartitionHandle=[mytopic,10], offset=-1]
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
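The retry-with-backoff idea from the comment could look roughly like the following minimal sketch. It assumes the leader lookup can be wrapped in a `java.util.concurrent.Callable`; the class `RetryWithBackoff`, its `call` method, and the parameters `maxAttempts`, `initialDelayMs`, and `maxDelayMs` are hypothetical illustrations, not part of Flink's API, and this is not how `Kafka08Fetcher` actually handles the failure.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not a Flink API) that retries an action with
 * exponential backoff instead of failing on the first error.
 */
public final class RetryWithBackoff {

    private RetryWithBackoff() {}

    /**
     * Calls {@code action} up to {@code maxAttempts} times. After each failed
     * attempt except the last, sleeps for a delay that starts at
     * {@code initialDelayMs} and doubles each time, capped at {@code maxDelayMs}.
     * If every attempt fails, the exception from the last attempt is rethrown.
     */
    public static <T> T call(Callable<T> action, int maxAttempts,
                             long initialDelayMs, long maxDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: give up and rethrow below
                }
                Thread.sleep(delay); // give the brokers time to elect a leader
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        throw last; // non-null here, because at least one attempt failed
    }

    public static void main(String[] args) throws Exception {
        // Simulated (hypothetical) leader lookup that fails twice, as if the
        // broker were still recovering, and then succeeds.
        int[] calls = {0};
        String leader = call(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new RuntimeException("Unable to find a leader");
            }
            return "broker-2";
        }, 5, 10, 1000);
        System.out.println(leader + " after " + calls[0] + " attempts");
    }
}
```

Capping the delay bounds how long a single wait can grow, while the attempt limit bounds the total time before the job fails as it does today. Applying the retry only to the partitions without a leader, as the comment suggests, would be a refinement on top of this.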