Hi Kevin,

We’ve merged the fix into the 1.20 branch, and AFAIK the community will
kick off the next patch release (1.20.3) soon. In the meantime, if you’re
interested, you can build from the release-1.20 branch[1] to verify the fix.
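In case it helps, a typical local build of that branch might look like the
following (a sketch assuming git, a JDK, and Maven are available; these are
the standard Flink build steps, nothing specific to this fix):

```shell
# Clone only the release-1.20 branch (shallow clone to save time and space)
git clone --branch release-1.20 --depth 1 https://github.com/apache/flink.git
cd flink

# Build the distribution, skipping tests to speed things up
mvn clean install -DskipTests

# The resulting distribution ends up under
#   flink-dist/target/flink-<version>-bin/flink-<version>/
```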


Thank you again for reporting the issue and for your valuable feedback.

[1] https://github.com/apache/flink/tree/release-1.20

Best,
Zakelly

On Wed, Sep 10, 2025 at 1:33 AM Kevin Kim <[email protected]>
wrote:

> Thank you! Looking forward to testing the fix on my side.
>
> Kevin
>
> On Tue, Sep 9, 2025 at 9:15 AM Gabor Somogyi <[email protected]>
> wrote:
>
>> Hi Kevin,
>>
>> We've already merged the fix to master, backports are on the way :)
>>
>> BR,
>> G
>>
>>
>> On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <[email protected]>
>> wrote:
>>
>>> Hi Zakelly,
>>>
>>> Yes, this job contains both operator state and keyed state. Happy to
>>> provide more details as needed.
>>>
>>> Kevin
>>>
>>> On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <[email protected]>
>>> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> One more thing I'd like to confirm about the job: is this a DataStream
>>>> job with operator state (not only keyed state after `keyBy()`)?
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks so much!
>>>>>
>>>>> More context: this file merging feature is really promising for our
>>>>> use case. Given the large state size and parallelism, we occasionally run
>>>>> into S3 rate limits at checkpoint/savepoint time.
>>>>>
>>>>> The few times I tried this file merging, it helped quite a bit, but
>>>>> I've had to turn it off for now due to this NPE, which happens
>>>>> occasionally but not always.
>>>>>
>>>>> On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> This sounds severe and I'm also taking a look...
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Kevin,
>>>>>>>
>>>>>>> Thanks for the details, it's really helpful. I'm trying to reproduce
>>>>>>> this with your setup and will let you know if there are any updates.
>>>>>>>
>>>>>>> I created a Jira issue to track this:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-38327
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Zakelly
>>>>>>>
>>>>>>> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Zakelly,
>>>>>>>>
>>>>>>>> Thanks for the reply.
>>>>>>>>
>>>>>>>> This job is running on Kubernetes using the Apache Flink Kubernetes
>>>>>>>> operator. This NullPointerException happened during a job restart 
>>>>>>>> after one
>>>>>>>> of the TaskManagers restarted because the underlying node running the
>>>>>>>> TaskManager pod was scaled down for maintenance. There was no 
>>>>>>>> rescaling or
>>>>>>>> parallelism change.
>>>>>>>>
>>>>>>>> The job is quite large due to heavy input traffic and state size:
>>>>>>>> - Parallelism: 2100
>>>>>>>> - taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
>>>>>>>> - State backend: RocksDB (RocksDBStateBackend)
>>>>>>>> - Checkpoints/savepoints are written to S3 in AWS
>>>>>>>> - Full state size (per the Flink Checkpoint UI): ~600 GB
>>>>>>>> Please let me know if more details would be helpful.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Kevin
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Kevin,
>>>>>>>>>
>>>>>>>>> Would you please provide more info about the setup? Is this a
>>>>>>>>> failover or manual job restart with or without a change of parallelism
>>>>>>>>> (rescale)?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Zakelly
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Has anyone seen this NullPointerException after enabling
>>>>>>>>>> checkpoint file merging? I'm running a job on Flink 1.20.2 with
>>>>>>>>>> these configs:
>>>>>>>>>>
>>>>>>>>>> execution.checkpointing.file-merging.enabled: true
>>>>>>>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>>>>>>>> execution.checkpointing.timeout: "10min"
>>>>>>>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>>>>>>>> execution.checkpointing.min-pause: "2min"
>>>>>>>>>> execution.checkpointing.interval: "2min"
>>>>>>>>>>
>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>>>>>>>> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>> at
>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>>>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>>>>>>>>>
>>>>>>>>>
