Hi Zakelly, Thanks for the reply.
This job is running on Kubernetes using the Apache Flink Kubernetes operator. This NullPointerException happened during a job restart after one of the TaskManagers restarted because the underlying node running the TaskManager pod was scaled down for maintenance. There was no rescaling or parallelism change. The job is quite large due to heavy input traffic and state size: parallelism is 2100, with taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total). RocksDBStateBackend is used for state management. Checkpoints/savepoints are written to S3 in AWS. According to the Flink Checkpoint UI, the full state size is ~600GB. Please let me know if more details would be helpful. Best regards, Kevin On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]> wrote: > Hi Kevin, > > Would you please provide more info about the setup? Is this a failover or > manual job restart with or without a change of parallelism (rescale)? > > > Best, > Zakelly > > > > On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]> wrote: > >> Has anyone seen this NullPointerException after enabling checkpoint file >> merging? 
I'm running a job with Flink 1.20.2 with these configs: >> >> execution.checkpointing.file-merging.enabled: true >> execution.checkpointing.file-merging.max-file-size: 32m >> execution.checkpointing.timeout: "10min" >> execution.checkpointing.tolerable-failed-checkpoints: 3 >> execution.checkpointing.min-pause: "2min" >> execution.checkpointing.interval: "2min" >> >> java.lang.NullPointerException >> at >> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927) >> at >> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866) >> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220) >> at >> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >> at >> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) >> at >> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >> at >> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >> at >> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) >> at >> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) >> at >> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >> at >> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >> at >> 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >> at >> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >> at >> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >> at >> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >> at >> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) >> at >> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >> at >> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >> at >> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >> at >> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >> at >> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >> at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003) >> at >> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845) >> at >> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) >> at >> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >> at >> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >> at >> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >> at >> 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >> at >> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >> at >> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) >> at >> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >> at >> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >> at >> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >> at >> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >> at >> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >> at >> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858) >> at >> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102) >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353) >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >> at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275) >> at >> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) >> at >> 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) >> at >> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) >> at >> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) >> at java.base/java.lang.Thread.run(Thread.java:840) >> >
