Hi Matthias,

Regarding Flink managed memory: what I have observed is that even after setting the fraction to 0.5, each task manager uses the entire 12 GB of managed memory allocated to it after a restart. Hence it is using the full 360 GB across all 30 task managers.
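For what it's worth: if I'm not mistaken, with the default state.backend.rocksdb.memory.managed: true, RocksDB's block cache and write buffers are sized to the full managed-memory budget, so managed memory will always show as fully used; the footprint only shrinks if the budget itself is reduced. A minimal flink-conf sketch of the two ways to do that (the 0.25 and 6g values are purely illustrative, not recommendations from this thread):

taskmanager.memory.process.size: 24g
# Option A: lower the fraction; managed memory is this fraction of total Flink memory
taskmanager.memory.managed.fraction: 0.25
# Option B: pin an explicit size instead; when set, it takes precedence over the fraction
# taskmanager.memory.managed.size: 6g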
I don't sink to Kafka, so I think the checkpoint timeout settings are OK.

Regarding the pekko timeout settings: I see the default for *pekko.ask.timeout* is 10 sec. Should I increase this to, say, 60 sec? *pekko.tcp.timeout* is 20 sec; any recommendation to increase this to 60 sec as well?

Finally, I was thinking of setting:

heartbeat.rpc-failure-threshold: 10
heartbeat.timeout: 100000

Are these values OK, or are there any other settings you would recommend altering for task manager failures due to timeouts or connection loss?

I also see the default for *pipeline.max-parallelism* is -1, so I think it will use the parallelism I set for the task. From the Flink UI I see messages getting distributed more or less equally among the subtasks.

So please let me know: is starting by tinkering with the different timeouts and increasing them a good option here?

Thanks
Sachin
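For reference, here are the timeout-related knobs discussed in this thread, collected into one flink-conf sketch. The values are only illustrative starting points taken from the questions above and the defaults mentioned in the thread, not tested recommendations:

# changelog upload to S3 (default 1 s, the timeout behind the original exception)
dstl.dfs.upload.timeout: 30 s
# heartbeat settings proposed above
heartbeat.timeout: 100000
heartbeat.rpc-failure-threshold: 10
# pekko RPC timeouts (defaults per the thread: 10 s / 20 s)
pekko.ask.timeout: 60 s
pekko.tcp.timeout: 60 s
# alternatively, changelog can be switched off entirely, as Matthias mentions
# state.backend.changelog.enabled: 'false'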
On Mon, Sep 22, 2025 at 4:39 PM Schwalbe Matthias <[email protected]> wrote:

> Hi Sachin,
>
> Thanks for your clarifications.
> Having prepared my previous answer in a short time, I've realized I was not specific enough …
> It is also a mixture of requests about the precise situation you are in, and a couple of recommendations of where to look first.
>
> So let me backtrack a little bit (I marked out positions in your answer):
>
> - The cause of the original crash of the job is the non-timeliness of the S3 response to changelog storage:
>   - "at org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)"
>   - This also happens for our jobs, to the point that we deactivated changelog for the time being
>   - You might find a configuration that increases this specific timeout, in order to run into the overall situation less frequently, or
>   - If otherwise the write I/O pattern is not too unfavorable, deactivate the changelog feature
> - As per (1), my request for the motivation of using 120 parallelism is really about the driver of your decision:
>   - You would adjust the configuration differently if the decision was driven by CPU congestion, by state I/O congestion, or, as in your case, because the Kafka topic has 120 partitions
>   - If you don't see CPU or I/O congestion, ignore this point
>   - If instead, on the respective FlameGraph tab of your tasks, you see that major time is spent in RocksDB get/update, we can reconsider this question
> - As per (2), changelog writing is the cause of the original exception; that might easily be jitter for some requests that take more than 1 second to respond
>   - As said, I didn't have enough time to look deeply into this error; I've got it myself and have deactivated changelog until I can resolve it
>   - Augmenting dstl.dfs.upload.timeout might also help, I guess
> - As per (3), that's a secondary hint; the dstl timeout is your current driver
>   - Checkpoint time lasts the whole period of 15 minutes
>   - I don't know if that also applies to the checkpoint timeout, or just once it is triggered, however
>   - If you sink the results to Kafka, the Kafka transaction timeout needs to be longer than the checkpoint period, see [1]
> - As per (4), busyTimeMsPerSecond gives you only the time spent processing events, not time spent otherwise in the taskmanager process
>   - The profiler tab gives you an overall picture per taskmanager, not per running subtask
> - As per (5), the pekko timeout might indeed give you a little more stability; however, if the original timeout is not sufficient, that gives you a hint that at certain periods (like checkpoint load) your CPU is congested
>   - Hence the Profiler recommendation
>   - Pekko processing should always be timely for a sound system
> - As per (6), I was not specific enough in my answer: I meant that you configured ~300Gb of RocksDB cache memory, whereas the overall state size is only ~50Gb. I.e. you configured 6 times what is actually used, and that memory could be used in other places
>   - taskmanager.memory.managed.fraction: 0.5 …: 30 TM * 24Gb * 0.5 minus some other extra mem: ~300Gb RAM
>   - I go up to 100% overprovision for the RocksDB cache to have a reserve
> - As per (7), I'm referring to [2]; if not mistaken it defaults to 256
>   - Flink internally dissects the key space into these 256 key-ranges and then spreads them over the actual parallelism of an operator
>   - Having and consuming 120 Kafka partitions is only relevant up to the first keyed operator, where the keys get rehashed into these 256 key-ranges
>   - 256 key-ranges get more-or-less evenly spread over 120 parallelism, hence 104 subtasks process 2 key-ranges and the remaining 16 subtasks process 3 key-ranges (see the sketch just below this mail)
>   - Assuming optimal entropy of the key space, these 16 subtasks process 50% more events than the others
>   - Beware, don't change this configuration, as that makes the state incompatible. Max-parallelism needs to be decided at the first job start
>
> Cheers
>
> Thias
>
> [1] https://www.ververica.com/knowledge-base/best-practices-for-using-kafka-sources/sinks-in-flink-jobs
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#pipeline-max-parallelism
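To make the key-range arithmetic above concrete, here is a small self-contained Java sketch. It mirrors, to my understanding, the ceiling-based range computation Flink uses in KeyGroupRangeAssignment; the 256/120 values are the ones discussed in this thread:

// Counts how many subtasks receive 2 vs. 3 key groups when 256 key groups
// (the max parallelism) are spread over 120 parallel subtasks.
public class KeyGroupSpread {
    public static void main(String[] args) {
        int maxParallelism = 256; // number of key groups
        int parallelism = 120;    // operator parallelism

        int withTwo = 0, withThree = 0;
        for (int i = 0; i < parallelism; i++) {
            // key groups assigned to subtask i: [start, end)
            int start = (i * maxParallelism + parallelism - 1) / parallelism;
            int end = ((i + 1) * maxParallelism + parallelism - 1) / parallelism;
            int size = end - start;
            if (size == 2) withTwo++;
            else if (size == 3) withThree++;
        }
        // prints: 104 subtasks with 2 key groups, 16 subtasks with 3 key groups
        System.out.println(withTwo + " subtasks with 2 key groups, "
                + withThree + " subtasks with 3 key groups");
    }
}

With an evenly distributed key space, the 16 subtasks holding 3 key groups see roughly 50% more records than the 104 holding 2, which matches the bias described above.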
> *From:* Sachin Mittal <[email protected]>
> *Sent:* Monday, September 22, 2025 10:38 AM
> *To:* Schwalbe Matthias <[email protected]>
> *Cc:* user <[email protected]>
> *Subject:* [External] Re: A random checkpoint failure creates an avalanche of restarts
>
> Hi,
>
> So we are running a massively scaled OTT analytics platform. We have tens of millions of users watching OTT content.
>
> 1. We decided to have 120 partitions for the Kafka topic that receives this data, hence we run with a parallelism of 120. This is the motivation for running with a parallelism of 120: to efficiently process the data by scaling horizontally. From the Flink UI console I can see it has no problems creating 120 parallel subtasks, with each task manager having 4 slots, so 30 task managers in total.
>
> 2. As I understand you, you are saying that the changelog file upload to S3 often takes a long time, hence you suggest increasing dstl.dfs.upload.timeout from its default of 1 sec to a higher value. Is this correct?
>
> 3. Also, I have seen that the entire checkpointing is done within a few seconds, so do I still need to increase the checkpointing timeout to more than 15/10 minutes?
>
> Next time the problem surfaces, I will enable the profiler to see what is making the CPU congested during restarts. However, from the Flink metrics I see that (4) busyTimeMsPerSecond is fairly low for all my tasks. Also, as per the Yarn documentation for AWS EMR, local recovery is used, and I see restart times are much lower since I enabled this configuration.
>
> (5) Also let me know if increasing these heartbeat-related configurations would fix the problem:
>
> heartbeat.rpc-failure-threshold: 10
> heartbeat.timeout: 100000
>
> (6) Also, I have not understood what you mean by "You give RocksDB roughly an overall size of ~300Gb". Since the state backend is stored in S3, it should have all the size it needs. I have allocated 100 GB of local disk space for each task manager and I see disk utilization of only 25%.
>
> (7) Also, I did not understand this comment of yours:
>
> - Assuming that you didn't augment the default value for max parallelism, 256, a parallelism of 120 means that 90% of your subtasks process 2 key-slots each, whereas 10% process 3 key-slots. That leads to a processing bias of +50% (3/2)
>
> Which exact Flink or autoscaler config are you referring to here for the default value of max parallelism? I don't think I have changed anything here and am running with defaults.
>
> Finally, since I stay in India, I will not be able to attend the FlinkForward Conference in Barcelona.
>
> Thanks
> Sachin
>
> On Mon, Sep 22, 2025 at 1:03 PM Schwalbe Matthias <[email protected]> wrote:
>
> Hi Sachin,
>
> A couple of observations which might help improve your situation:
>
> - The first timeout/exception happens within the changelog writer, whose timeout is set to only one second
>   - I/O jitter towards the DFS can easily lead to occasional responses that take more than one second
>   - This is up to the point that for our jobs on Kubernetes we cannot afford to enable the changelog feature
>   - Moderately augmenting the timeout configuration might improve your situation; however, changelog is probably quite sensitive to this
> - Your checkpoint interval is set to 15 minutes; I would try to increase the checkpoint timeout at least a little bit
> - When restarting, your task manager CPU is congested up to the point that the pekko receiver loop fails to give timely responses
>   - Did you take profiles on the Profiler tab of your task manager? That usually gives me the best information on bottlenecks that are not on task level (needs to be enabled in the config)
> - Unfortunately, when loading a checkpoint/savepoint from DFS, it takes a long time. I see a lot of unbuffered I/O towards the DFS during that phase. However, that is difficult to fix without a Flink fork.
> - In your configuration I see local recovery; is that actually used on the restarted task managers on Yarn? (you probably checked that 😊 )
> - You give RocksDB roughly an overall size of ~300Gb, when the RocksDB data is likely around 50Gb
> - What is the motivation for going with parallelism 120?
>   - Is it based (bottlenecked) on RocksDB I/O, or CPU capacity?
>   - Assuming that you didn't augment the default value for max parallelism, 256, a parallelism of 120 means that 90% of your subtasks process 2 key-slots each, whereas 10% process 3 key-slots. That leads to a processing bias of +50% (3/2)
>
> So much for now, I hope that helps.
> Will you be at the FlinkForward Conference in Barcelona? There we could also exchange experience in person …
>
> Thias
>
> *From:* Sachin Mittal <[email protected]>
> *Sent:* Saturday, September 20, 2025 11:41 AM
> *To:* user <[email protected]>
> *Subject:* [External] A random checkpoint failure creates an avalanche of restarts
>
> Hi,
>
> So we are running Flink 1.19.1 on AWS EMR using Yarn as resource manager.
> > We have a fairly large cluster with 120 parallelism and 30 task managers > running with 4 task slots. > > > > Here are some of the important configs: > > taskmanager.numberOfTaskSlots: 4 > > jobmanager.memory.process.size: 12g > > taskmanager.memory.process.size: 24g > > taskmanager.memory.task.off-heap.size: 1g > > taskmanager.memory.managed.fraction: 0.5 > > taskmanager.memory.network.fraction: 0.05 > > taskmanager.memory.jvm-overhead.fraction: 0.05 > > state.backend.type: rocksdb > > state.checkpoints.dir: s3://bucket/flink-checkpoints > > state.backend.incremental: 'true' > > state.backend.local-recovery: 'true' > > state.backend.changelog.enabled: 'true' > > state.backend.changelog.storage: filesystem > > dstl.dfs.base-path: s3://bucket/changelog > > dstl.dfs.compression.enabled: 'true' > > > > My job checkpoint configs are: > > env.enableCheckpointing(900000L); > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.*EXACTLY_ONCE*); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L); > env.getCheckpointConfig().setCheckpointTimeout(900000L); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.*MAX_VALUE*); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > env.getCheckpointConfig().enableUnalignedCheckpoints(); > > Also checkpoint full size can be very large upto 50 GB. > > Now it was running fine for a long time but randomly we got the following > exception: > > > > java.io.IOException: Could not perform checkpoint 70 for operator ... (80/ > 120)#0. > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java: > 1326) > > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java: > 147) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java: > 287) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$ > 100(SingleCheckpointBarrierHandler.java:64) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java: > 488) > > at > org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java: > 78) > > at > org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java: > 56) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$ > 2(SingleCheckpointBarrierHandler.java:234) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java: > 262) > > at > org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java: > 231) > > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java: > 181) > > at > org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java: > 159) > > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java: > 122) > > at > 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java: > 65) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java: > 579) > > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java: > 231) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java: > 909) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java: > 858) > > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java: > 958) > > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java: > 937) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > > at java.base/java.lang.Thread.run(Thread.java:840) > > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could > not complete snapshot 70 for operator ... (80/120)#0. Failure reason: > Checkpoint was declined. > > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: > 281) > > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: > 185) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java: > 348) > > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java: > 228) > > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java: > 213) > > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java: > 192) > > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java: > 720) > > at > org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java: > 352) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$ > 16(StreamTask.java:1369) > > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 > .runThrowing(StreamTaskActionExecutor.java:50) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java: > 1357) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java: > 1314) > > ... 22 more > > Caused by: java.io.IOException: The upload for 1439 has already failed > previously > > at > org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java: > 463) > > at > org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java: > 234) > > at > org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java: > 217) > > at > org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java: > 406) > > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java: > 258) > > ... 
33 more > > Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out > after 1000ms > > at > org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java: > 319) > > at > org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$ > 1(RetryingExecutor.java:314) > > > > After this I start getting many timeout exceptions where randomly one or > other task manager becomes unreachable and restarts start happening very > frequently. Here are few of these stack traces: > > > > > > java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id > container_... timed out. > > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java: > 1550) > > at > org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java: > 158) > > ... > > > > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connection unexpectedly closed by remote task manager '...'. This might > indicate that the remote task manager was lost. > > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java: > 134) > > at org.apache.flink.shaded.netty4 > .io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java: > 305) > > at org.apache.flink.shaded.netty4 > .io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java: > 281) > > at org.apache.flink.shaded.netty4 > .io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java: > 274) > > ... > > > > org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id > ... is no longer reachable. > > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java: > 1566) > > at > org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java: > 126) > > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java: > 275) > > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java: > 267) > > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java: > 262) > > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$ > 0(HeartbeatManagerImpl.java:248) > > ... > > > > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Lost connection to task manager '...'. This indicates that the remote task > manager was lost. > > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java: > 165) > > at org.apache.flink.shaded.netty4 > .io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java: > 346) > > at org.apache.flink.shaded.netty4 > .io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java: > 325) > > ... > > Caused by: > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: > recvAddress(..) 
failed: Connection reset by peer
