@Robert: Is it possible to add a "fallback" strategy to the consumer? Something like "if offsets cannot be found, use latest"?
I would make this an optional feature to activate. I think it would be quite surprising to users if records started being skipped in certain situations. But I can see that this would be desirable sometimes.

More control over skipping records could be something to implement in an extended version of the Kafka consumer. A user could define a policy that, in case the consumer falls behind the producer by more than X offsets, starts requesting the latest offset (rather than the next one), thereby skipping a batch of records. (A rough sketch of what such a policy could look like is appended below the quoted thread.)

On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Nick,
>
> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
> consumer falls so far behind in the topic that the offsets it is
> requesting are invalid?
>
> For that, the retention time of Kafka would have to be pretty short.
>
> Skipping records under load is something currently not supported by Flink
> itself. The only idea I had for handling this would be to give the
> DeserializationSchema a callback to request the latest offset from Kafka
> to determine the lag. With that, the schema could determine a "dropping
> rate" to catch up.
> What would you, as an application developer, expect in order to handle
> this situation?
>
> Just out of curiosity: what throughput do you have on the Kafka topic?
>
> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I have a streaming job that consumes from a Kafka topic. The topic is
>> pretty active, so the local-mode single worker is obviously not able to
>> keep up with the fire hose. I expect the job to skip records and continue
>> on. However, I'm getting an exception from the LegacyFetcher which kills
>> the job. This is very much *not* what I want. Any thoughts? The only
>> thing I find when I search for this error message is a link back to
>> FLINK-2656. I'm running roughly 0.10-release/HEAD.
>>
>> Thanks a lot,
>> Nick
>>
>> java.lang.Exception: Found invalid offsets more than once in partitions
>> [FetchPartition {partition=X, offset=Y}] Exceptions:
>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Found invalid offsets more than
>> once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
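
For illustration only, here is a minimal sketch of what such an opt-in skipping policy could look like. The interface and class names below are hypothetical and are not part of Flink's Kafka consumer API; the sketch assumes the fetcher can hand the policy both the offset it would read next and the latest offset available in the partition.

// Hypothetical sketch -- not an existing Flink interface.
public interface OffsetSkipPolicy extends java.io.Serializable {

    // Given the offset the consumer would fetch next and the latest offset
    // available in the partition, return the offset to actually fetch from.
    long nextFetchOffset(long nextOffset, long latestOffset);
}

// Example policy: if the consumer lags more than maxLag offsets behind the
// head of the partition, jump to the latest offset and skip the backlog.
class SkipIfLaggingPolicy implements OffsetSkipPolicy {

    private final long maxLag;

    SkipIfLaggingPolicy(long maxLag) {
        this.maxLag = maxLag;
    }

    @Override
    public long nextFetchOffset(long nextOffset, long latestOffset) {
        return (latestOffset - nextOffset > maxLag) ? latestOffset : nextOffset;
    }
}

The fetcher would consult the policy before each fetch request, so a consumer that has fallen far behind jumps to the head of the partition instead of failing on offsets that Kafka's retention may already have evicted.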