Team,
One more question to the community regarding hardening Flink Apps.
Let me start off by saying we do have known Kafka bottlenecks that we are in the midst of resolving. During certain times of day, a lot of our Flink Apps see Kafka Producer timeouts, and most of the logs are some flavor of this:
java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
Timeouts are not great, but I am sure we all understand they are bound to happen (hopefully less often once the bottlenecks are resolved).
The issue for us, however, is that when this happens Flink appears to stop and restart not just the Kafka Producers but most, if not all, of the other operators as well (Map, Reduce and Process functions included). We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing for some of these Apps; we are OK with sustaining some data loss when an App crashes completely or something along those lines. What we are noticing here, though, is that all of the in-memory state for the sliding window functions is also lost completely because of these restarts. I would have thought that, given the retry settings on the Kafka Producer, even those 28 records sitting in the queue should have been recoverable, let alone the million-plus events in memory state waiting to be Folded/Reduced for the sliding windows. This doesn't feel right :)
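For reference, this is roughly how we build the producer sink today; the broker, topic name, and timeout/retry values below are placeholders rather than our exact settings:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerSketch {

    // Builds the Kafka sink roughly the way we do today; all values are placeholders.
    public static FlinkKafkaProducer010<String> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder brokers
        props.setProperty("retries", "5");                       // retry transient send failures
        props.setProperty("request.timeout.ms", "30000");        // batches expire after this window (plus linger)
        props.setProperty("linger.ms", "100");
        props.setProperty("batch.size", "16384");

        return new FlinkKafkaProducer010<>(
                "dev.placeholder.topic",       // placeholder topic name
                new SimpleStringSchema(),
                props);
    }
}

Those are the producer-side retry settings I was referring to above.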
Is the only way to solve this to enable RocksDB/HDFS checkpointing? Why would almost the entire Job Graph restart because of a timeout in one operator? Do I need to do something simple like disabling operator chaining? We really, really are trying to use only memory state, and nothing else, for these heavy-hitting streams.
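For what it's worth, this is the kind of job-level change I am considering, purely as a sketch; the restart-strategy numbers are made up and the rest of the pipeline is omitted:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Break operator chains so the Kafka sink runs in its own task; I am not sure
        // this actually limits what gets restarted, which is part of my question.
        env.disableOperatorChaining();

        // Keep restarting on transient producer timeouts instead of failing the job;
        // the attempt count and delay here are placeholders.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                10,                               // restart attempts
                Time.of(30, TimeUnit.SECONDS)));  // delay between attempts

        // ... sources, sliding windows with Fold/Reduce, and the Kafka 0.10 sink go here ...

        env.execute("producer-timeout-hardening-sketch");
    }
}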
Thanks for your help,
Ashish