Hi Sachin,

A couple of observations which might lead to improve your situation:


  *   The first timeout/exception happens within the change log writer which is 
set to only one second
     *   I/O jitter towards the DFS can easily lead to occasional responses 
that take more than one second
     *   This is up to the point that for our jobs on Kubernetes we cannot 
afford to enable the change log feature
     *   Moderately augmenting the timeout configuration might improve your 
situation, however, change log is probably quite sensitive to this
  *   Your checkpoint interval is set so 15 minutes, I would try to increase 
the checkpoint timeout at least a little bit
  *   When restarting, your task manager CPU is congested up to the point that 
pekko receiver loop fails to give timely responses
     *   Did you take profiles on the Profiler tab of your task manager? That 
usually gives me best information on bottlenecks that are not on task level 
(needs to be enabled in config)
  *   Unfortunately, when loading a checkpoint/savepoint from DFS, it takes 
long time. I see a lot of unbuffered I/O towards the DFS during that phase. 
However that is difficult to fix without a Flink fork.
     *   In your configuration I see local recovery, is that actually used on 
the restarted task managers on Yarn? (you probably checked that 😊 )
  *   You give RocksDB roughly an overall size of ~300Gb, when RocksDB data is 
likely around 50Gb
  *   What is the motivation for going with parallelism 120,
     *   Is it based (bottlenecked) on RocksDB I/O, or CPU capacity?
     *   Assuming that you didn’t augment the default value for max parallelism 
256, a parallelism of 120 means that 90% of your subtasks process 2 key-slots 
each, whereas 10% process 3 key-slots. That leads to a processing bias of +50% 
(3/2)

So much for now, I hope that helps.
Will you be at the FlinkForward Conference in Barcelona? There we could also 
exchange experience in person 


Thias




From: Sachin Mittal <[email protected]>
Sent: Saturday, September 20, 2025 11:41 AM
To: user <[email protected]>
Subject: [External] A random checkpoint failure creates an avalanche of restarts

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi,
So we are running Flink 19.1 on AWS EMR using Yarn as resource manager.
We have a fairly large cluster with 120 parallelism and 30 task managers 
running with 4 task slots.

Here are some of the important configs:
taskmanager.numberOfTaskSlots: 4
jobmanager.memory.process.size: 12g
taskmanager.memory.process.size: 24g
taskmanager.memory.task.off-heap.size: 1g
taskmanager.memory.managed.fraction: 0.5
taskmanager.memory.network.fraction: 0.05
taskmanager.memory.jvm-overhead.fraction: 0.05
state.backend.type: rocksdb
state.checkpoints.dir: s3://bucket/flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://bucket/changelog
dstl.dfs.compression.enabled: 'true'

My job checkpoint configs are:

env.enableCheckpointing(900000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L);
env.getCheckpointConfig().setCheckpointTimeout(900000L);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableUnalignedCheckpoints();
Also checkpoint full size can be very large upto 50 GB.
Now it was running fine for a long time but randomly we got the following 
exception:

java.io.IOException: Could not perform checkpoint 70 for operator ... 
(80/120)#0.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 70 for operator ... (80/120)#0. Failure reason: Checkpoint 
was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
... 22 more
Caused by: java.io.IOException: The upload for 1439 has already failed 
previously
at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:463)
at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:234)
at 
org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)
at 
org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:406)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:258)
... 33 more
Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out after 
1000ms
at 
org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java:319)
at 
org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$1(RetryingExecutor.java:314)

After this I start getting many timeout exceptions where randomly one or other 
task manager becomes unreachable and restarts start happening very frequently. 
Here are few of these stack traces:


java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
container_... timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1550)
at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
...

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager '...'. This might 
indicate that the remote task manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
...

org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id ... 
is no longer reachable.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1566)
at 
org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
...

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Lost connection to task manager '...'. This indicates that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325)
...
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: 
recvAddress(..) failed: Connection reset by peer



Diese Nachricht ist ausschliesslich fĂŒr den Adressaten bestimmt und beinhaltet 
unter UmstÀnden vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewĂ€hrleistet werden kann, ĂŒbernehmen wir keine 
Haftung fĂŒr die GewĂ€hrung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtĂŒmlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller AnhÀnge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Reply via email to