Jiangjie Qin created FLINK-35924:
------------------------------------

             Summary: Improve the SourceReaderBase to support the 
RecordsWithSplitIds share internal buffer from SplitReader.
                 Key: FLINK-35924
                 URL: https://issues.apache.org/jira/browse/FLINK-35924
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Common
    Affects Versions: 1.20.0
            Reporter: Jiangjie Qin


Recently, we saw corrupted {{RecordsWithSplitIds}} in one of our Iceberg source 
implementations. The problem is caused by the following sequence:
 # The {{SourceReaderBase}} has nothing in the element queue, and the current 
fetch has been drained.
 # The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing to 
consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}} call.
 # A {{SplitFetcher}} thread has just finished consuming from a split and puts the 
last batch of records into the element queue. The {{SplitFetcher}} then becomes 
idle, i.e. it has no assigned splits and no pending tasks.
 # The main thread invokes {{SplitFetcherManager.maybeShutdownFinishedFetchers()}} 
to shut down idle fetchers, so the {{SplitFetcher}} in (3) is shut down. As a 
result, its {{SplitReader}} is also closed.
 # The main thread then sees a non-empty element queue containing the last batch 
put there by the {{SplitFetcher}} that has just been shut down.
 # The main thread invokes {{SourceReaderBase.pollNext()}} again to process the 
last batch, but receives an exception because the records contained in the 
batch have been released as part of the {{SplitReader}} closure in (4).
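The race can be replayed deterministically on a single thread. The sketch below is a hypothetical model, not Flink code: {{PooledReader}} and {{Batch}} are made-up stand-ins for a {{SplitReader}} whose batches are views over an internal buffer, and "releasing" the buffer is modeled as a flag set in {{close()}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical replay of steps (3)-(6); none of these classes are Flink APIs.
class RaceDemo {
    public static void main(String[] args) {
        System.out.println(run());
    }

    // Returns what the "main thread" observes when it finally polls the last batch.
    static String run() {
        Queue<Batch> elementQueue = new ArrayDeque<>();
        PooledReader reader = new PooledReader();

        // (3) The fetcher puts its last batch into the element queue and becomes idle.
        elementQueue.add(reader.fetch());
        // (4) The idle fetcher is shut down, which closes the reader and frees its buffer.
        reader.close();
        // (5)/(6) The main thread polls the batch, which still points at the freed buffer.
        Batch last = elementQueue.poll();
        try {
            return "read " + last.nextRecord();
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }
}

// A reader whose batches share its internal buffer instead of copying it (hypothetical).
class PooledReader implements AutoCloseable {
    private final byte[] buffer = {1, 2, 3};
    private boolean released = false;

    Batch fetch() {
        return new Batch(this);
    }

    byte read(int index) {
        if (released) {
            throw new IllegalStateException("buffer already released");
        }
        return buffer[index];
    }

    @Override
    public void close() {
        released = true; // stands in for returning the buffer to a pool
    }
}

// A batch that reads records lazily from the owning reader's buffer.
class Batch {
    private final PooledReader owner;
    private int position = 0;

    Batch(PooledReader owner) {
        this.owner = owner;
    }

    byte nextRecord() {
        return owner.read(position++);
    }
}
```

Running it prints {{failed: buffer already released}}, which mirrors the exception seen in step (6).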

The problem here is that the {{SourceReaderBase}} implementation assumes that 
once a batch of records (i.e. {{RecordsWithSplitIds}}) is generated, these 
records remain available even after the {{SplitReader}} that generated them is 
closed.

This assumption forces some connector implementations to copy the records 
fetched from the {{SplitReader}}, which introduces additional overhead.
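A minimal sketch of that defensive-copy workaround, again using hypothetical classes rather than any real connector: the reader copies its buffer into every batch it hands out, so closing the reader no longer affects queued batches, at the price of one extra allocation and copy per batch.

```java
import java.util.Arrays;

// Hypothetical illustration of copying records out of a reader's internal buffer.
class CopyDemo {
    public static void main(String[] args) {
        System.out.println(run());
    }

    static int run() {
        CopyingReader reader = new CopyingReader();
        CopiedBatch batch = reader.fetch();
        reader.close(); // safe: the batch owns its own copy of the records
        int sum = 0;
        while (batch.hasNext()) {
            sum += batch.next();
        }
        return sum;
    }
}

class CopyingReader implements AutoCloseable {
    private byte[] buffer = {1, 2, 3};

    CopiedBatch fetch() {
        // This allocation and copy is the per-batch overhead the workaround costs.
        return new CopiedBatch(Arrays.copyOf(buffer, buffer.length));
    }

    @Override
    public void close() {
        buffer = null; // stands in for releasing the internal buffer
    }
}

class CopiedBatch {
    private final byte[] records;
    private int position = 0;

    CopiedBatch(byte[] records) {
        this.records = records;
    }

    boolean hasNext() {
        return position < records.length;
    }

    byte next() {
        return records[position++];
    }
}
```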

This patch aims to improve the {{SourceReaderBase}} to ensure that a 
{{SplitReader}} is not closed until all the records it has emitted have 
been processed.
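One way to get that guarantee is reference counting: count the batches a reader has emitted, and when the fetcher asks to shut down, defer the actual close until the count drops to zero. The sketch below assumes this approach; it is not the patch itself, and {{DeferredCloser}} with its methods is hypothetical. In Flink, the existing {{RecordsWithSplitIds.recycle()}} callback would be a natural place to decrement such a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: the reader is closed only after its last batch is recycled.
class DeferredCloseDemo {
    public static void main(String[] args) {
        System.out.println(run());
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        DeferredCloser closer = new DeferredCloser(() -> events.add("reader closed"));

        closer.onBatchEmitted();   // fetcher puts its last batch into the element queue
        closer.requestClose();     // fetcher is idle and is asked to shut down
        events.add("closed early? " + closer.isClosed());
        closer.onBatchRecycled();  // main thread has finished emitting that batch
        events.add("closed after recycle? " + closer.isClosed());
        return events;
    }
}

// Hypothetical helper that defers closing a reader while any emitted batch is still in use.
class DeferredCloser {
    private final Runnable closeAction;
    private int outstandingBatches = 0;
    private boolean closeRequested = false;
    private boolean closed = false;

    DeferredCloser(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    synchronized void onBatchEmitted() {
        if (closeRequested) {
            throw new IllegalStateException("reader is shutting down");
        }
        outstandingBatches++;
    }

    synchronized void onBatchRecycled() {
        outstandingBatches--;
        maybeClose();
    }

    synchronized void requestClose() {
        closeRequested = true;
        maybeClose();
    }

    synchronized boolean isClosed() {
        return closed;
    }

    // Closes exactly once, when shutdown was requested and no batch is outstanding.
    private void maybeClose() {
        if (closeRequested && outstandingBatches == 0 && !closed) {
            closed = true;
            closeAction.run();
        }
    }
}
```

The demo prints {{[closed early? false, reader closed, closed after recycle? true]}}. For brevity the close action runs while holding the lock; a real implementation would likely invoke it outside the lock to avoid blocking the thread that recycles batches.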

There is also a somewhat orthogonal problem: the {{SplitFetcher}}s are closed 
too aggressively. We've seen that in most cases, a dedicated {{SplitFetcher}} is 
created to handle a split and closed right after that split finishes, before 
the next split arrives. I'll create a separate patch that introduces a short 
timeout before a {{SplitFetcher}} thread is considered idle.
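The idle timeout could look roughly like the sketch below, assuming a fetcher records when it last ran out of work and is eligible for shutdown only after a grace period. {{IdleTracker}}, its methods, and the 500 ms value are hypothetical; the clock is passed in explicitly so the behavior is deterministic.

```java
import java.time.Duration;

// Hypothetical demo of an idle grace period before shutting down a fetcher.
class IdleTimeoutDemo {
    public static void main(String[] args) {
        System.out.println(run());
    }

    static String run() {
        IdleTracker tracker = new IdleTracker(Duration.ofMillis(500));
        StringBuilder sb = new StringBuilder();
        tracker.onNoWork(1_000);                               // split finished at t=1000 ms
        sb.append(tracker.shouldShutdown(1_200));              // 200 ms idle: keep the fetcher
        tracker.onWorkAssigned();                              // next split arrives, fetcher reused
        tracker.onNoWork(2_000);                               // idle again at t=2000 ms
        sb.append(',').append(tracker.shouldShutdown(2_600));  // 600 ms idle: shut it down
        return sb.toString();
    }
}

class IdleTracker {
    private static final long NOT_IDLE = -1;
    private final long timeoutMillis;
    private long idleSinceMillis = NOT_IDLE;

    IdleTracker(Duration timeout) {
        this.timeoutMillis = timeout.toMillis();
    }

    // Remembers only the first moment the fetcher ran out of work.
    void onNoWork(long nowMillis) {
        if (idleSinceMillis == NOT_IDLE) {
            idleSinceMillis = nowMillis;
        }
    }

    void onWorkAssigned() {
        idleSinceMillis = NOT_IDLE;
    }

    boolean shouldShutdown(long nowMillis) {
        return idleSinceMillis != NOT_IDLE && nowMillis - idleSinceMillis >= timeoutMillis;
    }
}
```

The demo prints {{false,true}}: the fetcher survives the short gap between splits and is shut down only after staying idle past the timeout.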



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
