Fabian,

Thanks for your feedback - very helpful as usual!

This is becoming a huge problem for us right now because of our Kafka 
situation. For some reason I missed this detail going through the docs. We are 
definitely seeing significant data loss when Kafka timeouts happen. 

We are actually on version 1.4 - I’d be interested to understand whether 
anything can be done in 1.4 to prevent this scenario.

One other thought I had was the ability to invoke “checkpointing before 
restart / recovery” - meaning I don’t necessarily need to checkpoint 
periodically, but I do want to make sure that on an explicit restart or 
rescheduling like this we have a decent “last known” state. Not sure if this 
is currently doable.
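
To make that concrete, here is the kind of fallback I can picture if an 
on-demand snapshot isn't possible in 1.4 - just a sketch, with a placeholder 
interval, path, and state backend rather than anything we actually run: enable 
periodic checkpointing and retain the latest checkpoint across cancellation or 
rescheduling, so there is always a decent "last known" state to resume from. 
(If triggering a savepoint right before an explicit restart is the intended 
way to do this, that would work for us too.)

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedJobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Sketch only: periodic checkpoints every 60s to a placeholder HDFS path.
            env.enableCheckpointing(60_000);
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // Keep the latest completed checkpoint when the job is cancelled or
            // rescheduled, so it can serve as the "last known" state on restart.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... build the actual pipeline here ...

            env.execute("checkpointed-job-sketch");
        }
    }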

Thanks, Ashish

> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ashish,
> 
> Originally, Flink always performed full recovery in case of a failure, i.e., 
> it restarted the complete application.
> There is ongoing work to improve this and make recovery more fine-grained 
> (FLIP-1 [1]). Some parts of it were added in 1.3.0.
> 
> I'm not familiar with the details, but Stefan (in CC) should be able to 
> answer your specific question.
> 
> Best, Fabian
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> 
> 2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com 
> <mailto:ashish...@yahoo.com>>:
> Team,
> 
> One more question to the community regarding hardening Flink Apps.
> 
> Let me start off by saying we have known Kafka bottlenecks, which we are in 
> the midst of resolving. So during certain times of day, a lot of our Flink 
> apps are seeing Kafka producer timeouts. Most of the logs are some flavor of 
> this:
> 
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
> 
> Timeouts are not great, but I am sure we understand they are bound to happen 
> (hopefully less often once the bottlenecks are resolved). 
> 
> The issue for us, however, is that it almost looks like Flink is stopping and 
> restarting all operators (a lot of other operators including Map, Reduce and 
> Process functions, if not all of them) along with the Kafka producers. We are 
> processing a pretty substantial load in Flink and don't really intend to 
> enable RocksDB/HDFS checkpointing in some of these apps - we are OK with 
> sustaining some data loss when an app crashes completely or something along 
> those lines. However, what we are noticing here is that all the data held in 
> memory for the sliding window functions is also lost completely because of 
> this. I would have thought that, given the retry settings in the Kafka 
> producer, even those 28 events in the queue would have been recovered, let 
> alone the million-plus events in memory state waiting to be folded/reduced 
> for the sliding window. This doesn't feel right :) 
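> 
> For reference, here is roughly the kind of wiring I am describing, with the 
> two knobs I had assumed would help: higher client-side retry/timeout settings 
> to ride out short broker slowdowns, and - if I am reading the connector docs 
> right - setLogFailuresOnly(true) to log a failed send instead of failing (and 
> restarting) the whole job, at the cost of dropping those records. The broker 
> list, topic name, String/SimpleStringSchema types and the exact values below 
> are placeholders for illustration, not our production settings:
> 
>     import java.util.Properties;
> 
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> 
>     public class KafkaSinkSketch {
> 
>         static void attachKafkaSink(DataStream<String> stream) {
>             Properties props = new Properties();
>             props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder
>             props.setProperty("retries", "10");                      // retry sends instead of failing fast
>             props.setProperty("retry.backoff.ms", "500");
>             props.setProperty("request.timeout.ms", "120000");       // batches expire roughly after this, so raise it
>             props.setProperty("linger.ms", "100");
> 
>             FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
>                     "dev.trans", new SimpleStringSchema(), props);   // placeholder topic
> 
>             // Log send failures instead of throwing, so a producer timeout does
>             // not bring down the whole job graph (those records are then lost).
>             producer.setLogFailuresOnly(true);
> 
>             stream.addSink(producer);
>         }
>     }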
> 
> Is the only way to solve this to create a RocksDB/HDFS checkpoint? Why would 
> almost the whole job graph restart on an operator timeout? Do I need to do 
> something simple like disabling operator chaining? We really are trying to 
> just use memory and no other state backend for these heavy-hitting streams. 
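> 
> For what it's worth, the failure handling in these jobs is essentially just a 
> restart strategy, roughly like the sketch below (the attempt count and delay 
> are placeholders, not our actual values), with no checkpointing enabled:
> 
>     import java.util.concurrent.TimeUnit;
> 
>     import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>     import org.apache.flink.api.common.time.Time;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
>     public class RestartStrategySketch {
>         public static void main(String[] args) throws Exception {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>             // Restart the job up to 3 times, waiting 10 seconds between attempts
>             // (placeholder values). With no checkpointing enabled, restarted
>             // operators come back with empty state.
>             env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
>                     3, Time.of(10, TimeUnit.SECONDS)));
> 
>             // ... rest of the pipeline ...
> 
>             env.execute("restart-strategy-sketch");
>         }
>     }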
> 
> Thanks for your help,
> 
> Ashish
> 
