[ 
https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Varun Narayanan Chakravarthy updated FLINK-33402:
-------------------------------------------------
    Description: 
Hello Team,
I noticed that there is data loss when using Hybrid Source. We are reading from 
a series of about 100 concrete File Sources. All these locations are chained 
together using the Hybrid Source.
The issue stems from a race condition in the Flink Hybrid Source code. The 
Hybrid Source switches to the next source before the current source is 
complete. The same happens with the Hybrid Source readers. I have also 
attached the patch file that fixes the issue.
From the logs:

*Task Manager logs:* 
2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Adding split(s) 
to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
hosts=[localhost] ID=0000000229 position=null] 
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
Random IO seek policy 
2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
Random IO seek policy 
2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Finished 
reading split(s) [0000000154] 
2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  
- Finished reading from splits [0000000154] 
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Reader received 
NoMoreSplits event. 
2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG 
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - No more splits for 
subtask=0 sourceIndex=11 
currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
 
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
Random IO seek policy 
2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  org.apache.hadoop.fs.s3a.S3AInputStream  - Switching to 
Random IO seek policy 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Switch source 
event: subtask=0 sourceIndex=12 
source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
o.apache.flink.connector.base.source.reader.SourceReaderBase  - Closing Source 
Reader. 
2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO  
o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  - Shutting down 
split fetcher 0 
2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source 
(1/2)#0|#0] INFO  o.a.flink.connector.base.source.reader.fetcher.SplitFetcher  
- Split fetcher 0 exited. 
2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG 
o.a.flink.connector.base.source.hybrid.HybridSourceReader  - Reader closed: 
subtask=0 sourceIndex=11 
currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8

We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. 
The corresponding split has ID 0000000229 and is assigned to subtask 0. The 
logs show that this split is added to the reader, but the reader receives the 
NoMoreSplits event and is closed without the split being read.
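
As a rough way to spot such splits in Task Manager logs, something like the 
following sketch could be used. It assumes one log entry per line and the two 
SourceReaderBase message formats shown above ("Adding split(s) to reader: ... 
ID=..." and "Finished reading split(s) [...]"). The class and method names are 
hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: lists split IDs that were added to a
// reader but never reported as finished in the given log lines.
final class UnreadSplitFinder {
    // Matches "ID=0000000229" inside an "Adding split(s) to reader" message.
    private static final Pattern SPLIT_ID = Pattern.compile("ID=(\\d+)");
    // Matches the bracketed ID list of a "Finished reading split(s) [...]" message.
    private static final Pattern FINISHED =
            Pattern.compile("Finished reading split\\(s\\) \\[([^\\]]*)\\]");

    static List<String> findUnreadSplits(List<String> logLines) {
        // Insertion-ordered set of split IDs that were added but not yet finished.
        Set<String> pending = new LinkedHashSet<>();
        for (String line : logLines) {
            if (line.contains("Adding split(s) to reader")) {
                Matcher m = SPLIT_ID.matcher(line);
                while (m.find()) {
                    pending.add(m.group(1));
                }
            } else {
                Matcher m = FINISHED.matcher(line);
                if (m.find()) {
                    for (String id : m.group(1).split(",")) {
                        pending.remove(id.trim());
                    }
                }
            }
        }
        return new ArrayList<>(pending);
    }
}
```

Any ID returned by `findUnreadSplits` for a reader that was later closed is a 
candidate for lost data and is worth checking against the Job Manager logs.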



*Job Manager logs:*
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split 
to requesting host '10': Optional[FileSourceSplit: 
s3://REDACTED/part-1-13189.snappy [0, 94451)  hosts=[localhost] ID=0000000229 
position=null]
2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
to subtask 0 : FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451)  
hosts=[localhost] ID=0000000229 position=null
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner  - Assigning remote split 
to requesting host '10': Optional[FileSourceSplit: 
s3://REDACTED/part-0-13127.snappy [0, 88108)  hosts=[localhost] ID=0000000045 
position=null]
2023-10-10 17:46:23.786 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Assigned split 
to subtask 1 : FileSourceSplit: s3://REDACTED/part-0-13127.snappy [0, 88108)  
hosts=[localhost] ID=0000000045 position=null
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 0 (#0)
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=0 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 0 (on 
host '10.4.192.125') is requesting a file source split
2023-10-10 17:46:24.013 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
available for subtask 0
2023-10-10 17:46:24.049 [SourceCoordinator-Source: parquet-source] INFO  
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received split request from parallel task 1 (#0)
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSplitRequest 
subtask=1 sourceIndex=11 pendingSplits={}
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - Subtask 1 (on 
host '10.4.168.40') is requesting a file source split
2023-10-10 17:46:24.050 [SourceCoordinator-Source: parquet-source] INFO  
o.a.flink.connector.file.src.impl.StaticFileSplitEnumerator  - No more splits 
available for subtask 1
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.apache.flink.runtime.source.coordinator.SourceCoordinator  - Source Source: 
parquet-source received custom event from parallel task 1 (#0): 
SourceReaderFinishedEvent\{sourceIndex=11}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - handleSourceEvent 
SourceReaderFinishedEvent\{sourceIndex=11} subtask=1 pendingSplits={}
2023-10-10 17:46:24.051 [SourceCoordinator-Source: parquet-source] DEBUG 
o.a.f.c.base.source.hybrid.HybridSourceSplitEnumerator  - All readers finished, 
ready to switch enumerator!

The assigned split is never processed.
I traced the race condition to HybridSourceSplitEnumerator and 
HybridSourceReader.
There are races on both the source (enumerator) side and the reader side. The 
attached patch ensures that the switch from one source to the next, and from 
one reader to the next, happens atomically with respect to the rest of the 
code. All sections of the code that use currentReader or currentEnumerator 
take a read lock, and the code that switches the reader/enumerator takes the 
write lock. This ensures that no other function executes while the 
reader/enumerator switch is in progress. Applying only the fixes to 
HybridSourceSplitEnumerator resolves most of the data loss, but not all of it; 
for complete correctness, fixes are needed in both locations. Additionally, 
current readers also need to be reset before proceeding.
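
The locking scheme can be sketched roughly as follows. This is a simplified 
illustration and not the attached patch: `SwitchableReader`, `pollCurrent`, 
`switchTo`, and `sourceIndex` are hypothetical names, and a `Supplier<String>` 
stands in for the real reader. It relies only on 
`java.util.concurrent.locks.ReentrantReadWriteLock`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Simplified illustration (hypothetical class, not the attached patch):
// every use of the current reader takes the read lock, and the switch takes
// the write lock, so a switch never interleaves with an in-progress use.
final class SwitchableReader {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private Supplier<String> currentReader; // stand-in for the real reader
    private int sourceIndex;

    SwitchableReader(Supplier<String> initialReader) {
        this.currentReader = initialReader;
    }

    // Any code path that uses currentReader holds the read lock.
    String pollCurrent() {
        lock.readLock().lock();
        try {
            return currentReader.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    // The switch holds the write lock, which waits until all read-lock
    // holders have released it and blocks new ones until the switch is done.
    void switchTo(Supplier<String> nextReader) {
        lock.writeLock().lock();
        try {
            currentReader = nextReader;
            sourceIndex++;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int sourceIndex() {
        lock.readLock().lock();
        try {
            return sourceIndex;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Multiple threads can hold the read lock at the same time, so ordinary reads do 
not block each other; only the comparatively rare switch is exclusive.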

With these fixes applied, our team, which runs Flink at a scale of 1B+ 
records/hour with 180 Task Managers, did not see any data loss. There was also 
no noticeable performance impact from the read-write locks and concurrency 
control.

Additionally, comparisons of Integer objects need to use `equals`; otherwise 
they won't work for values above 127. This 
[issue|https://www.mail-archive.com/issues@flink.apache.org/msg647008.html] has 
been reported before by another user.
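
A small standalone illustration of the Integer comparison problem (the class 
and method names are hypothetical, and this is not code from Flink): Java 
caches boxed Integer values from -128 to 127, so `==` on boxed values happens 
to work in that range and typically fails outside it, whereas `equals` 
compares by value.

```java
// Illustration of comparing boxed Integer values; the class name is hypothetical.
final class IntegerCompareDemo {
    // Compares by value; works for every int value.
    static boolean sameByEquals(Integer a, Integer b) {
        return a.equals(b);
    }

    // Compares object identity; only reliable for cached values (-128..127).
    static boolean sameByReference(Integer a, Integer b) {
        return a == b;
    }

    public static void main(String[] args) {
        Integer small1 = 127, small2 = 127;
        Integer large1 = 128, large2 = 128;
        // Both boxed 127 values come from the Integer cache, so this is true.
        System.out.println(sameByReference(small1, small2));
        // On a default JVM these are distinct objects, so this is typically false.
        System.out.println(sameByReference(large1, large2));
        // Value comparison is true regardless of caching.
        System.out.println(sameByEquals(large1, large2));
    }
}
```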

If the above fixes are valid, please let me know. I would be happy to create a 
branch and PR against the repo. I have completed and signed the individual CLA 
and will be emailing it soon.


> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in 
> Data Loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33402
>                 URL: https://issues.apache.org/jira/browse/FLINK-33402
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
> Mac OSX, Linux etc. 
>            Reporter: Varun Narayanan Chakravarthy
>            Assignee: Varun Narayanan Chakravarthy
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: hybridSourceEnumeratorAndReaderFixes.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
