On Sunday, January 17, 2016, Stephan Ewen <se...@apache.org> wrote:

> I agree, real time streams should never go down.
>

 Glad to hear that :)


> [snip] Both should be supported.
>

Agreed.


> Since we interpret streaming very broadly (also including analysis of
> historic streams or timely data), the "backpressure/catch-up" mode seemed
> natural as the first one to implement.
>

Indeed, this is what my job is doing: I've configured it to start from the
beginning of the topic when it lacks a valid offset. I have to presume that in
my case the stream data is expiring faster than my consumers can keep up,
though I haven't investigated proper monitoring yet.
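
In case it helps anyone following along, here is a minimal sketch of that kind
of configuration. I'm assuming the 0.10-era FlinkKafkaConsumer082 and the
Kafka 0.8 property names; the broker, ZooKeeper, group, and topic values are
placeholders. Setting "auto.offset.reset" to "largest" instead would give the
"use latest" fallback mentioned below.

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// inside the job's main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
props.setProperty("zookeeper.connect", "zk:2181");      // placeholder
props.setProperty("group.id", "my-group");               // placeholder
// When no valid committed offset exists, start from the beginning of the
// topic; "largest" would jump to the latest records instead.
props.setProperty("auto.offset.reset", "smallest");

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));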


> The "load shedding" variant can probably even be realized in the Kafka
> consumer, without complex modifications to the core Flink runtime itself.
>

I agree here as well. Indeed, this exception is being thrown from the
consumer, not the runtime.
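
To make that concrete, here is a purely hypothetical sketch of what shedding
inside the consumer could look like: a schema that keeps roughly one record in
N and drops the rest. It assumes the AbstractDeserializationSchema base class
(its package has moved between versions) and, more importantly, that the
consumer skips records for which deserialize() returns null -- behavior that
may not hold in the connector version I'm running -- so treat it as an
illustration of the idea rather than a working solution.

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Hypothetical load-shedding schema: keeps roughly every N-th record and
// drops the rest by returning null (assumed to be skipped by the consumer).
public class SheddingStringSchema extends AbstractDeserializationSchema<String> {

    private final int keepOneInN;  // e.g. 10 => keep roughly every 10th record
    private long counter;

    public SheddingStringSchema(int keepOneInN) {
        this.keepOneInN = keepOneInN;
    }

    @Override
    public String deserialize(byte[] message) {
        // Fixed dropping rate; a smarter version would derive the rate from
        // the consumer's lag, as discussed further down in the thread.
        return (counter++ % keepOneInN == 0)
                ? new String(message, StandardCharsets.UTF_8)
                : null;
    }
}

Passing new SheddingStringSchema(10) to the Kafka consumer constructor (in
place of a plain SimpleStringSchema) would then drop roughly 90% of the
incoming records before they ever enter the pipeline.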


> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> This goes back to the idea that streaming applications should never go
>> down. I'd much rather consume at max capacity and knowingly drop some
>> portion of the incoming pipe than have the streaming job crash. Of course,
>> once the job itself is robust, I still need the runtime to be robust --
>> YARN vs (potential) Mesos vs standalone cluster will be my next
>> consideration.
>>
>> I can share some details about my setup, but not at this time; in part
>> because I don't have my metrics available at the moment and in part because
>> this is a public, archived list.
>>
>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> @Robert: Is it possible to add a "fallback" strategy to the consumer?
>>> Something like "if offsets cannot be found, use latest"?
>>>
>>> I would make this an optional feature to activate. I would think it is
>>> quite surprising to users if records start being skipped in certain
>>> situations. But I can see that this would be desirable sometimes.
>>>
>>> More control over skipping records could be something to implement in an
>>> extended version of the Kafka consumer. A user could define a policy that,
>>> in case the consumer falls behind the producer by more than X offsets, it
>>> starts requesting the latest offsets (rather than the next ones in
>>> sequence), thereby skipping a bunch of records.
>>>
>>>
>>>
>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>>> consumer has fallen so far behind in the topic that the offsets it's
>>>> requesting are no longer valid?
>>>>
>>>> For that, the retention time of Kafka has to be pretty short.
>>>>
>>>> Skipping records under load is something Flink itself does not currently
>>>> support. The only idea I had for handling this would be to give the
>>>> DeserializationSchema a callback to request the latest offset from Kafka
>>>> and determine the lag. With that, the schema could derive a "dropping
>>>> rate" to catch up.
>>>> What would you, as an application developer, expect in order to handle
>>>> this situation?
>>>>
>>>>
>>>> Just out of curiosity: What's the throughput you have on the Kafka
>>>> topic?
>>>>
>>>>
>>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I have a streaming job that consumes from a Kafka topic. The topic is
>>>>> pretty active, so the local-mode single worker is obviously not able to
>>>>> keep up with the fire-hose. I expect the job to skip records and continue
>>>>> on. However, I'm getting an exception from the LegacyFetcher that kills
>>>>> the job. This is very much *not* what I want. Any thoughts? The only thing
>>>>> I find when I search for this error message is a link back to FLINK-2656.
>>>>> I'm running roughly 0.10-release/HEAD.
>>>>>
>>>>> Thanks a lot,
>>>>> Nick
>>>>>
>>>>> java.lang.Exception: Found invalid offsets more than once in
>>>>> partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Found invalid offsets more than
>>>>> once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
>>>>>
>>>>>
>>>>
>>>
>>
>
