Hi Zakelly,

Thanks for the reply.

This job runs on Kubernetes via the Apache Flink Kubernetes operator. The
NullPointerException occurred when the job restarted after one of the
TaskManagers restarted, because the Kubernetes node hosting that
TaskManager pod was scaled down for maintenance. There was no rescaling or
parallelism change.
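
In case it helps with reading the trace: the NPE is thrown inside
isManagedByFileMergingManager, which is called from the mapping function
(lambda$restoreStateHandles$11) of a HashMap.computeIfAbsent call inside
restoreStateHandles. Below is a minimal, stdlib-only sketch of that shape.
It is not Flink's code. The Handle record, its nullable path field, and the
isManaged/restore methods are hypothetical stand-ins, and I don't know which
reference is actually null in the real restore path. The sketch only shows
that an exception thrown by the mapping function propagates straight out of
computeIfAbsent (and out of the enclosing stream forEach), and that no
mapping is recorded for that key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT Flink code. It only mimics the shape of the trace:
// a stream forEach calls HashMap.computeIfAbsent, and the mapping function
// calls a check that dereferences a reference that turns out to be null.
public class ComputeIfAbsentNpeSketch {

    // Hypothetical stand-in for a restored state handle; path may be null.
    record Handle(String id, String path) {}

    // Hypothetical check that assumes every handle has a non-null path.
    static boolean isManaged(Handle h) {
        return h.path().startsWith("s3://"); // NPE here if path is null
    }

    // Records, per handle id, whether the handle is "managed".
    static void restore(List<Handle> handles, Map<String, Boolean> managed) {
        handles.stream()
                .forEach(h -> managed.computeIfAbsent(h.id(), id -> isManaged(h)));
    }

    public static void main(String[] args) {
        Map<String, Boolean> managed = new HashMap<>();
        List<Handle> handles = List.of(
                new Handle("a", "s3://bucket/chk-1/file-a"),
                new Handle("b", null)); // this null path triggers the NPE
        try {
            restore(handles, managed);
        } catch (NullPointerException e) {
            // The NPE from the mapping function escapes computeIfAbsent and forEach.
            System.out.println("NPE escaped computeIfAbsent: " + e);
        }
        // "a" was recorded before the failure; "b" was not, because
        // computeIfAbsent records no mapping when the function throws.
        System.out.println("recorded keys: " + managed.keySet());
    }
}
```

In the sketch, a single bad handle aborts the whole loop instead of being
skipped, which matches how the real restore fails the task as a whole.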

The job is quite large because of heavy input traffic and a large state:
- Parallelism: 2100
- taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers in total)
- State backend: RocksDBStateBackend, with checkpoints/savepoints written
  to S3 in AWS
- Full state size: ~600GB, according to the Flink checkpoint UI
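
A quick back-of-the-envelope check of those numbers. It assumes default
slot sharing (so the job needs one slot per subtask of its widest operator)
and, purely for illustration, that state is spread evenly across subtasks
and TaskManagers; in practice the distribution is probably skewed. JobSizing
and its methods are just names made up for this sketch.

```java
// Back-of-the-envelope sizing, assuming default slot sharing and an even
// state distribution (both are simplifying assumptions).
public class JobSizing {

    // Number of TaskManagers needed, rounding up so a partially filled
    // TaskManager still counts as a whole pod.
    static int taskManagers(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    // Evenly divides a total size across the given number of shares.
    static double perShareGb(double totalGb, int shares) {
        return totalGb / shares;
    }

    public static void main(String[] args) {
        int parallelism = 2100;
        int slotsPerTm = 14;
        double stateGb = 600.0; // "~600GB" from the checkpoint UI
        int tms = taskManagers(parallelism, slotsPerTm); // 150
        System.out.printf("TaskManagers: %d%n", tms);
        System.out.printf("State per TaskManager: ~%.1f GB%n", perShareGb(stateGb, tms));
        // Decimal units (1 GB = 1000 MB) for a rough per-subtask figure.
        System.out.printf("State per subtask: ~%.0f MB%n",
                perShareGb(stateGb, parallelism) * 1000);
    }
}
```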

Please let me know if more details would be helpful.

Best regards,
Kevin


On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]> wrote:

> Hi Kevin,
>
> Would you please provide more info about the setup? Is this a failover or
> manual job restart with or without a change of parallelism (rescale)?
>
>
> Best,
> Zakelly
>
>
>
> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]> wrote:
>
>> Has anyone seen this NullPointerException after enabling checkpoint file
>> merging? I'm running a job on Flink 1.20.2 with the following configuration:
>>
>> execution.checkpointing.file-merging.enabled: true
>> execution.checkpointing.file-merging.max-file-size: 32m
>> execution.checkpointing.timeout: "10min"
>> execution.checkpointing.tolerable-failed-checkpoints: 3
>> execution.checkpointing.min-pause: "2min"
>> execution.checkpointing.interval: "2min"
>>
>> java.lang.NullPointerException
>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>> at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>> at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>> at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>> at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>> at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>> at java.base/java.lang.Thread.run(Thread.java:840)
>>
>
