Fabian,

Thanks for your feedback - very helpful as usual!
This is quickly becoming a huge problem for us because of our Kafka situation. For some reason I missed this detail when going through the docs. We are definitely seeing significant data loss when the Kafka timeouts happen. We are actually on 1.4 - I'd be interested to understand whether anything can be done in 1.4 to prevent this scenario.

One other thought I had was the ability to invoke "checkpointing before restart / recovery" - meaning I don't necessarily need to checkpoint periodically, but on an explicit restart / rescheduling like this I do want to make sure we have a decent "last known" state. Not sure if this is currently doable - a rough sketch of what I have in mind is below.
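(The interval, checkpoint directory, and choice of FsStateBackend below are placeholders - I have not verified this is the right approach, it is just the closest thing I could find: checkpoint infrequently to a file system, retain the last completed checkpoint on cancellation, and resume an explicitly restarted job from it with "flink run -s <path>". For a planned restart, taking a savepoint first with "flink cancel -s <dir> <jobId>" looks like another option.)

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder interval: checkpoint infrequently rather than not at all.
    env.enableCheckpointing(10 * 60 * 1000L);

    // Placeholder path; the state has to land on a durable file system for this to work.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

    // Keep the latest completed checkpoint around when the job is cancelled,
    // so an explicit restart can resume from it via "flink run -s <path>".
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);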
Thanks,

Ashish

> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Ashish,
> 
> Originally, Flink always performed a full recovery in case of a failure, i.e., it restarted the complete application.
> There is some ongoing work to improve this and make recovery more fine-grained (FLIP-1 [1]).
> Some parts have been added for 1.3.0.
> 
> I'm not familiar with the details, but Stefan (in CC) should be able to answer your specific question.
> 
> Best,
> Fabian
> 
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> 
> 2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:
> Team,
> 
> One more question to the community regarding hardening Flink apps.
> 
> Let me start off by saying we do have known Kafka bottlenecks which we are in the midst of resolving. So, during certain times of day, a lot of our Flink apps are seeing Kafka producer timeout issues. Most of the logs are some flavor of this:
> 
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
> 
> Timeouts are not necessarily good, but I am sure we understand they are bound to happen (hopefully less often).
> 
> The issue for us, however, is that it almost looks like Flink is stopping and restarting all operators (a lot of other operators, including Map, Reduce and Process functions, if not all of them) along with the Kafka producers. We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing in some of these apps - we are OK with sustaining some data loss when an app crashes completely or something along those lines. However, what we are noticing here is that all the data held in memory for the sliding-window functions is also lost completely because of this. I would have thought that, given the retry settings in the Kafka producer, even those 28 events in the queue should have been recovered, let alone the over a million events in memory state waiting to be folded/reduced for the sliding window. This doesn't feel right :)
> 
> Is the only way to solve this to create a RocksDB/HDFS checkpoint? Why would almost the entire job graph restart on an operator timeout? Do I need to do something simple like disabling operator chaining? We really, really are trying to use just memory and not any other state for these heavy-hitting streams.
> 
> Thanks for your help,
> 
> Ashish
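PS: For the producer-side questions quoted above, here is a rough sketch of the knobs that look relevant (the broker address, topic name, and timeout/retry values are placeholders, and I have not confirmed they are sufficient): give batches longer before they expire via request.timeout.ms, allow retries, and optionally tell the sink to only log async send failures instead of rethrowing them - as I understand it, the rethrow from checkErroneous() is what fails the task and restarts the whole job graph under the default restart strategy.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder broker list
    props.setProperty("retries", "5");                      // retry transient send failures
    props.setProperty("request.timeout.ms", "60000");       // give batches longer before they expire

    FlinkKafkaProducer010<String> producer =
            new FlinkKafkaProducer010<>("your-topic", new SimpleStringSchema(), props); // placeholder topic

    // Log async send failures instead of failing the task (accepts dropping the expired records).
    producer.setLogFailuresOnly(true);

    // Attach to the stream being written out: stream.addSink(producer);

From what I can tell, batches that expire while still sitting in the producer's buffer are not retried at all, which may be why the 28 queued records were not recovered.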