[ https://issues.apache.org/jira/browse/FLINK-28817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576998#comment-17576998 ]
Michael commented on FLINK-28817:
---------------------------------

Clarification of the problem:

1. -- HybridSourceSplitEnumerator.switchEnumerator failed with
com.amazonaws.SdkClientException: Unable to execute HTTP request: Read timed out
Caused by: java.net.SocketTimeoutException: Read timed out

This is an intermittent error; it usually clears when Flink recovers from the checkpoint and repeats the operation.

2. -- Flink starts recovering from the checkpoint:
CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint
SourceCoordinator - Closing SourceCoordinator for source Source: hybrid-source.
SourceCoordinator - Restoring SplitEnumerator of source Source: hybrid-source from checkpoint.
SourceCoordinator - Starting split enumerator for source Source: hybrid-source.
HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=788
HybridSourceSplitEnumerator - Starting enumerator for sourceIndex=788

3. -- HybridSourceSplitEnumerator receives SourceReaderFinishedEvent{sourceIndex=-1}:
HybridSourceSplitEnumerator - handleSourceEvent SourceReaderFinishedEvent{sourceIndex=-1} subtask=6

4. -- Processing this event causes:
2022/08/08 08:39:34.862 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught exception in the SplitEnumerator for Source Source: hybrid-source while handling operator event SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 6. Triggering job failover.
java.lang.NullPointerException: Source for index=0 is not available from sources: {788=org.apache.flink.connector.file.src.SppFileSource@5a3803f3}
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
    at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
    at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:152)
    at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:226)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

I'm running my own version of the Hybrid Sources with additional logging, so line numbers and some names may differ from the Flink GitHub source.

My observation: the problem is intermittent. Sometimes it works fine, i.e. SourceReaderFinishedEvent arrives with the correct sourceIndex. From what I see in my log, it happens if my SourceFactory.create() is executed BEFORE
HybridSourceSplitEnumerator - handleSourceEvent SourceReaderFinishedEvent{sourceIndex=-1}.
If HybridSourceSplitEnumerator - handleSourceEvent is executed before my SourceFactory.create(), then sourceIndex=-1 in SourceReaderFinishedEvent.

[~thw], [~mason6345], could you please look at this issue?
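
The lookup failure in step 4 can be reproduced outside Flink with a minimal sketch. The class below is a hypothetical stand-in, not the Flink implementation: `SwitchSourceSketch`, `handleReaderFinished`, and the string placeholder for the source are invented for illustration, and the rule that a finished reader is sent to `sourceIndex + 1` is an assumption inferred from the `index=0` in the exception message. Only index 788 is registered after the restore, so an event carrying `sourceIndex=-1` ends up looking for index 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the enumerator's source lookup; not Flink code.
final class SwitchSourceSketch {
    // Index -> source. After a restore only the restored index is present.
    private final Map<Integer, String> sources = new HashMap<>();

    SwitchSourceSketch(int restoredIndex, String restoredSource) {
        sources.put(restoredIndex, restoredSource);
    }

    // Throws NullPointerException when no source is registered for the index,
    // with a message shaped like the one in the log above.
    String sourceOf(int index) {
        return Objects.requireNonNull(
                sources.get(index),
                "Source for index=" + index + " is not available from sources: " + sources);
    }

    // ASSUMPTION: a reader that reports it finished source N is switched to N + 1.
    String handleReaderFinished(int readerSourceIndex) {
        return sourceOf(readerSourceIndex + 1);
    }

    public static void main(String[] args) {
        SwitchSourceSketch enumerator = new SwitchSourceSketch(788, "SppFileSource");
        try {
            enumerator.handleReaderFinished(-1); // event with sourceIndex=-1
        } catch (NullPointerException e) {
            // prints: Source for index=0 is not available from sources: {788=SppFileSource}
            System.out.println(e.getMessage());
        }
    }
}
```

A reader that reports `sourceIndex=787` resolves to the restored source in this sketch; only the `-1` case fails, which matches the intermittent behaviour described above.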
The log from the JobManager with the Preconditions.checkNotNull error is attached.

> NullPointerException in HybridSource when restoring from checkpoint
> -------------------------------------------------------------------
>
>                 Key: FLINK-28817
>                 URL: https://issues.apache.org/jira/browse/FLINK-28817
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.14.4, 1.15.1
>            Reporter: Michael
>            Priority: Major
>         Attachments: bf-29-JM-err-analysis.log
>
>
> Scenario:
> # CheckpointCoordinator - Completed checkpoint 14 for job 00000000000000000000000000000000
> # HybridSource successfully completes processing a few SourceFactories that read from S3
> # The next SourceFactory tries to read the contents of an S3 directory, which causes the error "Unable to execute HTTP request: Read timed out"
> # CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint 14
> # HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
> # This restore fails because of a NullPointerException in HybridSourceSplitEnumerator.close
> # Because of this issue, all subsequent restores from the checkpoint also fail
>
> Extract from the log:
> --------------
> 2022/08/02 22:26:51.227 INFO o.a.f.r.c.CheckpointCoordinator - Restoring job 00000000000000000000000000000000 from Checkpoint 14 @ 1659478803949 for 00000000000000000000000000000000 located at s3://spp-state-371299021277-tech-aidata-di/mb-backfill-jul-20-backfill-prd/2/checkpoints/00000000000000000000000000000000/chk-14.
> 2022/08/02 22:26:51.240 INFO o.a.f.r.c.CheckpointCoordinator - No master state to restore
> 2022/08/02 22:26:51.240 INFO o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
> 2022/08/02 22:26:51.241 INFO o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: hybrid-source.
> 2022/08/02 22:26:51.424 INFO o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: hybrid-source from checkpoint.
> 2022/08/02 22:26:51.425 INFO o.a.f.r.s.c.SourceCoordinator - Starting split enumerator for source Source: hybrid-source.
> 2022/08/02 22:26:51.426 INFO c.i.d.s.f.s.c.b.HourlyFileSourceFactory - Reading input data from path s3://idl-kafka-connect-ued-raw-uw2-data-lake-prd/data/topics/sbseg-qbo-clickstream/d_20220729-2300 for 2022-07-29T23:00:00Z
> 2022/08/02 22:26:51.426 INFO o.a.f.c.b.s.h.HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
>
> 2022/08/02 22:26:51.435 INFO o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: hybrid-source -> decrypt -> map2Events -> filterOutNulls -> assignTimestampsAndWatermarks -> logRawJson' (operator fd9fbc680ee884c4eafd0b9c2d3d007f).
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>     ...
> Caused by: java.lang.NullPointerException: null
>     at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.close(HybridSourceSplitEnumerator.java:246)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.close(SourceCoordinator.java:151)
>     at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:70)
>     at java.lang.Thread.run(Thread.java:750)
> -----------------------------------

--
This message was sent by Atlassian Jira
(v8.20.10#820010)