We rebalanced some topics in our Kafka cluster today. A Flink job I had
running crashed when some of the partitions were moved; other (non-Flink)
consumers continued to work.

Should I configure it differently or could this be a bug?

09/24/2015 15:34:31     Source: Custom Source(3/4) switched to FAILED
java.lang.Exception: Error while fetching from broker: Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker: Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)

On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> did you manually add a Kafka dependency to your project? Maybe you are
> overriding the Kafka version with a lower one?
>
> I'm sorry that our consumer crashes when it's supposed to read an
> invalid topic, but in general that's good behavior ;)
>
> Maybe you can check whether the topic exists from your user code?
> The getPartitionsForTopic() method is actually a public static method that
> you can call.
> If it throws an exception, the topic doesn't exist anymore.
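> A minimal sketch of that check, wrapped in a small helper (the exact
> signature of getPartitionsForTopic() and the property keys are assumed
> here, not confirmed by this thread):

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TopicCheck {

    // Probe the topic before constructing the consumer.
    // getPartitionsForTopic() throws if the topic no longer exists,
    // so the exception is used as the "missing topic" signal.
    static boolean topicExists(String topic, Properties props) {
        try {
            FlinkKafkaConsumer.getPartitionsForTopic(topic, props);
            return true;
        } catch (RuntimeException e) {
            return false; // topic missing: skip creating this source
        }
    }
}
```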
>
>
> Robert
>
> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <jakob.erics...@gmail.com>
> wrote:
>
>> Hit another problem. It is probably related to a topic that still exists
>> in ZK but is no longer used (and therefore has no partitions), or to
>> starting a listener for a topic that hasn't been created yet. I would
>> like it not to crash.
>>
>> Also, some funny Scala <-> Java interop:
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: kafka.common.ErrorMapping.InvalidTopicCode()S
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>
>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <
>> jakob.erics...@gmail.com> wrote:
>>
>>> That will work. We have some utility classes for exposing the ZK-info.
>>>
>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Jakob,
>>>>
>>>> Currently, it's not possible to subscribe to multiple topics with one
>>>> FlinkKafkaConsumer.
>>>>
>>>> So for now, you have to create a FlinkKafkaConsumer for each topic, so
>>>> you'll end up with 50 sources.
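>>>> For what it's worth, the per-topic sources can be unioned into a single
>>>> stream, so the rest of the job stays unchanged. A sketch, assuming the
>>>> 0.8.1 connector from this thread, String-valued messages, and
>>>> illustrative topic names and property values:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181"); // illustrative
        props.setProperty("group.id", "multi-topic-job");

        // Your 40-50 topics would go here.
        List<String> topics = Arrays.asList("topic-a", "topic-b", "topic-c");

        // One FlinkKafkaConsumer081 per topic, unioned into one stream.
        DataStream<String> all = null;
        for (String topic : topics) {
            DataStream<String> s = env.addSource(
                    new FlinkKafkaConsumer081<>(topic, new SimpleStringSchema(), props));
            all = (all == null) ? s : all.union(s);
        }

        all.print();
        env.execute("multi-topic job");
    }
}
```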
>>>>
>>>> As soon as Kafka releases the new consumer, it will support subscribing
>>>> to multiple topics (I think even with pattern support), and we can easily
>>>> expose those APIs through the FlinkKafkaConsumer as well.
>>>> As you can see here:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>> Kafka has plans to release the new consumer API in October.
>>>> As soon as the new API is out, we'll support it.
>>>>
>>>> I hope this solution is okay for you. If not, please let me know ;)
>>>>
>>>>
>>>> Robert
>>>>
>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <
>>>> jakob.erics...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Would it be possible to get the FlinkKafkaConsumer to support multiple
>>>>> topics, like a list?
>>>>>
>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per topic
>>>>> and add each as a source?
>>>>> We have about 40-50 topics to listen to for one job.
>>>>> Or even better, supply a regexp pattern that defines the topics; this
>>>>> means you have to do some queries against ZK to get information about
>>>>> the topics.
>>>>>
>>>>> Kind regards,
>>>>> Jakob
>>>>>
>>>>
>>>>
>>>
>>
>
