Hi folks, I'm running a Flink application that uses HybridSource, patched with the fixes from FLINK-27479 and FLINK-27529.
This application uses HybridSource and the Presto plugin to read from a few thousand S3 directories and then switches to reading from Kafka. Reading from S3 can cause intermittent errors that are usually fixed by retrying. The problem appears when Flink tries to recover from such a failure and restarts from a checkpoint:

java.lang.NullPointerException: Source for index=0 not available
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
    at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)

Complete scenario:

1. CheckpointCoordinator - Completed checkpoint 14 for job 00000000000000000000000000000000
2. HybridSource successfully completes processing a few SourceFactories that read from S3.
3. The next SourceFactory tries to read the contents of an S3 directory, which causes an error: Unable to execute HTTP request: Read timed out
4. CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint 14
5. HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
6. This restore fails because of a NullPointerException in HybridSourceSplitEnumerator.close
7. CheckpointCoordinator tries again: Restoring job 00000000000000000000000000000000 from Checkpoint 14
8. This causes:

2022/08/02 22:26:52.469 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught exception in the SplitEnumerator for Source Source: hybrid-source while handling operator event SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 10. Triggering job failover.
java.lang.NullPointerException: Source for index=0 not available
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
    at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
    at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:149)
    at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

9. This pattern continues forever: Flink tries to restore from the checkpoint, but the restore fails because of NullPointerException: Source for index=0 not available

Any idea what could be the cause of the problem? Could some HybridSource experts look at the issue? I have attached an extract of the JobManager log that contains the related information. I can send the complete log, but its size is a few MB. The problem is reproducible after a few hours of running in my environment. I think we need a Jira ticket for this issue; could someone please create it?
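For anyone trying to reason about the trace, the failing lookup can be modelled in a few lines of plain Java. This is only a sketch of one possible explanation, not Flink's implementation: the class names HybridRestoreModel, Sources and Enumerator are hypothetical stand-ins, and the key assumption (that after a restore only the source for the restored index, here 47, is registered) has not been confirmed against the patched code. Under that assumption, a reader that reports sourceIndex=-1 makes the enumerator look up index 0, which produces the same message as in the log.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the failing lookup; NOT Flink's actual code. */
public class HybridRestoreModel {

    /** Stand-in for SwitchedSources: maps a source index to an already created source. */
    static final class Sources {
        private final Map<Integer, String> byIndex = new HashMap<>();

        void put(int index, String source) {
            byIndex.put(index, source);
        }

        String sourceOf(int index) {
            String source = byIndex.get(index);
            if (source == null) {
                // Same message text as in the reported stack trace.
                throw new NullPointerException("Source for index=" + index + " not available");
            }
            return source;
        }
    }

    /** Stand-in for the enumerator after a restore. */
    static final class Enumerator {
        private final Sources sources = new Sources();
        private final int currentSourceIndex;

        Enumerator(int restoredSourceIndex) {
            this.currentSourceIndex = restoredSourceIndex;
            // ASSUMPTION: only the restored index is registered; earlier sources are gone.
            sources.put(restoredSourceIndex, "source-" + restoredSourceIndex);
        }

        /** A reader reports the index it finished; -1 means it has not read any source yet. */
        String handleReaderFinished(int readerSourceIndex) {
            if (readerSourceIndex < currentSourceIndex) {
                // The reader is behind, so it is told to switch to the next source in sequence.
                return sources.sourceOf(readerSourceIndex + 1);
            }
            return sources.sourceOf(currentSourceIndex);
        }
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator(47);
        try {
            enumerator.handleReaderFinished(-1);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "Source for index=0 not available"
        }
    }
}
```

If this model matches what the patched enumerator does, the restart loop would follow from restarted readers always reporting sourceIndex=-1 while the restored enumerator only knows about index 47. Whether that is really the case needs to be checked against the actual HybridSourceSplitEnumerator code.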
bf-29-JM-err-analysis.log