Try setting the Kafka producer config option for the number of retries ("retries") to a large number to avoid the timeout. It defaults to zero. Do note that retries may result in reordered records.
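For example, a minimal sketch of passing that config through to the 0.10 producer sink (the broker address, topic name, and the Integer.MAX_VALUE retry count are placeholders, and exact import paths can differ a bit between Flink versions):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class RetryingProducerJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092"); // placeholder broker
        // Retry instead of expiring the batch; the producer default is 0.
        producerProps.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Limit in-flight requests so that retries cannot reorder records.
        producerProps.setProperty("max.in.flight.requests.per.connection", "1");
        // Optionally give batches more time before they expire:
        // producerProps.setProperty("request.timeout.ms", "60000");

        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder source

        stream.addSink(new FlinkKafkaProducer010<>(
                "dev.topic",                // placeholder topic
                new SimpleStringSchema(),
                producerProps));

        env.execute("retrying-producer-sketch");
    }
}

Setting max.in.flight.requests.per.connection to 1 is the usual way to keep retries from reordering records, at the cost of some producer throughput.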
On Wed, Jan 24, 2018 at 7:07 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> Fabian,
>
> Thanks for your feedback - very helpful as usual!
>
> This is sort of becoming a huge problem for us right now because of our
> Kafka situation. For some reason I missed this detail going through the
> docs. We are definitely seeing a heavy dose of data loss when Kafka
> timeouts are happening.
>
> We are actually on version 1.4 - I’d be interested to understand whether
> anything can be done in 1.4 to prevent this scenario.
>
> One other thought I had was the ability to invoke “checkpointing before
> restart / recovery” -> meaning I don’t necessarily need to checkpoint
> periodically, but I do want to make sure that on an explicit restart /
> rescheduling like this we have a decent “last known” state. Not sure if
> this is currently doable.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Ashish,
>
> Originally, Flink always performed a full recovery in case of a failure,
> i.e., it restarted the complete application.
> There is some ongoing work to improve this and make recovery more
> fine-grained (FLIP-1 [1]).
> Some parts have been added in 1.3.0.
>
> I'm not familiar with the details, but Stefan (in CC) should be able to
> answer your specific question.
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>
> 2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:
>
>> Team,
>>
>> One more question to the community regarding hardening Flink Apps.
>>
>> Let me start off by saying we do have known Kafka bottlenecks which we
>> are in the midst of resolving. So during certain times of the day, a lot
>> of our Flink Apps are seeing Kafka Producer timeout issues.
>> Most of the logs are some flavor of this:
>>
>> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s)
>> for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus
>> linger time
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28
>> record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation
>> plus linger time
>>
>> Timeouts are not necessarily good, but I am sure we understand this is
>> bound to happen (hopefully less often).
>>
>> The issue for us, however, is that it almost looks like Flink is stopping
>> and restarting all operators (a lot of other operators including Map,
>> Reduce, and Process functions, if not all of them) along with the Kafka
>> Producers. We are processing a pretty substantial load in Flink and don't
>> really intend to enable RocksDB/HDFS checkpointing in some of these apps -
>> we are OK to sustain some data loss when an app crashes completely or
>> something along those lines. However, what we are noticing here is that
>> all the data held in memory for the sliding window functions is also lost
>> completely because of this.
>> I would have thought that, because of the retry settings in the Kafka
>> Producer, even those 28 events in the queue should have been recovered,
>> let alone the over a million events in memory state waiting to be
>> Folded/Reduced for the sliding window. This doesn't feel right :)
>>
>> Is the only way to solve this to create a RocksDB/HDFS checkpoint? Why
>> would almost the whole job graph restart on an operator timeout? Do I
>> need to do something simple like disabling operator chaining? We really
>> are trying to use just memory and no other state for these heavy-hitting
>> streams.
>>
>> Thanks for your help,
>>
>> Ashish
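For reference, if periodic checkpointing does end up being the route you take, enabling it would look roughly like the sketch below. The checkpoint interval, HDFS path, and restart strategy values are placeholder assumptions, not a recommendation tuned to your setup. Note that with the heap-based FsStateBackend the working window state stays in TaskManager memory; only the periodic snapshots are written to the durable location, so they can be restored after a task failure such as the producer timeout above.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSkeleton {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints every 30s; window contents kept on the heap are
        // snapshotted so they can be restored when tasks are restarted.
        env.enableCheckpointing(30_000L, CheckpointingMode.EXACTLY_ONCE);

        // Working state remains on the JVM heap; only snapshots go to this
        // (placeholder) durable path.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Bound how often and for how long the job is restarted on failures
        // such as the Kafka producer timeout (placeholder values).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        // ... build the pipeline (sources, sliding windows, Kafka sink) here ...

        env.execute("checkpointed-job-skeleton");
    }
}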