What I actually meant was partition reassignment
(https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool).
No topics were deleted.
We added a bunch of new servers and needed to reassign some partitions to
spread the load.

No, I haven't called setNumberOfExecutionRetries().
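
For reference, here is a rough sketch of what the restart behaviour described in the quoted reply below amounts to: a failed fetch ends the current attempt, and with retries enabled the job is restarted and continues from the last valid offset. This is plain Java, not Flink code. The class RestartSketch, the method runWithRetries and the FetchException type are made up for illustration, and the failure is injected artificially at a fixed offset. It only models the idea, assuming a single partition and a failure that happens once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink or Kafka.
class RestartSketch {

    /** Stands in for a fetch error such as UnknownTopicOrPartitionException. */
    static class FetchException extends RuntimeException {
        FetchException(String message) {
            super(message);
        }
    }

    /**
     * Consumes offsets 0..totalRecords-1. A single failure is injected at
     * failAtOffset (simulating a partition being moved). After a failure the
     * loop restarts from the last valid offset, up to maxRetries times.
     */
    static List<Integer> runWithRetries(int totalRecords, int failAtOffset, int maxRetries) {
        List<Integer> consumed = new ArrayList<>();
        int nextOffset = 0;              // last valid position, kept across restarts
        boolean failureInjected = false; // the simulated broker error happens once

        for (int attempt = 0; ; attempt++) {
            try {
                while (nextOffset < totalRecords) {
                    if (nextOffset == failAtOffset && !failureInjected) {
                        failureInjected = true;
                        throw new FetchException("fetch failed at offset " + nextOffset);
                    }
                    consumed.add(nextOffset);
                    nextOffset++;
                }
                return consumed;
            } catch (FetchException e) {
                // No retries left: the job fails, as in the stack trace in this thread.
                if (attempt >= maxRetries) {
                    throw e;
                }
                // Otherwise fall through and restart from nextOffset.
            }
        }
    }

    public static void main(String[] args) {
        // One retry allowed: the run recovers and reads every record once.
        System.out.println(runWithRetries(5, 3, 1));

        // Zero retries: the first fetch error terminates the run.
        try {
            runWithRetries(5, 3, 0);
        } catch (FetchException e) {
            System.out.println("job failed: " + e.getMessage());
        }
    }
}
```

With zero retries the sketch fails on the first fetch error, which matches what I saw. With at least one retry it picks up at the offset where it stopped.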

On Thu, Sep 24, 2015 at 10:06 PM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi Jakob,
>
> What exactly do you mean by rebalance of topics? Did the leader of the
> partitions change?
> Were topics deleted?
>
> Flink's KafkaConsumer does not try to recover from these exceptions. We
> rely on Flink's fault tolerance mechanisms to restart the data consumption
> (from the last valid offset).
> Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
>
>
> On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson <jakob.erics...@gmail.com>
> wrote:
>
>> We rebalanced some topics in our Kafka cluster today. I had a Flink
>> job running and it crashed when some of the partitions were moved. Other
>> consumers (non-Flink) continued to work.
>>
>> Should I configure it differently or could this be a bug?
>>
>> 09/24/2015 15:34:31     Source: Custom Source(3/4) switched to FAILED
>> java.lang.Exception: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>         at java.lang.Class.newInstance(Class.java:442)
>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>
>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Error while fetching from broker:
>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>         at java.lang.Class.newInstance(Class.java:442)
>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>         at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>
>>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>>
>>
>> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> did you manually add a Kafka dependency into your project? Maybe you are
>>> overwriting the Kafka version to a lower version?
>>>
>>> I'm sorry that our consumer crashes when it's supposed to read an
>>> invalid topic, but in general that's good behavior ;)
>>>
>>> Maybe you can check whether the topic exists from your user code?
>>> The getPartitionsForTopic() method is actually a public static method that
>>> you can call.
>>> If it throws an exception, the topic doesn't exist anymore.
>>>
>>>
>>> Robert
>>>
>>> On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <
>>> jakob.erics...@gmail.com> wrote:
>>>
>>>> Hit another problem. It is probably related to a topic that still exists
>>>> in ZK but is not used anymore (therefore no partitions) or I want to start
>>>> a listener for a topic that hasn't yet been created. I would like it not to
>>>> crash.
>>>>
>>>> Also, some funny Scala <-> Java
>>>>
>>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>>> kafka.common.ErrorMapping.InvalidTopicCode()S
>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)
>>>>
>>>> On Fri, Sep 18, 2015 at 11:02 AM, Jakob Ericsson <
>>>> jakob.erics...@gmail.com> wrote:
>>>>
>>>>> That will work. We have some utility classes for exposing the ZK-info.
>>>>>
>>>>> On Fri, Sep 18, 2015 at 10:50 AM, Robert Metzger <rmetz...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Jakob,
>>>>>>
>>>>>> Currently, it's not possible to subscribe to multiple topics with one
>>>>>> FlinkKafkaConsumer.
>>>>>>
>>>>>> So for now, you have to create a FKC for each topic .. so you'll end
>>>>>> up with 50 sources.
>>>>>>
>>>>>> As soon as Kafka releases the new consumer, it will support
>>>>>> subscribing to multiple topics (I think even with pattern support) and we
>>>>>> can easily expose the APIs also to the FlinkKafkaConsumer.
>>>>>> As you can see here:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>>>>>> Kafka has plans to release the new consumer API in October.
>>>>>> As soon as the new API is out, we'll support it.
>>>>>>
>>>>>> I hope this solution is okay for you. If not, please let me know ;)
>>>>>>
>>>>>>
>>>>>> Robert
>>>>>>
>>>>>> On Fri, Sep 18, 2015 at 10:43 AM, Jakob Ericsson <
>>>>>> jakob.erics...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Would it be possible to get the FlinkKafkaConsumer to support
>>>>>>> multiple topics, like a list?
>>>>>>>
>>>>>>> Or would it be better to instantiate one FlinkKafkaConsumer per
>>>>>>> topic and add each as a source?
>>>>>>> We have about 40-50 topics to listen to for one job.
>>>>>>> Or even better, supply a regexp pattern that defines the queues;
>>>>>>> this means that you have to do some queries against ZK to get
>>>>>>> information about topics.
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Jakob
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
