[ 
https://issues.apache.org/jira/browse/FLINK-35924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-35924:
---------------------------------
    Fix Version/s: 2.0.0

> Improve the SourceReaderBase to support the RecordsWithSplitIds share 
> internal buffer from SplitReader.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35924
>                 URL: https://issues.apache.org/jira/browse/FLINK-35924
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.20.0
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Recently, we saw corrupted {{RecordsWithSplitIds}} in one of our Iceberg 
> source implementations. The problem arises from the following sequence:
>  # The {{SourceReaderBase}} has nothing in the element queue, and the 
> current fetch has been drained.
>  # The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing 
> to consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}} 
> call.
>  # A {{SplitFetcher}} thread has just finished consuming from a split and put 
> the last batch of records into the element queue. The {{SplitFetcher}} becomes 
> idle after that, i.e. it has no assigned splits and no pending tasks.
>  # The main thread invokes 
> {{SplitFetcherManager.maybeShutdownFinishedFetchers()}} to shut down idle 
> fetchers, so the {{SplitFetcher}} from (3) is shut down. As a result, the 
> {{SplitReader}} is also closed.
>  # The main thread then sees a non-empty element queue containing the last 
> batch put by the {{SplitFetcher}} that has just shut down.
>  # The main thread invokes {{SourceReaderBase.pollNext()}} again to process 
> the last batch, but receives an exception because the records contained in 
> the batch were released as part of the {{SplitReader}} closure in (4).
> The problem here is that the {{SourceReaderBase}} implementation assumes that 
> once a batch of records (i.e. {{RecordsWithSplitIds}}) is generated, 
> these records remain available even after the {{SplitReader}} that generated 
> them is closed.
> This assumption forces some connector implementations to copy the 
> records fetched from the {{SplitReader}}, which introduces additional 
> overhead.
> This patch aims to improve the {{SourceReaderBase}} to ensure that a 
> {{SplitReader}} will not be closed until all the records it has emitted have 
> been processed.
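
One way to picture the guarantee described above is reference counting: the fetcher holds one reference to its reader, each enqueued batch holds another, and the reader is closed only when the last reference is released. The sketch below is a minimal illustration under that assumption, not the actual patch. `Reader` and `ReaderLease` are hypothetical names, not Flink classes. In a real connector, the release would presumably be tied to the point where a batch is handed back (for example when `RecordsWithSplitIds` is recycled), but that wiring is an assumption here.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DeferredCloseSketch {

    /** Hypothetical stand-in for a SplitReader whose batches view its internal buffer. */
    static final class Reader {
        private final int[] buffer = {1, 2, 3};
        private volatile boolean closed;

        int read(int index) {
            if (closed) {
                throw new IllegalStateException("buffer already released");
            }
            return buffer[index];
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical lease: closes the reader only when the last reference is released. */
    static final class ReaderLease {
        private final Reader reader;
        // Starts at 1: the reference held by the fetcher that owns the reader.
        private final AtomicInteger refs = new AtomicInteger(1);

        ReaderLease(Reader reader) {
            this.reader = reader;
        }

        /** Called by the fetcher before it enqueues a batch backed by the reader's buffer. */
        void retain() {
            refs.incrementAndGet();
        }

        /** Called when a batch has been fully processed, or when the fetcher shuts down. */
        void release() {
            if (refs.decrementAndGet() == 0) {
                reader.close();
            }
        }
    }
}
```

Because the fetcher retains on behalf of a batch before giving up its own reference, the count cannot reach zero while a batch is still waiting in the queue, which is exactly the window between steps (4) and (6).
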
> There is also a somewhat orthogonal problem: the {{SplitFetcher}} threads are 
> being closed too aggressively. We've seen that in most cases, a dedicated 
> {{SplitFetcher}} is created to handle a split and is closed right after the 
> current split is finished but before the next split arrives. I'll create a 
> separate patch to introduce a short timeout before a 
> {{SplitFetcher}} thread is considered idle.
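
The idle timeout mentioned above could be sketched as a small tracker that records when a fetcher first became idle and reports it as eligible for shutdown only after a grace period. This is an illustrative sketch only: `IdleTracker`, its methods, and the millisecond-based timeout are hypothetical and not part of Flink's API.

```java
final class IdleTimeoutSketch {

    /** Hypothetical helper deciding when an idle fetcher may be shut down. */
    static final class IdleTracker {
        private final long idleTimeoutMillis;
        // -1 means the fetcher is currently busy.
        private long idleSinceMillis = -1;

        IdleTracker(long idleTimeoutMillis) {
            this.idleTimeoutMillis = idleTimeoutMillis;
        }

        /** Called when the fetcher receives a split or a task. */
        void markBusy() {
            idleSinceMillis = -1;
        }

        /** Called when the fetcher has no splits and no tasks; keeps the first idle timestamp. */
        void markIdle(long nowMillis) {
            if (idleSinceMillis < 0) {
                idleSinceMillis = nowMillis;
            }
        }

        /** True only if the fetcher has been idle for at least the configured timeout. */
        boolean shouldShutDown(long nowMillis) {
            return idleSinceMillis >= 0 && nowMillis - idleSinceMillis >= idleTimeoutMillis;
        }
    }
}
```

With a timeout of, say, 100 ms, a fetcher that finishes one split and receives the next within that window would be reused instead of being closed and recreated.
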



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
