[ https://issues.apache.org/jira/browse/FLINK-38327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zakelly Lan resolved FLINK-38327.
---------------------------------
    Resolution: Fixed

> NPE during recovery from file-merged checkpoint after FO
> --------------------------------------------------------
>
> Key: FLINK-38327
> URL: https://issues.apache.org/jira/browse/FLINK-38327
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.20.2, 2.1.0
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.1, 1.20.3, 2.2.0, 2.1.1
>
>
> Report: https://lists.apache.org/thread/yzcxqdfsfdykgzdfkovf65jbwy4j6g0y
> This job is running on Kubernetes using the Apache Flink Kubernetes operator.
> This NullPointerException happened during a job restart after one of the
> TaskManagers restarted because the underlying node running the TaskManager
> pod was scaled down for maintenance. There was no rescaling or parallelism
> change.
> The job is quite large due to heavy input traffic and large state size:
> Parallelism: 2100
> taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers in total)
> RocksDBStateBackend is used for state management. Checkpoints/savepoints are
> written to S3 in AWS.
> According to the Flink Checkpoint UI, the full state size is ~600GB.
> {code:java}
> execution.checkpointing.file-merging.enabled: true
> execution.checkpointing.file-merging.max-file-size: 32m
> execution.checkpointing.timeout: "10min"
> execution.checkpointing.tolerable-failed-checkpoints: 3
> execution.checkpointing.min-pause: "2min"
> execution.checkpointing.interval: "2min"
> {code}
> {code:java}
> java.lang.NullPointerException
>   at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>   at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>   at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>   at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>   at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>   at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>   at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>   at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>   at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>   at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>   at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>   at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>   at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>   at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>   at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>   at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>   at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>   at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)