Hi Nick,

I'm sorry you ran into this issue. Is it possible that Flink's Kafka
consumer has fallen so far behind in the topic that the offsets it is
requesting are no longer valid?

For that to happen, the retention time configured in Kafka would have to be
fairly short.

Skipping records under load is currently not supported by Flink itself.
The only idea I have for handling this would be to give the
DeserializationSchema a callback that requests the latest offset from Kafka
to determine the lag. With that, the schema could compute a "dropping
rate" in order to catch up.
How would you, as an application developer, expect to handle this situation?
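
To make the "dropping rate" idea a bit more concrete, here is a rough sketch
in plain Java. Nothing in it is an existing Flink or Kafka API: the class
LagBasedDropper, its parameters (tolerableLag, maxDropRate) and the linear
formula are all assumptions made up for illustration. It also assumes the
caller can somehow obtain the latest offset of the partition, which is
exactly the callback that does not exist today.

```java
import java.util.Random;

/**
 * Hypothetical helper, NOT part of Flink or Kafka.
 * Turns the consumer lag (latest offset minus current offset) into a
 * probability with which incoming records are dropped.
 */
final class LagBasedDropper {
    private final long tolerableLag;   // assumed: lag (in records) we accept without dropping
    private final double maxDropRate;  // assumed: upper bound so some records always get through
    private final Random random = new Random();

    LagBasedDropper(long tolerableLag, double maxDropRate) {
        if (tolerableLag < 0 || maxDropRate < 0.0 || maxDropRate > 1.0) {
            throw new IllegalArgumentException("invalid configuration");
        }
        this.tolerableLag = tolerableLag;
        this.maxDropRate = maxDropRate;
    }

    /** Fraction of records to drop, given the latest and the current offset. */
    double dropRate(long latestOffset, long currentOffset) {
        long lag = Math.max(0L, latestOffset - currentOffset);
        if (lag <= tolerableLag) {
            return 0.0; // close enough to the head of the topic, keep everything
        }
        // Drop the share of the lag that exceeds the tolerated amount, capped.
        double rate = (double) (lag - tolerableLag) / (double) lag;
        return Math.min(maxDropRate, rate);
    }

    /** Randomly decides whether the record at currentOffset should be skipped. */
    boolean shouldDrop(long latestOffset, long currentOffset) {
        return random.nextDouble() < dropRate(latestOffset, currentOffset);
    }
}
```

A schema with access to the latest offset could call shouldDrop(...) once
per record and skip deserialization when it returns true. Whether random
dropping is acceptable, or whether it should rather jump ahead in the
partition, depends on the application.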


Just out of curiosity: What's the throughput you have on the Kafka topic?


On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Hi folks,
>
> I have a streaming job that consumes from a Kafka topic. The topic is
> pretty active so the local-mode single worker is obviously not able to keep
> up with the fire-hose. I expect the job to skip records and continue on.
> However, I'm getting an exception from the LegacyFetcher which kills the
> job. This is very much *not* what I want. Any thoughts? The only thing I
> find when I search for this error message is a link back to FLINK-2656. I'm
> running roughly 0.10-release/HEAD.
>
> Thanks a lot,
> Nick
>
> java.lang.Exception: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>         at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
>
>
