[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784240#comment-17784240 ]
Varun Narayanan Chakravarthy commented on FLINK-33402:
------------------------------------------------------

Created a PR: https://github.com/apache/flink/pull/23687

> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33402
>                 URL: https://issues.apache.org/jira/browse/FLINK-33402
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>                      Mac OSX, Linux, etc.
>            Reporter: Varun Narayanan Chakravarthy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>   Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using the Hybrid Source. We are reading from a series of ~100 concrete File Sources, all chained together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code: the Hybrid Source switches to the next source before the current source is complete, and the same race exists in the Hybrid Source readers. I have also shared a patch file that fixes the issue.
> From the logs:
>
> *Task Manager logs:*
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
>
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. This split was assigned to the reader with split ID 0000000229. As the logs show, this split is added after the no-more-splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045 position=null]
> 2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045 position=null
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 0 (#0)
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=0 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 0 (on host '10.4.192.125') is requesting a file source split
> 2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits available for subtask 0
> 2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 1 (#0)
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=1 sourceIndex=11 pendingSplits={}
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on host '10.4.168.40') is requesting a file source split
> 2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits available for subtask 1
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received custom event from parallel task 1 (#0): SourceReaderFinishedEvent{sourceIndex=11}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSourceEvent SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
> 2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - All readers finished, ready to switch enumerator!
>
> The assigned split is never processed.
> I traced the race condition to the HybridSourceSplitEnumerator and HybridSourceSplitReader. There are races on both the enumerator and the reader side. The attached patch ensures that the switch from one source to another, and from one reader to another, happens atomically with respect to the rest of the code. All sections of the code that use the currentReader or currentEnumerator take a read lock, and the code that switches the reader/enumerator takes the write lock. This ensures that no other function executes while the reader/enumerator switch occurs.
> Applying just the fixes to HybridSourceSplitEnumerator resolves the majority of the data loss, but not all of it; for complete correctness, fixes are needed in both places. Additionally, the current reader also needs to be reset before proceeding.
> With these fixes applied, our team, running Flink at a scale of 1B+ records/hour with 180 Task Managers, did not see any data loss. There was also no noticeable performance impact from the read-write mutexes and concurrency control.
> Additionally, comparison of boxed Integer objects needs to use `equals`; otherwise it fails for values outside the JVM's Integer cache of -128 to 127. This [issue|https://www.mail-archive.com/issues@flink.apache.org/msg647008.html] has been reported before by another user.
> If the above fixes are valid, please let me know. I would be happy to create a branch and PR against the repo. I have completed and signed the individual CLA and will be emailing it soon.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
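The read/write-locking scheme the reporter describes can be sketched as follows. This is an illustrative, self-contained sketch, not the actual Flink patch: `SwitchingReaderSketch`, `DelegateReader`, and their methods are hypothetical stand-ins for the real `HybridSourceReader` internals. The point it demonstrates is the invariant the patch relies on: operations that use the current delegate take the read lock, while the switch takes the write lock, so a switch can never interleave with split handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of guarding the "current reader" with a read-write lock
// so that a source switch is atomic with respect to all other operations.
public class SwitchingReaderSketch {

    // Stand-in for the underlying source reader interface.
    interface DelegateReader {
        void addSplits(List<String> splits);
        void close();
    }

    // Minimal test double that records what it was given.
    static class RecordingReader implements DelegateReader {
        final List<String> received = new ArrayList<>();
        boolean closed = false;
        @Override public void addSplits(List<String> splits) { received.addAll(splits); }
        @Override public void close() { closed = true; }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private DelegateReader currentReader;

    SwitchingReaderSketch(DelegateReader initial) { this.currentReader = initial; }

    // Read-locked: may run concurrently with other read-locked operations,
    // but never concurrently with switchTo().
    void addSplits(List<String> splits) {
        lock.readLock().lock();
        try {
            currentReader.addSplits(splits);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Write-locked: waits for in-flight read-locked operations to drain and
    // blocks new ones, so closing the old reader and installing the new one
    // appears atomic to every other method.
    void switchTo(DelegateReader next) {
        lock.writeLock().lock();
        try {
            currentReader.close();
            currentReader = next;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this structure, a split that arrives before the switch is delivered to the old reader in full, and any split request arriving during the switch blocks until the new reader is installed, which is exactly the interleaving the logs above show going wrong without the lock.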
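The boxed-Integer pitfall mentioned above is easy to reproduce in isolation. By default `Integer.valueOf` caches only values from -128 to 127, so `==` on autoboxed values happens to work inside that range and silently fails outside it; a minimal demonstration:

```java
// Demonstrates why boxed Integers must be compared with equals():
// Integer.valueOf caches only -128..127 by default, so == on larger
// values compares two distinct objects and returns false.
public class IntegerCompareDemo {
    public static void main(String[] args) {
        Integer small1 = 127, small2 = 127;   // both resolve to the same cached object
        Integer big1 = 200, big2 = 200;       // two separate boxed objects

        System.out.println(small1 == small2);   // true, but only because of the cache
        System.out.println(big1 == big2);       // false: reference comparison
        System.out.println(big1.equals(big2));  // true: value comparison
    }
}
```

This is why an identity comparison of subtask or source indices can pass all small-scale tests and still break on a deployment with more than 128 subtasks or sources.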