This goes back to the idea that streaming applications should never go down. I'd much rather consume at max capacity and knowingly drop some portion of the incoming pipe than have the streaming job crash. Of course, once the job itself is robust, I still need the runtime to be robust -- YARN vs (potential) Mesos vs standalone cluster will be my next consideration.
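To make "knowingly drop" concrete, here is a rough sketch of the kind of lag-based policy Stephan describes in the quoted message below. It is only an illustration: `LagSkipPolicy` and `nextFetchOffset` are names I made up, nothing here is Flink or Kafka API, and it assumes the caller can obtain the earliest and latest offsets of a partition from the broker by some other means.

```java
/**
 * Hypothetical lag-based skip policy. Not part of Flink or the Kafka client;
 * all names are invented for illustration.
 */
final class LagSkipPolicy {
    private final long maxLag;

    LagSkipPolicy(long maxLag) {
        if (maxLag < 0) {
            throw new IllegalArgumentException("maxLag must be >= 0");
        }
        this.maxLag = maxLag;
    }

    /**
     * Decides which offset to fetch next for one partition.
     *
     * @param nextOffset     offset the consumer would fetch next
     * @param earliestOffset oldest offset the broker still retains (assumed known)
     * @param latestOffset   log-end offset of the partition (assumed known)
     * @return the offset to fetch from
     */
    long nextFetchOffset(long nextOffset, long earliestOffset, long latestOffset) {
        // The requested offset is outside the retained range. Without a
        // fallback this is the "invalid offsets" case; here we jump to latest.
        if (nextOffset < earliestOffset || nextOffset > latestOffset) {
            return latestOffset;
        }
        // The consumer is too far behind: skip ahead and drop the backlog.
        long lag = latestOffset - nextOffset;
        return lag > maxLag ? latestOffset : nextOffset;
    }
}
```

With `maxLag` set to 1000, a consumer at offset 10 against a log end of 5000 would jump straight to 5000, while one at offset 500 against a log end of 1000 would keep reading in order. Whether the skip should be all-or-nothing like this, or a gradual "dropping rate", is an open design question.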
I can share some details about my setup, but not at this time; in part because I don't have my metrics available at the moment and in part because this is a public, archived list.

On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <se...@apache.org> wrote:

> @Robert: Is it possible to add a "fallback" strategy to the consumer?
> Something like "if offsets cannot be found, use latest"?
>
> I would make this an optional feature to activate. I would think it is
> quite surprising to users if records start being skipped in certain
> situations. But I can see that this would be desirable sometimes.
>
> More control over skipping the records could be something to implement in
> an extended version of the Kafka Consumer. A user could define a policy
> that, in case consumer falls behind producer more than X (offsets), it
> starts requesting the latest offsets (rather than the following), thereby
> skipping a bunch of records.
>
>
>
> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Nick,
>>
>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>> consumer falls back in the topic so far that the offsets it's requesting
>> are invalid?
>>
>> For that, the retention time of Kafka has to be pretty short.
>>
>> Skipping records under load is something currently not supported by Flink
>> itself. The only idea I had for handling this would be to give the
>> DeserializationSchema a call back to request the latest offset from Kafka
>> to determine the lag. With that, the schema could determine a "dropping
>> rate" to catch up.
>> What would you as an application developer expect to handle the situation?
>>
>>
>> Just out of curiosity: What's the throughput you have on the Kafka topic?
>>
>>
>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I have a streaming job that consumes from of a kafka topic. The topic is
>>> pretty active so the local-mode single worker is obviously not able to keep
>>> up with the fire-hose. I expect the job to skip records and continue on.
>>> However, I'm getting an exception from the LegacyFetcher which kills the
>>> job. This is very much *not* what I want. Any thoughts? The only thing I
>>> find when I search for this error message is a link back to FLINK-2656. I'm
>>> running roughly 0.10-release/HEAD.
>>>
>>> Thanks a lot,
>>> Nick
>>>
>>> java.lang.Exception: Found invalid offsets more than once in partitions
>>> [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>   at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Found invalid offsets more than
>>> once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>   at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
>>>
>>>
>>
>