[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Varun Narayanan Chakravarthy updated FLINK-33402:
-------------------------------------------------
Description:

Hello Team,

I noticed that there is data loss when using Hybrid Source. We are reading from a series of roughly 100 concrete File Sources, all chained together using the Hybrid Source. The issue stems from a race condition in the Flink Hybrid Source code: the Hybrid Source switches to the next source before the current source is complete, and the Hybrid Source readers have the same problem. I have also shared a patch file that fixes the issue.

From the logs:

*Task Manager logs:*
```
2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000154]
2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000154]
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event.
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
```

We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. This is the split with ID 0000000229, assigned to the reader of subtask 0. As the logs show, the split is added around the no-more-splits event and is never read.
*Job Manager logs:*
```
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null]
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=0000000229 position=null
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045 position=null]
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Assigned split to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108) hosts=[localhost] ID=0000000045 position=null
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 0 (#0)
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=0 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 0 (on host '10.4.192.125') is requesting a file source split
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits available for subtask 0
2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO  o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSplitRequest subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - Subtask 1 (on host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator - No more splits available for subtask 1
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.apache.flink.runtime.source.coordinator.SourceCoordinator - Source Source: parquet-source received custom event from parallel task 1 (#0): SourceReaderFinishedEvent{sourceIndex=11}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - handleSourceEvent SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator - All readers finished, ready to switch enumerator!
```
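The ordering visible in these logs can be reduced to a deterministic toy model. None of the class or method names below are Flink APIs; this is only a sketch of the failure mode: a split is assigned to the reader, the switch event arrives before the reader polls again, and closing the reader silently drops the pending split.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the race: a split assigned shortly before the source switch
// is still pending when the reader is closed, and is lost.
public class LostSplitModel {
    private final Deque<String> pending = new ArrayDeque<>();
    private int splitsRead = 0;
    private boolean open = true;

    // Coordinator assigns a split to this reader.
    void addSplit(String splitId) { pending.add(splitId); }

    // Reader drains whatever is pending at the moment it polls.
    void pollNext() { while (open && pending.poll() != null) splitsRead++; }

    // Switch-source event: the current reader is closed and replaced.
    // Anything still queued is discarded -- this is the data loss.
    int close() { open = false; int lost = pending.size(); pending.clear(); return lost; }

    int splitsRead() { return splitsRead; }
}
```

Replaying the logged sequence against this model (read split 0000000154, receive split 0000000229, then switch) leaves one split unread when `close()` runs.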
The assigned split is never processed. I traced the race condition to HybridSourceSplitEnumerator and HybridSourceReader: there are races on both the enumerator (source) side and the reader side. The attached patch ensures that the switch from one source to the next, and from one reader to the next, happens atomically with respect to the rest of the code. All sections of the code that use the currentReader or currentEnumerator take a read lock, and the reader/enumerator switch takes the write lock, so no other function executes while the switch is in progress. Applying only the fixes to HybridSourceSplitEnumerator resolves most, but not all, of the data loss; for complete correctness, fixes are needed in both places. Additionally, the current reader needs to be reset before proceeding. With these fixes applied, our team, running Flink at a scale of 1B+ records/hour on 180 Task Managers, saw no further data loss, and there was no noticeable performance impact from the read-write mutexes and concurrency control. Additionally, integer comparison of boxed objects needs to use `equals`; comparing with `==` fails for values above 127. This [issue|https://www.mail-archive.com/issues@flink.apache.org/msg647008.html] has been reported before by another user.

If the above fixes are valid, please let me know. I would be happy to create a branch and PR against the repo. I have completed and signed the individual CLA and will be emailing it soon.
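The read/write locking scheme described above can be sketched as follows. This is a minimal illustration with hypothetical names, not the actual patch or Flink's API: every code path that touches the current reader or enumerator holds the read lock, while the switch holds the write lock, so a switch can never interleave with split handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: read-locked access to the current source, write-locked switch.
public class HybridSwitchGuard {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int currentSourceIndex = 0;
    private final List<String> assignedSplits = new ArrayList<>();

    // Read-locked: may run concurrently with other reads, never with a switch.
    public void addSplit(String splitId) {
        lock.readLock().lock();
        try {
            assignedSplits.add(splitId + "@source" + currentSourceIndex);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Write-locked: excludes every read-locked section while the switch runs,
    // so no split can be routed to a reader that is being swapped out.
    public void switchToNextSource() {
        lock.writeLock().lock();
        try {
            currentSourceIndex++;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public List<String> splits() { return assignedSplits; }
}
```

With this guard, a split assignment either completes against the old source or observes the new one; it can never land in the window where the switch is half done.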
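The `equals` remark refers to Java's autoboxing cache: `Integer.valueOf` interns values from -128 to 127 by default, so comparing boxed indices with `==` happens to work in small tests but silently fails once a value (for example, a source index) exceeds 127. A minimal demonstration:

```java
public class IntegerCompareDemo {
    // Reference comparison: true only when both are the same cached object.
    static boolean referenceEquals(Integer a, Integer b) { return a == b; }

    // Value comparison: the correct way to compare boxed integers.
    static boolean valueEquals(Integer a, Integer b) { return a.equals(b); }

    public static void main(String[] args) {
        Integer small1 = 100, small2 = 100; // inside the default [-128, 127] cache
        Integer big1 = 200, big2 = 200;     // outside the cache: distinct objects
        System.out.println(referenceEquals(small1, small2)); // true (cache hit)
        System.out.println(referenceEquals(big1, big2));     // false
        System.out.println(valueEquals(big1, big2));         // true
    }
}
```

This is why an `==` comparison of source indices can pass every test with a handful of sources yet break in a job chaining more than 127 of them.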
> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in
> Data Loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33402
>                 URL: https://issues.apache.org/jira/browse/FLINK-33402
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>                      Mac OSX, Linux etc.
>            Reporter: Varun Narayanan Chakravarthy
>            Assignee: Varun Narayanan Chakravarthy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>   Remaining Estimate: 2h
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)