Hi Sachin,
A couple of observations which might help improve your situation:
* The first timeout/exception happens within the changelog writer, whose upload timeout is set to only one second (the default of dstl.dfs.upload.timeout, which matches the "Attempt 3 timed out after 1000ms" in your trace)
* I/O jitter towards the DFS can easily lead to occasional responses
that take more than one second
* It goes so far that, for our jobs on Kubernetes, we cannot afford to enable the changelog feature at all
* Moderately increasing the timeout configuration might improve your situation; however, the changelog is probably quite sensitive to this (see the config sketch below this list)
* Your checkpoint interval is set to 15 minutes, and the checkpoint timeout to the same value; I would try to increase the checkpoint timeout at least a little bit
* When restarting, your task manager CPU is congested to the point that the Pekko receiver loop fails to give timely responses
* Did you take profiles on the Profiler tab of your task managers? That usually gives me the best information on bottlenecks that are not on the task level (it needs to be enabled in the config)
* Unfortunately, loading a checkpoint/savepoint from the DFS takes a long time. I see a lot of unbuffered I/O towards the DFS during that phase. However, that is difficult to fix without a Flink fork.
* In your configuration I see local recovery enabled; is that actually used by the restarted task managers on YARN? (You probably checked that 🙂)
* You give RocksDB an overall budget of roughly ~300 GB of managed memory (30 task managers × ~10 GB each), while the RocksDB data is likely only around 50 GB
* What is the motivation for going with parallelism 120?
* Is it based (bottlenecked) on RocksDB I/O, or CPU capacity?
* Assuming that you didn't increase the default max parallelism of 256, a parallelism of 120 means that 256 key groups are spread over 120 subtasks: 104 subtasks (about 87%) process 2 key groups each, whereas 16 subtasks (about 13%) process 3. That leads to a processing bias of +50% (3/2) on the busier subtasks (see the small Java check below this list).
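To make the "timeout configuration" point concrete, here is a minimal sketch of the knobs I would touch first. I am assuming your changelog upload options are still at their defaults (the "Attempt 3 timed out after 1000ms" in your trace suggests timeout 1 s and 3 attempts, which are the defaults); the values below are only starting points, not a recommendation:

dstl.dfs.upload.timeout: 10 s             # default is 1 s; this is where the TimeoutException in your trace originates
dstl.dfs.upload.max-attempts: 5           # default is 3
execution.checkpointing.timeout: 30 min   # or env.getCheckpointConfig().setCheckpointTimeout(1800000L) in the job code
rest.profiling.enabled: 'true'            # if I remember correctly, this is what enables the Profiler tab in 1.19 -- please double-check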
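And if you want to verify the key-group distribution yourself, here is a tiny sketch you can run against flink-runtime (assuming max parallelism 256; the helper I have in mind is KeyGroupRangeAssignment):

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupSkewCheck {
    public static void main(String[] args) {
        int maxParallelism = 256;   // Flink's default for a parallelism of 120
        int parallelism = 120;
        int withTwo = 0, withThree = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // key-group range assigned to this subtask
            KeyGroupRange range = KeyGroupRangeAssignment
                    .computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, subtask);
            if (range.getNumberOfKeyGroups() == 2) { withTwo++; } else { withThree++; }
        }
        // expected: 104 subtasks with 2 key groups, 16 subtasks with 3 (256 = 104*2 + 16*3)
        System.out.println(withTwo + " subtasks with 2 key groups, " + withThree + " with 3");
    }
}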
So much for now, I hope that helps.
Will you be at the Flink Forward conference in Barcelona? There we could also exchange experiences in person …
Thias
From: Sachin Mittal <[email protected]>
Sent: Saturday, September 20, 2025 11:41 AM
To: user <[email protected]>
Subject: [External] A random checkpoint failure creates an avalanche of restarts
Hi,
So we are running Flink 1.19.1 on AWS EMR, using YARN as the resource manager.
We have a fairly large cluster with a parallelism of 120 and 30 task managers, each running with 4 task slots.
Here are some of the important configs:
taskmanager.numberOfTaskSlots: 4
jobmanager.memory.process.size: 12g
taskmanager.memory.process.size: 24g
taskmanager.memory.task.off-heap.size: 1g
taskmanager.memory.managed.fraction: 0.5
taskmanager.memory.network.fraction: 0.05
taskmanager.memory.jvm-overhead.fraction: 0.05
state.backend.type: rocksdb
state.checkpoints.dir: s3://bucket/flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://bucket/changelog
dstl.dfs.compression.enabled: 'true'
My job checkpoint configs are:
env.enableCheckpointing(900000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L);
env.getCheckpointConfig().setCheckpointTimeout(900000L);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableUnalignedCheckpoints();
Also, the full checkpoint size can be very large, up to 50 GB.
Now, it was running fine for a long time, but then we randomly got the following exception:
java.io.IOException: Could not perform checkpoint 70 for operator ... (80/120)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 70 for operator ... (80/120)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
    ... 22 more
Caused by: java.io.IOException: The upload for 1439 has already failed previously
    at org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:463)
    at org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:234)
    at org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)
    at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:406)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:258)
    ... 33 more
Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out after 1000ms
    at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java:319)
    at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$1(RetryingExecutor.java:314)
After this I start getting many timeout exceptions, where one or another task manager randomly becomes unreachable and restarts start happening very frequently.
Here are a few of these stack traces:
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_... timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1550)
    at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
    ...
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '...'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    ...
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id ... is no longer reachable.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1566)
    at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
    ...
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '...'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325)
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer