Varun Narayanan Chakravarthy created FLINK-33402:

             Summary: Hybrid Source Concurrency Race Condition Fixes and Related Bugs
                 Key: FLINK-33402
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HybridSource
    Affects Versions: 1.16.1
         Environment: Apache Flink 1.16.1
Mac OSX, Linux etc. 
            Reporter: Varun Narayanan Chakravarthy
         Attachments: hybridSourceEnumeratorAndReaderFixes.patch

Hello Team,
I noticed data loss when using Hybrid Source. We read from a series of about 
100 concrete File Sources, all chained together using the Hybrid Source.
The issue stems from a race condition in the Flink Hybrid Source code: the 
Hybrid Source switches to the next source before the current source is 
complete. The same happens in the Hybrid Source readers. I have attached a 
patch file that fixes the issue.
From the logs:

*Task Manager logs:* 
2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) 
to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
hosts=[localhost] ID=0000000229 position=null]
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random 
IO seek policy
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random 
IO seek policy
2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished 
reading split(s) [0000000154]
2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  
o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Finished reading 
from splits [0000000154]
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received 
NoMoreSplits event.
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0] DEBUG 
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for 
subtask=0 sourceIndex=11
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random 
IO seek policy
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to Random 
IO seek policy
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source 
event: subtask=0 sourceIndex=12 
source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source 
Reader.
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0] INFO  
o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down 
split fetcher 0
2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source 
(1/2)#0] INFO  
o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Split fetcher 0 
exited.
2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0] DEBUG 
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: 
subtask=0 sourceIndex=11

We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
This file is handed to the reader as the split with ID 0000000229. The logs 
show that this split is added after the no-more-splits event and is NOT read.

*Job Manager logs:*
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split 
to requesting host '10': Optional[FileSourceSplit: 
s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
hosts=[localhost] ID=0000000229 position=null
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
host '') is requesting a file source split
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split 
to requesting host '10': Optional[FileSourceSplit: 
s3://REDACTED/part-0-13127.snappy [0, 88108)  hosts=[localhost] ID=0000000045 
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)  
hosts=[localhost] ID=0000000045 position=null
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 0 (#0)
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=0 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 0 (on 
host '') is requesting a file source split
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
available for subtask 0
2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
host '') is requesting a file source split
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
available for subtask 1
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received custom event from parallel task 1 (#0): 
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSourceEvent 
SourceReaderFinishedEvent{sourceIndex=11} subtask=1 pendingSplits={}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - All readers finished, 
ready to switch enumerator!

The assigned split is never processed.
I traced the race condition to the HybridSourceSplitEnumerator and the 
HybridSourceReader: there are races on both the source (enumerator) side and 
the reader side. The attached patch ensures that the switch from one source to 
another, and from one reader to another, happens atomically with respect to the 
rest of the code. All sections of the code that use the currentReader or 
currentEnumerator are read-locked, and the code that switches the 
reader/enumerator is write-locked. This ensures that no other function executes 
while the reader/enumerator switch occurs. Applying just the fixes to 
HybridSourceSplitEnumerator resolves the majority of the data loss, but not all 
of it; for complete correctness, fixes are needed in both locations.
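
To illustrate the locking discipline described above, here is a minimal 
sketch. It is not the attached patch and not Flink code: the class 
GuardedSwitcher, its Delegate stand-in, and the method names are all 
hypothetical. It only shows the general idea that regular calls take the read 
lock while the switch takes the write lock.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical illustration only; not Flink code and not the attached patch. */
class GuardedSwitcher {

    /** Stand-in for the current reader or enumerator. */
    static final class Delegate {
        final int sourceIndex;
        final Queue<String> splits = new ConcurrentLinkedQueue<>();
        boolean closed; // written under the write lock, read under the read lock

        Delegate(int sourceIndex) {
            this.sourceIndex = sourceIndex;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private Delegate current = new Delegate(0);

    /** Regular path: callers share the read lock while using the current delegate. */
    int addSplit(String splitId) {
        lock.readLock().lock();
        try {
            if (current.closed) {
                throw new IllegalStateException("split added to a closed delegate");
            }
            current.splits.add(splitId);
            return current.sourceIndex;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Switch path: exclusive, so no addSplit call can interleave with it. */
    int switchToNext() {
        lock.writeLock().lock();
        try {
            current.closed = true;
            current = new Delegate(current.sourceIndex + 1);
            return current.sourceIndex;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedSwitcher s = new GuardedSwitcher();
        Thread adder = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                s.addSplit("split-" + i);
            }
        });
        adder.start();
        for (int i = 0; i < 10; i++) {
            s.switchToNext();
        }
        adder.join();
        System.out.println("final sourceIndex=" + s.switchToNext());
    }
}
```

Because the write lock excludes all readers, addSplit never observes a 
half-switched or closed delegate, so the IllegalStateException branch is 
unreachable in this sketch.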

With these fixes applied, our team, which runs Flink at a scale of 1B+ 
records/hour with 180 Task Managers, did not see any data loss. There was also 
no noticeable impact on performance from the read-write mutexes and the added 
synchronization.

Additionally, comparison of Integer objects needs to use `equals`; otherwise it 
won't work for values of 128 and above. This issue has been reported before by 
another user.
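
As a small illustration of the Integer comparison problem, the sketch below 
contrasts `==` with `equals` on boxed values. The class and method names are 
hypothetical and not taken from Flink. Java guarantees that boxed values from 
-128 to 127 are cached, so `==` happens to work in that range and then 
typically fails above it.

```java
/** Hypothetical illustration only; names are not taken from Flink. */
class IntegerCompareDemo {

    /** Buggy: compares object identity, not the numeric value. */
    static boolean sameIndexByReference(Integer a, Integer b) {
        return a == b;
    }

    /** Correct: compares the numeric value. */
    static boolean sameIndexByValue(Integer a, Integer b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        Integer small1 = 127;
        Integer small2 = 127;
        Integer big1 = 128;
        Integer big2 = 128;

        // Inside the guaranteed cache range both approaches agree.
        System.out.println("127 == 127:      " + sameIndexByReference(small1, small2));
        System.out.println("127 equals 127:  " + sameIndexByValue(small1, small2));

        // Outside the cache range, == is typically false on a default JVM.
        System.out.println("128 == 128:      " + sameIndexByReference(big1, big2));
        System.out.println("128 equals 128:  " + sameIndexByValue(big1, big2));
    }
}
```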

If the above fixes are valid, please let me know. I would be happy to create a 
branch and PR against the repo. I have completed and signed the individual CLA 
and will be emailing it soon.
