Try setting the Kafka producer config option for the number of retries
("retries") to a large number to avoid the timeout; it defaults to zero.
Do note that retries may result in reordered records.
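
For reference, here is a minimal sketch of passing those producer settings to
the 0.10 connector. The broker address, topic name, and timeout value are
placeholders (not taken from your setup), and "stream" stands for whatever
DataStream<String> you are writing out:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder brokers
    props.setProperty("retries", "2147483647");              // default is 0, i.e. fail fast
    props.setProperty("request.timeout.ms", "60000");        // more time per attempt (illustrative value)
    // keep a single in-flight request so retries cannot reorder records
    props.setProperty("max.in.flight.requests.per.connection", "1");

    stream.addSink(new FlinkKafkaProducer010<>(
            "dev.some.topic",            // placeholder topic
            new SimpleStringSchema(),
            props));

Setting max.in.flight.requests.per.connection to 1 trades some throughput for
ordering; if reordering is acceptable, you can leave it at its default.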

On Wed, Jan 24, 2018 at 7:07 PM, Ashish Pokharel <ashish...@yahoo.com>
wrote:

> Fabian,
>
> Thanks for your feedback - very helpful as usual !
>
> This is becoming a huge problem for us right now because of our Kafka
> situation. For some reason I missed this detail when going through the
> docs. We are definitely seeing a heavy dose of data loss when Kafka
> timeouts happen.
>
> We are actually on version 1.4 - I’d be interested to understand if
> anything can be done in 1.4 to prevent this scenario.
>
> One other thought I had was the ability to invoke “Checkpointing before
> Restart / Recovery” -> meaning I don’t necessarily need to checkpoint
> periodically, but I do want to make sure that on an explicit restart /
> rescheduling like this, we do have a decent “last known” state. Not sure if
> this is currently doable.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Ashish,
>
> Originally, Flink always performed full recovery in case of a failure,
> i.e., it restarted the complete application.
> There is some ongoing work to improve this and make recovery more
> fine-grained (FLIP-1 [1]).
> Some parts have been added for 1.3.0.
>
> I'm not familiar with the details, but Stefan (in CC) should be able to
> answer your specific question.
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>
> 2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:
>
>> Team,
>>
>> One more question to the community regarding hardening Flink Apps.
>>
>> Let me start off by saying we do have known Kafka bottlenecks which we
>> are in the midst of resolving. So during certain times of day, a lot of our
>> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
>> some flavor of this:
>>
>> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>>
>> Timeouts are not necessarily good, but I am sure we understand this is
>> bound to happen (hopefully less often).
>>
>> The issue for us, however, is that it almost looks like Flink is stopping
>> and restarting all operators (a lot of other operators including Map,
>> Reduce and Process functions, if not all of them) along with the Kafka
>> Producers. We are processing a pretty substantial load in Flink and don't
>> really intend to enable Rocks/HDFS checkpointing in some of these Apps -
>> we are ok with sustaining some data loss when an App crashes completely or
>> something along those lines. However, what we are noticing here is that
>> all the data held in memory for sliding window functions is also lost
>> completely because of this. I would have thought that, because of the
>> retry settings in the Kafka Producer, even those 28 events in the queue
>> should have been recovered, let alone the over a million events in Memory
>> State waiting to be Folded/Reduced for the sliding window. This doesn't
>> feel right :)
>>
>> Is the only way to solve this to create a Rocks/HDFS checkpoint? Why
>> would almost the entire Job Graph restart on an operator timeout? Do I
>> need to do something simple like disabling operator chaining? We really
>> are trying to use just memory and no other state for these heavy-hitting
>> streams.
>>
>> Thanks for your help,
>>
>> Ashish
>>
>
>
>
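
One more note on the “checkpointing before restart / recovery” idea from the
quoted mail: I don't know of a way to checkpoint only on restart, but if
periodic checkpointing does become an option for you, enabling it is a small
change. A minimal sketch (the interval, mode, and HDFS path below are
illustrative assumptions, not recommendations):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // snapshot state every 60s; at-least-once is cheaper than exactly-once
    env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);

    // keep checkpoints on a filesystem so state can be restored after a restart
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

With something like this in place, the in-memory sliding-window state would be
restored from the last completed checkpoint instead of being lost when the job
graph restarts.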
