Hi Ashish,

Originally, Flink always performed a full recovery in case of a failure, i.e., it restarted the complete application. There is ongoing work to make recovery more fine-grained (FLIP-1 [1]); some parts of it have already been added for 1.3.0.
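As a side note, how a job reacts to such a failure is controlled by its restart strategy; the FLIP-1 work only changes how much of the job gets restarted. A minimal sketch of setting a restart strategy in the DataStream API, with placeholder attempt count and delay:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry the job a few times with a delay instead of failing permanently.
        // The attempt count and delay are placeholders, not recommendations.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                    // number of restart attempts
                Time.seconds(10)));   // delay between attempts
        // ... build and execute the job as usual ...
    }
}

How much of the job graph is restarted on a failure is a separate, cluster-side failover setting introduced by FLIP-1, and what is available there depends on the Flink version.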
I'm not familiar with the details, but Stefan (in CC) should be able to
answer your specific question.

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:
> Team,
>
> One more question to the community regarding hardening Flink Apps.
>
> Let me start off by saying we do have known Kafka bottlenecks which we are
> in the midst of resolving. So during certain times of day, a lot of our
> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
> some flavor of this:
>
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>
> Timeouts are not necessarily good, but I am sure we understand this is
> bound to happen (hopefully less often).
>
> The issue for us, however, is that it almost looks like Flink is stopping
> and restarting all operators (a lot of other operators including Map,
> Reduce and Process functions, if not all) along with the Kafka Producers.
> We are processing a pretty substantial load in Flink and don't really intend
> to enable RocksDB/HDFS checkpointing in some of these apps - we are ok to
> sustain some data loss when the app crashes completely or something along
> those lines. However, what we are noticing here is that all the data that is
> in memory for the sliding window functions is also lost completely because
> of this. I would have thought that, because of the retry settings in the
> Kafka Producer, even those 28 events in the queue should have been recovered,
> let alone the over a million events in memory state waiting to be
> Folded/Reduced for the sliding window. This doesn't feel right :)
>
> Is the only way to solve this to create a RocksDB/HDFS checkpoint? Why would
> almost the entire job graph restart on an operator timeout? Do I need to do
> something simple like disable operator chaining? We really, really are trying
> to just use memory and not any other state for these heavy-hitting streams.
>
> Thanks for your help,
>
> Ashish
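A small follow-up on the producer timeouts quoted above: FlinkKafkaProducer010 accepts standard Kafka producer properties, so giving the client more headroom is one common mitigation. A minimal sketch, where the broker list, topic name and concrete values are illustrative assumptions, not taken from this thread:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerTuningSketch {
    public static FlinkKafkaProducer010<String> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder broker list
        props.setProperty("retries", "5");                     // retry transient send failures
        props.setProperty("request.timeout.ms", "60000");      // more time before batches expire
        props.setProperty("linger.ms", "5");                   // small batching delay
        return new FlinkKafkaProducer010<>(
                "dev.trans",                  // placeholder topic name
                new SimpleStringSchema(),
                props);
    }
}

Note that the retries setting generally only covers failures the Kafka client can retry internally; once a batch has expired and the exception reaches the Flink task, recovery is up to Flink's restart and checkpointing machinery.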
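And on the checkpointing/chaining questions: checkpointing does not require RocksDB or HDFS; the heap-based state backends can be checkpointed as well, and chaining can be disabled per operator. A minimal sketch of where both knobs live, with a placeholder source, interval and map function:

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);                // checkpoint every 60 s (placeholder)
        env.setStateBackend(new MemoryStateBackend());  // keeps checkpoints on the JobManager heap

        DataStream<String> input = env.socketTextStream("localhost", 9999);  // placeholder source

        input
            .map(String::toUpperCase)   // placeholder operator
            .disableChaining()          // break the operator chain around this operator if needed
            .print();

        env.execute("checkpointing-sketch");
    }
}

Whether a heap-based backend is appropriate for a job holding millions of window elements is of course a separate sizing question; the sketch only shows the API.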