What I actually meant was partition reassignment ( https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool ). No topics were deleted. We added a bunch of new servers and needed to reassign some partitions to spread the load.
No, I haven't set setNumberOfExecutionRetries().

On Thu, Sep 24, 2015 at 10:06 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Jakob,
>
> What exactly do you mean by a rebalance of topics? Did the leader of the
> partitions change? Were topics deleted?
>
> Flink's KafkaConsumer does not try to recover from these exceptions. We
> rely on Flink's fault-tolerance mechanisms to restart the data consumption
> (from the last valid offset).
> Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
>
>
> On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>
>> We did some rebalancing of topics in our Kafka cluster today. I had a Flink
>> job running and it crashed when some of the partitions were moved; other
>> (non-Flink) consumers continued to work.
>>
>> Should I configure it differently, or could this be a bug?
>>
>> 09/24/2015 15:34:31  Source: Custom Source(3/4) switched to FAILED
>> java.lang.Exception: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>     at java.lang.Class.newInstance(Class.java:442)
>>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>     at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>     at java.lang.Class.newInstance(Class.java:442)
>>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>     at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>>
>>
>> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Did you manually add a Kafka dependency to your project? Maybe you are
>>> overriding the Kafka version with a lower one?
>>>
>>> I'm sorry that our consumer is crashing when it's supposed to read an
>>> invalid topic... but in general, that's good behavior ;)
>>>
>>> Maybe you can check whether the topic exists from your user code?
>>> The getPartitionsForTopic() method is actually a public static method that
>>> you can call.
>>> If it's throwing an exception, the topic doesn't exist anymore.
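Robert's two suggestions above (enabling execution retries so the job restarts from the last valid offset, and probing the topic with the public static getPartitionsForTopic() before building the job) could be sketched roughly like this. This is a sketch against Flink 0.9.x-era APIs as discussed in the thread; the helper's exact signature and exception type, as well as the connection properties and topic name, are assumptions:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TopicGuard {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Restart the job (and hence the Kafka sources) on failure,
        // resuming consumption from the last valid offset.
        env.setNumberOfExecutionRetries(3);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181"); // hypothetical address
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical address
        props.setProperty("group.id", "my-group");             // hypothetical group id

        String topic = "events"; // hypothetical topic name
        try {
            // Public static helper mentioned above; assumed to throw a
            // RuntimeException when the topic cannot be resolved.
            FlinkKafkaConsumer.getPartitionsForTopic(topic, props);
        } catch (RuntimeException e) {
            System.err.println("Topic " + topic + " does not exist; not adding source.");
            return;
        }
        // ... add the FlinkKafkaConsumer source and the rest of the job here
    }
}
```

The guard keeps a job with a stale or not-yet-created topic from crashing at construction time, which is the failure shown further down in the thread.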
>>>
>>>
>>> Robert
>>>
>>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>>>
>>>> Hit another problem. It is probably related to a topic that still exists
>>>> in ZK but is not used anymore (and therefore has no partitions), or to
>>>> starting a listener for a topic that hasn't been created yet. I would
>>>> like it not to crash.
>>>>
>>>> Also, some funny Scala <-> Java:
>>>>
>>>> Exception in thread "main" java.lang.NoSuchMethodError: kafka.common.ErrorMapping.InvalidTopicCode()S
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>>>
>>>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>>>>
>>>>> That will work. We have some utility classes for exposing the ZK info.
>>>>>
>>>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>
>>>>>> Hi Jakob,
>>>>>>
>>>>>> Currently, it's not possible to subscribe to multiple topics with one
>>>>>> FlinkKafkaConsumer.
>>>>>>
>>>>>> So for now, you have to create one FKC per topic; you'll end up with
>>>>>> 50 sources.
>>>>>>
>>>>>> As soon as Kafka releases the new consumer, it will support subscribing
>>>>>> to multiple topics (I think even with pattern support), and we can easily
>>>>>> expose those APIs through the FlinkKafkaConsumer as well.
>>>>>> As you can see here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>>>> Kafka plans to release the new consumer API in October.
>>>>>> As soon as the new API is out, we'll support it.
>>>>>>
>>>>>> I hope this solution is okay for you.
>>>>>> If not, please let me know ;)
>>>>>>
>>>>>>
>>>>>> Robert
>>>>>>
>>>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <jakob.erics...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Would it be possible to get the FlinkKafkaConsumer to support
>>>>>>> multiple topics, e.g. as a list?
>>>>>>>
>>>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per
>>>>>>> topic and add each as a source? We have about 40-50 topics to
>>>>>>> listen to for one job.
>>>>>>> Or, even better, supply a regexp pattern that defines the topics;
>>>>>>> that would mean doing some queries against ZK to get information
>>>>>>> about the topics.
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Jakob
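The one-consumer-per-topic workaround Robert describes could look roughly like the sketch below: one FlinkKafkaConsumer081 per topic, unioned into a single stream. It assumes Flink 0.9.x-era APIs; the topic names, broker/ZK addresses, and group id are placeholders:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181"); // hypothetical address
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical address
        props.setProperty("group.id", "multi-topic-job");      // hypothetical group id

        // 40-50 topics in the scenario from the thread; three placeholders here.
        List<String> topics = Arrays.asList("topicA", "topicB", "topicC");

        // One source per topic, unioned into a single stream.
        DataStream<String> union = null;
        for (String topic : topics) {
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer081<>(topic, new SimpleStringSchema(), props));
            union = (union == null) ? stream : union.union(stream);
        }

        union.print();
        env.execute("multi-topic consumer");
    }
}
```

Each consumer is an independent source, so a failure in one topic's fetcher still triggers the normal job-restart path; the union merely merges the streams downstream.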