Thank you! Looking forward to testing the fix on my side.

Kevin
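For context on the operator state question further down in the thread: operator state (e.g. a ListState obtained through CheckpointedFunction) is scoped to a parallel subtask, while keyed state (e.g. a ValueState used downstream of keyBy()) is scoped to a key. A minimal sketch of one function of each kind, assuming standard DataStream APIs; CountPerKey and BufferingSink are purely illustrative names, not taken from the actual job:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

// Keyed state: scoped per key, only available downstream of keyBy().
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long next = (current == null ? 0L : current) + 1L;
        count.update(next);
        out.collect(next);
    }
}

// Operator state: scoped per parallel subtask, declared via CheckpointedFunction.
class BufferingSink implements SinkFunction<Long>, CheckpointedFunction {
    private final List<Long> buffer = new ArrayList<>();
    private transient ListState<Long> checkpointedBuffer;

    @Override
    public void invoke(Long value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Snapshot the in-memory buffer as (non-keyed) operator state.
        checkpointedBuffer.update(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-values", Long.class));
        // Restore the buffer after a failover/restart.
        for (Long v : checkpointedBuffer.get()) {
            buffer.add(v);
        }
    }
}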
On Tue, Sep 9, 2025 at 9:15 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi Kevin,

We've already merged the fix to master, backports are on the way :)

BR,
G

On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:

Hi Zakelly,

Yes, this job contains both operator state and keyed state. Happy to provide more details as needed.

Kevin

On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Kevin,

One more thing I want to make sure about the job: is this a DataStream job with operator state (not only keyed state after `keyBy()`)?

Thanks!

Best,
Zakelly

On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:

Thanks so much!

More context: this file merging feature is really promising for our use case. Given the large state size and parallelism, we occasionally run into S3 rate limits at checkpoint/savepoint time.

The few times I tried file merging, it helped quite a bit, but I've had to turn it off for now due to this NPE, which happens occasionally but not always.

On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

Hi Guys,

This sounds severe and I'm also taking a look...

G

On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Kevin,

Thanks for the details. It's really helpful. I'm trying to reproduce this according to your setup and will let you know if there are any updates.

I created a Jira issue to track this:
https://issues.apache.org/jira/browse/FLINK-38327

Best,
Zakelly

On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:

Hi Zakelly,

Thanks for the reply.

This job is running on Kubernetes using the Apache Flink Kubernetes Operator. The NullPointerException happened during a job restart after one of the TaskManagers restarted because the underlying node running the TaskManager pod was scaled down for maintenance. There was no rescaling or parallelism change.

The job is quite large due to heavy input traffic and state size:
- Parallelism: 2100
- taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
- RocksDBStateBackend used for state management
- Checkpoints/savepoints are written to S3 in AWS
- According to the Flink checkpoint UI, the full state size is ~600 GB

Please let me know if more details would be helpful.

Best regards,
Kevin

On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Kevin,

Would you please provide more info about the setup? Is this a failover or a manual job restart, with or without a change of parallelism (rescale)?

Best,
Zakelly

On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <kkim...@gmail.com> wrote:

Has anyone seen this NullPointerException after enabling checkpoint file merging?
I'm running a job on Flink 1.20.2 with these configs:

execution.checkpointing.file-merging.enabled: true
execution.checkpointing.file-merging.max-file-size: 32m
execution.checkpointing.timeout: "10min"
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.min-pause: "2min"
execution.checkpointing.interval: "2min"

java.lang.NullPointerException
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
    at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
    at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:840)
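For anyone trying to reproduce this locally, the same options can also be set programmatically when building the job. A minimal sketch, assuming the string config keys shown above; the pipeline body and job name are purely illustrative, not the actual job:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileMergingConfigExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same options as in the config snippet above, set via their string keys.
        conf.setString("execution.checkpointing.file-merging.enabled", "true");
        conf.setString("execution.checkpointing.file-merging.max-file-size", "32m");
        conf.setString("execution.checkpointing.timeout", "10min");
        conf.setString("execution.checkpointing.tolerable-failed-checkpoints", "3");
        conf.setString("execution.checkpointing.min-pause", "2min");
        conf.setString("execution.checkpointing.interval", "2min");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Trivial keyed pipeline, just so checkpoints have some state to snapshot.
        env.fromSequence(0, 1_000_000)
                .keyBy(v -> v % 100)
                .reduce(Long::sum)
                .print();

        env.execute("file-merging-config-example");
    }
}

Supplying the same keys through flink-conf.yaml, as in the snippet above, should behave equivalently.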