Hi Ashish,

Originally, Flink always performed full recovery in case of a failure,
i.e., it restarted the complete application.
There is some ongoing work to improve this and make recovery more
fine-grained (FLIP-1 [1]).
Some parts of it have already been added in 1.3.0.
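
If you want to experiment with the parts that are already in, I think the
failover strategy can be switched in flink-conf.yaml along these lines
(please double-check the docs for your version, I might be off on the exact
key or values):

  # only the failed task is restarted instead of the whole job
  jobmanager.execution.failover-strategy: individual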

I'm not familiar with the details, but Stefan (in CC) should be able to
answer your specific question.

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:

> Team,
>
> One more question to the community regarding hardening Flink Apps.
>
> Let me start off by saying we do have known Kafka bottlenecks which we are
> in the midst of resolving. So during certain times of day, a lot of our
> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
> some flavor of this:
>
> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>
> Timeouts are obviously not ideal, but we understand they are bound to
> happen (hopefully less often once the bottlenecks are resolved).
>
> The issue for us, however, is that it almost looks like Flink is stopping
> and restarting all operators (a lot of other operators, including Map,
> Reduce and Process functions, if not all of them) along with the Kafka
> Producers. We are processing a pretty substantial load in Flink and don't
> really intend to enable Rocks/HDFS checkpointing in some of these Apps - we
> are ok with sustaining some data loss when an App crashes completely or
> something along those lines. However, what we are noticing here is that all
> the data held in memory for the sliding window functions is also lost
> completely because of this. I would have thought that, because of the retry
> settings in the Kafka Producer, even those 28 events in the queue would
> have been recovered, let alone the over a million events in Memory State
> waiting to be Folded/Reduced for the sliding window. This doesn't feel
> right :)
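>
> For reference, our producer wiring is roughly the following (topic name and
> values here are only illustrative, not our exact settings):
>
> import java.util.Properties;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> // Standard Kafka producer settings passed through to the Flink sink;
> // these are the "retry settings" I am referring to above.
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker1:9092");
> props.setProperty("retries", "3");                // retry failed sends before giving up
> props.setProperty("request.timeout.ms", "30000"); // per-request timeout
> props.setProperty("linger.ms", "5");              // small batching delay
>
> // 'stream' is a DataStream<String> produced earlier in the job.
> stream.addSink(new FlinkKafkaProducer010<>(
>         "our-output-topic",                       // illustrative topic name
>         new SimpleStringSchema(),
>         props));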
>
> Is the only way to solve this to create a Rocks/HDFS checkpoint? Why would
> almost the entire Job Graph restart on a single operator timeout? Do I need
> to do something simple like disabling Operator chaining? We really, really
> are trying to use just Memory and not any other state for these
> heavy-hitting streams.
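>
> Just to make those last two questions concrete, this is roughly what I have
> in mind, based on my reading of the docs (a sketch only, and I am not sure
> it would even help):
>
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // Break operator chains so the Kafka sink runs as its own task instead of
> // being fused with the windowing operators.
> env.disableOperatorChaining();
>
> // Keep checkpoint data purely on the JobManager heap, no Rocks/HDFS;
> // the 60s interval is just an example.
> env.setStateBackend(new MemoryStateBackend());
> env.enableCheckpointing(60_000);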
>
> Thanks for your help,
>
> Ashish
>
