Hi Mu,

which version of Flink are you using? I checked the latest branches for 1.2 - 1.5 to look for findLeaderForPartitions at line 205 in Kafka08Fetcher, but none of them matched. From what I can see in the code, there is a MARKER partition state with topic "n/a", but it is explicitly removed from the list of partitions to find leaders for and is used solely while cancelling the fetcher.
I don't know whether this is possible, but I suppose there could be more than one marker, in which case we should call removeAll() instead. @Gordon, can you elaborate on or check whether this could happen?

Nico

On 06/03/18 12:51, Mu Kong wrote:
> Hi,
>
> I have encountered a weird problem.
> After the job had been running for several days, Flink gave me the following error:
>
> java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='n/a', partition=-1}, KafkaPartitionHandle=[n/a,-1], offset=(not set)]
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
>
> The Flink job died after this error and tried to restart, but in vain.
>
> Is there any reason why Flink was unable to find a leader for the partition?
> A more confusing question is why it is trying to find topic 'n/a' instead of the topic we have specified.
>
> Thanks in advance!
>
> Best regards,
> Mu
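
To illustrate the remove() vs. removeAll() point from the reply above, here is a minimal, self-contained sketch. It does not use Flink's classes: the class name MarkerRemoval, the string MARKER and the sample batch are hypothetical stand-ins for the marker partition state and the list of partitions. It only shows the general java.util.List behaviour that would matter if a batch ever contained the marker twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not taken from Kafka08Fetcher.
class MarkerRemoval {
    // Stand-in for the marker partition state (assumption: identified by topic "n/a").
    static final String MARKER = "n/a";

    // List.remove(Object) drops only the first matching element.
    static List<String> withRemove(List<String> batch) {
        List<String> copy = new ArrayList<>(batch);
        copy.remove(MARKER);
        return copy;
    }

    // removeAll(Collection) drops every matching element.
    static List<String> withRemoveAll(List<String> batch) {
        List<String> copy = new ArrayList<>(batch);
        copy.removeAll(Collections.singleton(MARKER));
        return copy;
    }

    public static void main(String[] args) {
        // A batch that, hypothetically, contains the marker twice.
        List<String> batch = Arrays.asList(MARKER, "my-topic-0", MARKER);
        System.out.println(withRemove(batch));    // one marker is left behind
        System.out.println(withRemoveAll(batch)); // no marker is left
    }
}
```

If a second marker could really end up in the batch, the first variant would leave an 'n/a' partition in the list that is then passed on to the leader lookup, which would match the error message. Whether that can actually happen in the fetcher is exactly the open question above.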