We did some rebalancing of topics in our Kafka cluster today. I had a Flink job running, and it crashed when some of the partitions were moved; other (non-Flink) consumers continued to work.
Should I configure it differently, or could this be a bug?

09/24/2015 15:34:31  Source: Custom Source(3/4) switched to FAILED
java.lang.Exception: Error while fetching from broker: Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at java.lang.Class.newInstance(Class.java:442)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker: Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at java.lang.Class.newInstance(Class.java:442)
	at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
	at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)

On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> did you manually add a Kafka dependency to your project? Maybe you are
> overriding the Kafka version with a lower one?
>
> I'm sorry that our consumer crashes when it's supposed to read an
> invalid topic... but in general, that's good behavior ;)
>
> Maybe you can check whether the topic exists from your user code?
> The getPartitionsForTopic() method is actually a public static method
> that you can call.
> If it throws an exception, the topic doesn't exist anymore.
>
>
> Robert
>
> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>
>> Hit another problem. It is probably related to a topic that still exists
>> in ZK but is no longer used (and therefore has no partitions), or to
>> starting a listener for a topic that hasn't been created yet. I would
>> like it not to crash.
>>
>> Also, some funny Scala <-> Java interplay:
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: kafka.common.ErrorMapping.InvalidTopicCode()S
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>
>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>>
>>> That will work. We have some utility classes for exposing the ZK info.
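For reference, a guard along the lines Robert suggests could look roughly like this. It is only a sketch: the `TopicGuard` class, the `topicExists` helper, and the `Callable`-based indirection are my own assumptions; in the thread, the lookup would be the static `FlinkKafkaConsumer.getPartitionsForTopic(...)`, which throws when the topic is unknown.

```java
import java.util.List;
import java.util.concurrent.Callable;

public class TopicGuard {
    // Returns true only if the partition lookup succeeds and yields at
    // least one partition. Any exception from the lookup (e.g. an unknown
    // topic) is treated as "topic does not exist", so the caller can skip
    // the source instead of letting the job crash.
    public static boolean topicExists(Callable<List<?>> partitionLookup) {
        try {
            List<?> partitions = partitionLookup.call();
            return partitions != null && !partitions.isEmpty();
        } catch (Exception e) {
            return false;
        }
    }
}
```

You would then only add the Kafka source for topics where the guard returns true, so a deleted or not-yet-created topic is skipped instead of crashing the job at startup.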
>>>
>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Jakob,
>>>>
>>>> currently, it's not possible to subscribe to multiple topics with one
>>>> FlinkKafkaConsumer.
>>>>
>>>> So for now, you have to create one FlinkKafkaConsumer per topic, which
>>>> means you'll end up with 50 sources.
>>>>
>>>> As soon as Kafka releases the new consumer, it will support subscribing
>>>> to multiple topics (I think even with pattern support), and we can easily
>>>> expose those APIs through the FlinkKafkaConsumer as well.
>>>> As you can see here:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>> Kafka plans to release the new consumer API in October.
>>>> As soon as the new API is out, we'll support it.
>>>>
>>>> I hope this solution is okay for you. If not, please let me know ;)
>>>>
>>>>
>>>> Robert
>>>>
>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Would it be possible for the FlinkKafkaConsumer to support multiple
>>>>> topics, e.g. as a list?
>>>>>
>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per topic
>>>>> and add each as a source? We have about 40-50 topics to listen to for
>>>>> one job.
>>>>> Or, even better, supply a regex pattern that defines the topics; this
>>>>> would mean doing some queries against ZK to get information about the
>>>>> topics.
>>>>>
>>>>> Kind regards,
>>>>> Jakob
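Until pattern subscription is available in the consumer itself, the regex idea from the thread can be approximated on the client side: fetch the topic names yourself (e.g. via the ZK utility classes mentioned above), filter them with a pattern, and create one FlinkKafkaConsumer per match. A minimal sketch of the filtering step follows; the `TopicSelector` class and method name are my own invention, not Flink or Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class TopicSelector {
    // Filter a list of topic names down to those fully matching the given
    // regex. The caller is expected to obtain allTopics from ZooKeeper (or
    // wherever the cluster metadata lives) and then create one consumer
    // per matched topic.
    public static List<String> matchingTopics(List<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matches = new ArrayList<String>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matches.add(topic);
            }
        }
        return matches;
    }
}
```

For 40-50 topics this still means 40-50 sources, as Robert notes, but the topic list no longer has to be maintained by hand in the job code.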