On Sunday, January 17, 2016, Stephan Ewen <se...@apache.org> wrote:

> I agree, real time streams should never go down.

Glad to hear that :)

> [snip] Both should be supported.

Agreed.

> Since we interpret streaming very broadly (also including analysis of
> historic streams or timely data), the "backpressure/catch-up" mode seemed
> natural as the first one to implement.

Indeed, this is what my job is doing. I have set it to start from the
beginning when it lacks a valid offset. I have to presume that in my case
the stream data is expiring faster than my consumers can keep up. However,
I haven't investigated proper monitoring yet.

> The "load shedding" variant can probably even be realized in the Kafka
> consumer, without complex modifications to the core Flink runtime itself.

I agree here as well. Indeed, this exception is being thrown from the
consumer, not the runtime.

> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> This goes back to the idea that streaming applications should never go
>> down. I'd much rather consume at max capacity and knowingly drop some
>> portion of the incoming pipe than have the streaming job crash. Of course,
>> once the job itself is robust, I still need the runtime to be robust --
>> YARN vs (potential) Mesos vs standalone cluster will be my next
>> consideration.
>>
>> I can share some details about my setup, but not at this time; in part
>> because I don't have my metrics available at the moment and in part because
>> this is a public, archived list.
>>
>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> @Robert: Is it possible to add a "fallback" strategy to the consumer?
>>> Something like "if offsets cannot be found, use latest"?
>>>
>>> I would make this an optional feature to activate. I would think it is
>>> quite surprising to users if records start being skipped in certain
>>> situations. But I can see that this would be desirable sometimes.
>>>
>>> More control over skipping the records could be something to implement
>>> in an extended version of the Kafka Consumer. A user could define a policy
>>> that, in case the consumer falls behind the producer by more than X
>>> (offsets), it starts requesting the latest offsets (rather than the
>>> following), thereby skipping a bunch of records.
>>>
>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>>> consumer falls behind in the topic so far that the offsets it's requesting
>>>> are invalid?
>>>>
>>>> For that, the retention time of Kafka has to be pretty short.
>>>>
>>>> Skipping records under load is something currently not supported by
>>>> Flink itself. The only idea I had for handling this would be to give the
>>>> DeserializationSchema a callback to request the latest offset from Kafka
>>>> to determine the lag. With that, the schema could determine a "dropping
>>>> rate" to catch up.
>>>> What would you as an application developer expect to handle the
>>>> situation?
>>>>
>>>> Just out of curiosity: What's the throughput you have on the Kafka
>>>> topic?
>>>>
>>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I have a streaming job that consumes from a Kafka topic. The topic
>>>>> is pretty active so the local-mode single worker is obviously not able to
>>>>> keep up with the fire-hose. I expect the job to skip records and continue
>>>>> on. However, I'm getting an exception from the LegacyFetcher which kills
>>>>> the job. This is very much *not* what I want. Any thoughts? The only thing
>>>>> I find when I search for this error message is a link back to FLINK-2656.
>>>>> I'm running roughly 0.10-release/HEAD.
>>>>>
>>>>> Thanks a lot,
>>>>> Nick
>>>>>
>>>>> java.lang.Exception: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
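
To make the two ideas from the thread concrete (fall back to the latest
offset when the requested one cannot be found, and skip ahead once the
consumer lags the producer by more than X offsets), here is a rough sketch
in plain Java. It is only an illustration under assumptions: nothing here is
Flink or Kafka API. The class OffsetFallbackPolicy, the resolve method and
the maxLag parameter are hypothetical names, and the earliest and latest
offsets are assumed to have been fetched from the broker by some other means.

```java
/**
 * Hypothetical sketch of an opt-in "load shedding" policy for a consumer.
 * Not part of Flink's Kafka connector; all names are made up for illustration.
 */
final class OffsetFallbackPolicy {

    /** The "X" from the thread: how many offsets of lag are tolerated. */
    private final long maxLag;

    OffsetFallbackPolicy(long maxLag) {
        if (maxLag < 0) {
            throw new IllegalArgumentException("maxLag must be >= 0");
        }
        this.maxLag = maxLag;
    }

    /**
     * Decides which offset to request next for one partition.
     *
     * @param nextOffset     offset the consumer would normally request next
     * @param earliestOffset oldest offset the broker still retains (assumed known)
     * @param latestOffset   offset the next produced record will get (assumed known)
     * @return the offset to actually request
     */
    long resolve(long nextOffset, long earliestOffset, long latestOffset) {
        // Requested offset is invalid (expired by retention, or past the end
        // of the log): fall back to the latest offset instead of failing.
        if (nextOffset < earliestOffset || nextOffset > latestOffset) {
            return latestOffset;
        }
        // Offset is valid but the consumer is more than maxLag behind:
        // knowingly skip the backlog and resume from the latest offset.
        if (latestOffset - nextOffset > maxLag) {
            return latestOffset;
        }
        // Within the tolerated lag: keep consuming in order.
        return nextOffset;
    }

    public static void main(String[] args) {
        OffsetFallbackPolicy policy = new OffsetFallbackPolicy(1000L);
        System.out.println(policy.resolve(500L, 0L, 1000L));
        System.out.println(policy.resolve(50L, 100L, 900L));
    }
}
```

Keeping the decision in a small, side-effect-free method means the policy
stays optional and easy to test, which fits the point made in the thread that
silently skipping records should be something a user activates on purpose.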