/usr/share/kafka_2.11-0.8.2.1/bin$ ./kafka-topics.sh --describe --topic
capi --zookeeper (someserver)

Topic:capi PartitionCount:1 ReplicationFactor:1 Configs:

Topic: capi Partition: 0 Leader: 0 Replicas: 0 Isr: 0


There are no events to consume from this topic, this I confirm by running
the console consumer.

./kafka-console-consumer.sh --topic topicname --zookeeper (some server)


The flink connector is other consumer. This is happening in our
pre-production machines consistently, I will also try to reproduce this
locally.

java.lang.RuntimeException: Unable to find a leader for partitions:
[FetchPartition {topic=location, partition=58, offset=-915623761776},
FetchPartition {topic=location, partition=60, offset=-915623761776},
FetchPartition {topic=location, partition=54, offset=-915623761776},
FetchPartition {topic=location, partition=56, offset=-915623761776},
FetchPartition {topic=location, partition=66, offset=-915623761776},
FetchPartition {topic=location, partition=68, offset=-915623761776},
FetchPartition {topic=location, partition=62, offset=-915623761776},
FetchPartition {topic=location, partition=64, offset=-915623761776},
FetchPartition {topic=location, partition=74, offset=-915623761776},
FetchPartition {topic=location, partition=76, offset=-915623761776},
FetchPartition {topic=location, partition=70, offset=-915623761776},
FetchPartition {topic=location, partition=72, offset=-915623761776},
FetchPartition {topic=location, partition=82, offset=-915623761776},
FetchPartition {topic=location, partition=84, offset=-915623761776},
FetchPartition {topic=location, partition=78, offset=-915623761776},
FetchPartition {topic=location, partition=80, offset=-915623761776},
FetchPartition {topic=location, partition=26, offset=-915623761776},
FetchPartition {topic=location, partition=28, offset=-915623761776},
FetchPartition {topic=location, partition=22, offset=-915623761776},
FetchPartition {topic=location, partition=24, offset=-915623761776},
FetchPartition {topic=location, partition=34, offset=-915623761776},
FetchPartition {topic=location, partition=36, offset=-915623761776},
FetchPartition {topic=location, partition=30, offset=-915623761776},
FetchPartition {topic=location, partition=32, offset=-915623761776},
FetchPartition {topic=location, partition=42, offset=-915623761776},
FetchPartition {topic=location, partition=44, offset=-915623761776},
FetchPartition {topic=location, partition=38, offset=-915623761776},
FetchPartition {topic=location, partition=40, offset=-915623761776},
FetchPartition {topic=location, partition=50, offset=-915623761776},
FetchPartition {topic=location, partition=52, offset=-915623761776},
FetchPartition {topic=location, partition=46, offset=-915623761776},
FetchPartition {topic=location, partition=48, offset=-915623761776},
FetchPartition {topic=location, partition=122, offset=-915623761776},
FetchPartition {topic=location, partition=124, offset=-915623761776},
FetchPartition {topic=location, partition=118, offset=-915623761776},
FetchPartition {topic=location, partition=120, offset=-915623761776},
FetchPartition {topic=location, partition=2, offset=-915623761776},
FetchPartition {topic=location, partition=130, offset=-915623761776},
FetchPartition {topic=location, partition=4, offset=-915623761776},
FetchPartition {topic=location, partition=132, offset=-915623761776},
FetchPartition {topic=location, partition=126, offset=-915623761776},
FetchPartition {topic=location, partition=0, offset=-915623761776},
FetchPartition {topic=location, partition=128, offset=-915623761776},
FetchPartition {topic=location, partition=10, offset=-915623761776},
FetchPartition {topic=location, partition=138, offset=-915623761776},
FetchPartition {topic=location, partition=12, offset=-915623761776},
FetchPartition {topic=location, partition=140, offset=-915623761776},
FetchPartition {topic=location, partition=6, offset=-915623761776},
FetchPartition {topic=location, partition=134, offset=-915623761776},
FetchPartition {topic=location, partition=8, offset=-915623761776},
FetchPartition {topic=location, partition=136, offset=-915623761776},
FetchPartition {topic=location, partition=18, offset=-915623761776},
FetchPartition {topic=location, partition=146, offset=-915623761776},
FetchPartition {topic=location, partition=20, offset=-915623761776},
FetchPartition {topic=location, partition=148, offset=-915623761776},
FetchPartition {topic=location, partition=14, offset=-915623761776},
FetchPartition {topic=location, partition=142, offset=-915623761776},
FetchPartition {topic=location, partition=16, offset=-915623761776},
FetchPartition {topic=location, partition=144, offset=-915623761776},
FetchPartition {topic=location, partition=90, offset=-915623761776},
FetchPartition {topic=location, partition=92, offset=-915623761776},
FetchPartition {topic=location, partition=86, offset=-915623761776},
FetchPartition {topic=location, partition=88, offset=-915623761776},
FetchPartition {topic=location, partition=98, offset=-915623761776},
FetchPartition {topic=location, partition=100, offset=-915623761776},
FetchPartition {topic=location, partition=94, offset=-915623761776},
FetchPartition {topic=location, partition=96, offset=-915623761776},
FetchPartition {topic=location, partition=106, offset=-915623761776},
FetchPartition {topic=location, partition=108, offset=-915623761776},
FetchPartition {topic=location, partition=102, offset=-915623761776},
FetchPartition {topic=location, partition=104, offset=-915623761776},
FetchPartition {topic=location, partition=114, offset=-915623761776},
FetchPartition {topic=location, partition=116, offset=-915623761776},
FetchPartition {topic=location, partition=110, offset=-915623761776},
FetchPartition {topic=location, partition=112, offset=-915623761776}]

