@Robert: Is it possible to add a "fallback" strategy to the consumer?
Something like "if offsets cannot be found, use latest"?

I would make this an optional feature to activate. I would think it is
quite surprising to users if records start being skipped in certain
situations. But I can see that this would be desirable sometimes.

More control over skipping the records could be something to implement in
an extended version of the Kafka Consumer. A user could define a policy
that, in case consumer falls behind producer more than X (offsets), it
starts requesting the latest offsets (rather than the following), thereby
skipping a bunch of records.



On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Nick,
>
> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
> consumer falls back in the topic so far that the offsets it's requesting
> are invalid?
>
> For that, the retention time of Kafka has to be pretty short.
>
> Skipping records under load is something currently not supported by Flink
> itself. The only idea I had for handling this would be to give the
> DeserializationSchema a call back to request the latest offset from Kafka
> to determine the lag. With that, the schema could determine a "dropping
> rate" to catch up.
> What would you as an application developer expect to handle the situation?
>
>
> Just out of curiosity: What's the throughput you have on the Kafka topic?
>
>
> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I have a streaming job that consumes from of a kafka topic. The topic is
>> pretty active so the local-mode single worker is obviously not able to keep
>> up with the fire-hose. I expect the job to skip records and continue on.
>> However, I'm getting an exception from the LegacyFetcher which kills the
>> job. This is very much *not* what I want. Any thoughts? The only thing I
>> find when I search for this error message is a link back to FLINK-2656. I'm
>> running roughly 0.10-release/HEAD.
>>
>> Thanks a lot,
>> Nick
>>
>> java.lang.Exception: Found invalid offsets more than once in partitions
>> [FetchPartition {partition=X, offset=Y}] Exceptions:
>>         at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>         at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>         at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>         at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Found invalid offsets more than
>> once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>         at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
>>
>>
>

Reply via email to