[ https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751437#comment-17751437 ]
Hang Ruan commented on FLINK-32754:
-----------------------------------

[~Yu Chen] [~yunta], thanks for the issue. There is a duplicate issue, FLINK-31268, that I opened for this, and I have raised a [PR|https://github.com/apache/flink/pull/22048] to fix it.

> Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32754
>                 URL: https://issues.apache.org/jira/browse/FLINK-32754
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.17.1
>            Reporter: Yu Chen
>            Priority: Major
>         Attachments: image-2023-08-04-18-28-05-897.png
>
>
> We registered some metrics in the `enumerator` of a FLIP-27 source via
> `SplitEnumeratorContext.metricGroup()`, but found that the task prints NPE
> logs in the JM when restoring, suggesting that
> `SplitEnumeratorContext.metricGroup()` returns null.
> {*}Meanwhile, the task does not experience failover, and checkpoints
> cannot be created even after the task is in the running state{*}.
> We found that the implementation class behind the context is
> `LazyInitializedCoordinatorContext`, whose metricGroup() is only
> initialized once lazyInitialize() has been called. By reviewing the code, we
> found that at the time of SourceCoordinator.resetToCheckpoint(),
> lazyInitialize() has not been called yet, so an NPE is thrown.
> *Q: Why does this bug prevent the task from creating checkpoints?*
> `SourceCoordinator.resetToCheckpoint()` throws an NPE, which leaves the
> member variable `enumerator` in `SourceCoordinator` null.
> Unfortunately, all checkpoint-related calls in `SourceCoordinator` go
> through `runInEventLoop()`.
> In `runInEventLoop()`, if the enumerator is null, it returns immediately.
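
The restore path described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not Flink's code: `LazyContext`, `Enumerator`, `Coordinator`, and `LazyInitSketch` are hypothetical stand-ins for the context, the user enumerator, and `SourceCoordinator`, and the metric group is modelled as a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a context whose fields are only set by lazyInitialize().
class LazyContext {
    private List<String> metricGroup; // null until lazyInitialize() runs

    void lazyInitialize() {
        metricGroup = new ArrayList<>();
    }

    List<String> metricGroup() {
        return metricGroup;
    }
}

// Hypothetical stand-in for an enumerator that registers a metric when created.
class Enumerator {
    Enumerator(LazyContext context) {
        // Throws NullPointerException if lazyInitialize() has not been called yet.
        context.metricGroup().add("pendingSplits");
    }
}

// Hypothetical stand-in for the coordinator described in the report.
class Coordinator {
    private final LazyContext context;
    private Enumerator enumerator; // stays null if the restore throws
    int checkpointsTaken = 0;

    Coordinator(LazyContext context) {
        this.context = context;
    }

    void resetToCheckpoint() {
        enumerator = new Enumerator(context);
    }

    // Mirrors the described guard: do nothing when there is no enumerator.
    void runInEventLoop(Runnable action) {
        if (enumerator == null) {
            return;
        }
        action.run();
    }

    void checkpoint() {
        runInEventLoop(() -> checkpointsTaken++);
    }
}

class LazyInitSketch {
    // Returns true if restoring throws a NullPointerException.
    static boolean restoreFails(Coordinator coordinator) {
        try {
            coordinator.resetToCheckpoint();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Coordinator broken = new Coordinator(new LazyContext());
        System.out.println("restore failed: " + restoreFails(broken));
        broken.checkpoint(); // silently skipped because enumerator is null
        System.out.println("checkpoints taken: " + broken.checkpointsTaken);
    }
}
```

In this sketch the restore throws before `enumerator` is assigned, so every later `checkpoint()` call is dropped by the null guard without any error. Calling `lazyInitialize()` on the context before `resetToCheckpoint()` makes the same sequence succeed.
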
> *Q: Why doesn't this bug trigger a task failover?*
> In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if
> `internalCoordinator.resetToCheckpoint` throws an exception, the exception
> is caught and `cleanAndFailJob` is called to try to fail the job.
> However, `globalFailureHandler` is also initialized in `lazyInitialize()`,
> and `schedulerExecutor.execute` ignores the NPE triggered by
> `globalFailureHandler.handleGlobalFailure(e)`.
> As a result, the task does not appear to fail over.
> !image-2023-08-04-18-28-05-897.png|width=963,height=443!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)