at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.findLeaderForPartitions(LegacyFetcher.java:323)

at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:162)

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)

at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)

at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

at java.lang.Thread.run(Thread.java:745)

On Wed, Apr 20, 2016 at 1:12 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
> I just tried it with Kafka 0.8.2.0 and 0.8.2.1 and for both versions
> everything worked fine.
> How many partitions does your topic have?
>
> Can you send me the full logs of the Kafka consumer?
>
> On Tue, Apr 19, 2016 at 6:05 PM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> Flink version : 1.0.0
>> Kafka version : 0.8.2.1
>>
>> Try to use a topic which has no message posted to it, at the time flink
>> starts.
>>
>> On Tue, Apr 19, 2016 at 5:41 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Can you provide me with the exact Flink and Kafka versions you are using
>>> and the steps to reproduce the issue?
>>>
>>> On Tue, Apr 19, 2016 at 2:06 PM, Balaji Rajagopalan <
>>> balaji.rajagopa...@olacabs.com> wrote:
>>>
>>>> It does not seem to fully work if there is no data in the kafka stream,
>>>> the flink application emits this error and bails, could this be missed use
>>>> case in the fix.
>>>>
>>>> On Tue, Apr 19, 2016 at 3:58 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm sorry, the documentation in the JIRA issue is a bit incorrect. The
>>>>> issue has been fixed in all versions including and after 1.0.0. Earlier
>>>>> releases (0.10, 0.9) will fail when the leader changes.
>>>>> However, you don't necessarily need to upgrade to Flink 1.0.0 to
>>>>> resolve the issue: With checkpointing enabled, your job will fail on a
>>>>> leader change, then Flink will restart the Kafka consumers and they'll 
>>>>> find
>>>>> the new leaders.
>>>>> Starting from Flink 1.0.0 the Kafka consumer will handle leader
>>>>> changes without failing.
>>>>>
>>>>> Regards,
>>>>> Robert
>>>>>
>>>>> On Tue, Apr 19, 2016 at 12:17 PM, Balaji Rajagopalan <
>>>>> balaji.rajagopa...@olacabs.com> wrote:
>>>>>
>>>>>> I am facing this exception repeatedly while trying to consume from
>>>>>> kafka topic. It seems it was reported in 1.0.0 and fixed in 1.0.0, how 
>>>>>> can
>>>>>>  I be sure that is fixed in the version of flink that I am using, does it
>>>>>> require me to install patch updates ?
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Unable to find a leader for
>>>>>> partitions: [FetchPartition {topic=capi, partition=0, 
>>>>>> offset=-915623761776}]
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.findLeaderForPartitions(LegacyFetcher.java:323)
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:162)
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-3368
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to