Hi Kevin,

We've already merged the fix to master, and backports are on the way :)

BR,
G


On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <kevin....@alumni.princeton.edu>
wrote:

> Hi Zakelly,
>
> Yes, this job contains both operator state and keyed state. Happy to
> provide more details as needed.
>
> Kevin
>
> On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> One more thing I want to confirm about the job: is this a DataStream
>> job with operator state (not only keyed state after `keyBy()`)?
>>
>>
>> Thanks!
>>
>> Best,
>> Zakelly
>>
>> On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <kevin....@alumni.princeton.edu>
>> wrote:
>>
>>> Thanks so much!
>>>
>>> More context: this file merging feature is really promising for our use
>>> case. Given the large state size and parallelism, we occasionally run into
>>> S3 rate limits at checkpoint/savepoint time.
>>>
>>> The few times I tried file merging, it helped quite a bit, but I've had
>>> to turn it off for now because of this NPE, which happens intermittently.
>>>
>>> On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> This sounds severe and I'm also taking a look...
>>>>
>>>> G
>>>>
>>>>
>>>> On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <zakelly....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> Thanks for the details, they're really helpful. I'm trying to reproduce
>>>>> this based on your setup and will let you know if there are any updates.
>>>>>
>>>>> I've created a Jira issue to track this:
>>>>> https://issues.apache.org/jira/browse/FLINK-38327
>>>>>
>>>>>
>>>>> Best,
>>>>> Zakelly
>>>>>
>>>>> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:
>>>>>
>>>>>> Hi Zakelly,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> This job is running on Kubernetes using the Apache Flink Kubernetes
>>>>>> operator. This NullPointerException happened during a job restart after
>>>>>> one of the TaskManagers restarted because the underlying node running the
>>>>>> TaskManager pod was scaled down for maintenance. There was no rescaling
>>>>>> or parallelism change.
>>>>>>
>>>>>> The job is quite large due to heavy input traffic and state size:
>>>>>> - Parallelism: 2100
>>>>>> - taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers in total)
>>>>>> - RocksDBStateBackend is used for state management; checkpoints and
>>>>>>   savepoints are written to S3 in AWS.
>>>>>> - According to the Flink checkpoint UI, the full state size is ~600 GB.
>>>>>>
>>>>>> Please let me know if more details would be helpful.
>>>>>>
>>>>>> Best regards,
>>>>>> Kevin
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <zakelly....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kevin,
>>>>>>>
>>>>>>> Would you please provide more info about the setup? Is this a
>>>>>>> failover or manual job restart with or without a change of parallelism
>>>>>>> (rescale)?
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Zakelly
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <kkim...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Has anyone seen this NullPointerException after enabling checkpoint
>>>>>>>> file merging? I'm running a job on Flink 1.20.2 with this configuration:
>>>>>>>>
>>>>>>>> execution.checkpointing.file-merging.enabled: true
>>>>>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>>>>>> execution.checkpointing.timeout: "10min"
>>>>>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>>>>>> execution.checkpointing.min-pause: "2min"
>>>>>>>> execution.checkpointing.interval: "2min"
>>>>>>>>
>>>>>>>> java.lang.NullPointerException
>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>>>>>>     at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>     at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>>>>>>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>>>>>>     at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>>>     at java.base/java.lang.Thread.run(Thread.java:840)
>>>>>>>>
>>>>>>>
