[ 
https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751437#comment-17751437
 ] 

Hang Ruan commented on FLINK-32754:
-----------------------------------

[~Yu Chen] [~yunta], thanks for the issue.

I had already opened FLINK-31268 for the same problem, so this issue 
duplicates it. I have also raised a 
[PR|https://github.com/apache/flink/pull/22048] to fix it.

> Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32754
>                 URL: https://issues.apache.org/jira/browse/FLINK-32754
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.17.1
>            Reporter: Yu Chen
>            Priority: Major
>         Attachments: image-2023-08-04-18-28-05-897.png
>
>
> We registered some metrics in the `enumerator` of the FLIP-27 source via 
> `SplitEnumeratorContext.metricGroup()`, but found that the task prints NPE 
> logs in the JM when restoring, suggesting that 
> `SplitEnumeratorContext.metricGroup()` is null.
> {*}Meanwhile, the task does not experience failover, and the Checkpoints 
> cannot be successfully created even after the task is in running state{*}.
> We found that the `SplitEnumeratorContext` is backed by 
> `LazyInitializedCoordinatorContext`, whose metricGroup() is only 
> initialized once lazyInitialize() has been called. By reviewing the code, we 
> found that at the time of SourceCoordinator.resetToCheckpoint(), 
> lazyInitialize() has not been called yet, so an NPE is thrown.
> *Q: Why does this bug prevent the task from creating the Checkpoint?*
> `SourceCoordinator.resetToCheckpoint()` throws an NPE, which leaves the 
> member variable `enumerator` in `SourceCoordinator` null. 
> Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are made 
> via `runInEventLoop()`.
> In `runInEventLoop()`, if the enumerator is null, it returns immediately.
> *Q: Why doesn't this bug trigger a task failover?*
> In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if 
> `internalCoordinator.resetToCheckpoint` throws an exception, it catches 
> the exception and calls `cleanAndFailJob` to try to fail the job.
> However, `globalFailureHandler` is also initialized in `lazyInitialize()`, 
> and `schedulerExecutor.execute` ignores the NPE triggered by 
> `globalFailureHandler.handleGlobalFailure(e)`.
> Thus it appears that the task did not fail over.
> !image-2023-08-04-18-28-05-897.png|width=963,height=443!
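
The failure chain described in the quoted issue can be illustrated with a 
minimal, self-contained Java sketch. It is not Flink code: `LazyInitSketch`, 
`LazyContext`, `Coordinator`, `registerMetric` and `failJob` are hypothetical 
stand-ins, and the catch block that the issue places in 
`RecreateOnResetOperatorCoordinator.resetAndStart()` is folded into 
`resetToCheckpoint()` here for brevity. It only assumes what the description 
states: the metric group and the failure handler are both set in 
lazyInitialize(), and the executor ignores exceptions thrown by its tasks.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyInitSketch {

    /** Hypothetical stand-in for a context whose fields are set only by lazyInitialize(). */
    static final class LazyContext {
        private List<String> metricGroup;      // null until lazyInitialize()
        private Runnable globalFailureHandler; // null until lazyInitialize()

        void lazyInitialize(Runnable failureHandler) {
            this.metricGroup = new ArrayList<>();
            this.globalFailureHandler = failureHandler;
        }

        void registerMetric(String name) {
            metricGroup.add(name); // throws NPE before lazyInitialize()
        }

        void failJob() {
            globalFailureHandler.run(); // throws NPE before lazyInitialize()
        }
    }

    /** Hypothetical stand-in for the coordinator that restores an enumerator. */
    static final class Coordinator {
        final LazyContext context = new LazyContext();
        Object enumerator; // stays null when the restore fails
        boolean jobFailed;
        int completedCheckpoints;

        void resetToCheckpoint() {
            try {
                context.registerMetric("restoredSplits"); // the restore path touches metrics
                enumerator = new Object();
            } catch (RuntimeException e) {
                // Try to fail the job; the handler is not initialized either.
                execute(context::failJob);
            }
        }

        /** Mimics an executor that ignores exceptions thrown by its tasks. */
        void execute(Runnable task) {
            try {
                task.run();
            } catch (RuntimeException ignored) {
                // swallowed, so no failover is triggered
            }
        }

        void checkpoint() {
            runInEventLoop(() -> completedCheckpoints++);
        }

        void runInEventLoop(Runnable action) {
            if (enumerator == null) {
                return; // silently skips every checkpoint-related call
            }
            action.run();
        }
    }

    /** Restores and then attempts one checkpoint, with or without prior initialization. */
    static Coordinator restore(boolean initializeFirst) {
        Coordinator c = new Coordinator();
        if (initializeFirst) {
            c.context.lazyInitialize(() -> c.jobFailed = true);
        }
        c.resetToCheckpoint();
        c.checkpoint();
        return c;
    }

    public static void main(String[] args) {
        Coordinator broken = restore(false);
        System.out.println("checkpoints=" + broken.completedCheckpoints
                + " jobFailed=" + broken.jobFailed);
        Coordinator ok = restore(true);
        System.out.println("checkpoints=" + ok.completedCheckpoints
                + " jobFailed=" + ok.jobFailed);
    }
}
```

With `restore(false)` the sketch ends up with no enumerator, no completed 
checkpoint and no job failure, which matches the symptoms in the report. With 
`restore(true)` the checkpoint goes through.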



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
