Jiangjie Qin created FLINK-35924:
------------------------------------

             Summary: Improve the SourceReaderBase to support RecordsWithSplitIds that share the SplitReader's internal buffer.
                 Key: FLINK-35924
                 URL: https://issues.apache.org/jira/browse/FLINK-35924
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Common
    Affects Versions: 1.20.0
            Reporter: Jiangjie Qin

Recently, we saw a corrupted {{RecordsWithSplitIds}} in one of our Iceberg source implementations. The problem is the following sequence:

# The {{SourceReaderBase}} has nothing in the element queue, and the current fetch has been drained.
# The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing to consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}} call.
# A {{SplitFetcher}} thread has just finished consuming a split and puts the last batch of records into the element queue. The {{SplitFetcher}} becomes idle after that, i.e. it has no assigned splits and no pending tasks.
# The main thread invokes {{SplitFetcherManager.maybeShutdownFinishedFetchers()}} to shut down idle fetchers, so the {{SplitFetcher}} from (3) is shut down. As a result, its {{SplitReader}} is also closed.
# The main thread then sees a non-empty element queue, containing the last batch put there by the {{SplitFetcher}} that has just been shut down.
# The main thread invokes {{SourceReaderBase.pollNext()}} again to process that last batch, but hits an exception, because the records contained in the batch have already been released as part of the {{SplitReader}} closure in (4).

The problem here is that the {{SourceReaderBase}} implementation assumes that once a batch of records (i.e. a {{RecordsWithSplitIds}}) is generated, those records remain available even after the {{SplitReader}} that generated them is closed. This assumption forces some connector implementations to copy the records fetched from the {{SplitReader}}, which introduces additional overhead. This patch aims to improve the {{SourceReaderBase}} so that a {{SplitReader}} is not closed until all the records it has emitted have been processed. A sketch of the buffer-sharing pattern this would enable is included below.

There is also a somewhat orthogonal problem: the {{SplitFetcher}}s are being closed too aggressively. We've seen that in most cases, a dedicated {{SplitFetcher}} is created to handle a split and closed right after the current split finishes, only for the next split to arrive shortly afterwards. I'll create a separate patch to introduce a short timeout before a {{SplitFetcher}} thread is considered idle.
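For illustration only (this is not the proposed patch), below is a minimal sketch of the buffer-sharing pattern described above. Only the {{RecordsWithSplitIds}} interface comes from the connector-base API; the class name {{SharedBufferBatch}} and the {{inFlightBatches}} counter are hypothetical. With the current {{SourceReaderBase}} behavior, nothing stops {{SplitReader.close()}} from releasing the shared buffer while such a batch is still sitting in the element queue, which is exactly the race in steps (3)-(6).

{code:java}
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical example: a batch that exposes a view over the SplitReader's internal
 * buffer instead of copying the records. The batch registers itself as "in flight"
 * until the SourceReaderBase recycles it, so the reader knows when releasing the
 * buffer would be safe.
 */
class SharedBufferBatch<E> implements RecordsWithSplitIds<E> {

    private final String splitId;
    private final Iterator<E> records;            // view over the reader's internal buffer
    private final AtomicInteger inFlightBatches;  // owned by the (hypothetical) SplitReader
    private boolean splitReturned;

    SharedBufferBatch(String splitId, Iterator<E> records, AtomicInteger inFlightBatches) {
        this.splitId = splitId;
        this.records = records;
        this.inFlightBatches = inFlightBatches;
        // Register this batch as in flight until it is recycled.
        this.inFlightBatches.incrementAndGet();
    }

    @Nullable
    @Override
    public String nextSplit() {
        if (splitReturned) {
            return null;
        }
        splitReturned = true;
        return splitId;
    }

    @Nullable
    @Override
    public E nextRecordFromSplit() {
        return records.hasNext() ? records.next() : null;
    }

    @Override
    public Set<String> finishedSplits() {
        return Collections.emptySet();
    }

    @Override
    public void recycle() {
        // Invoked once the batch has been fully emitted downstream. Only after this
        // point would it be safe for the SplitReader to release the shared buffer.
        inFlightBatches.decrementAndGet();
    }
}
{code}

With the improvement proposed here, the {{SplitReader}} would only be closed once all such in-flight batches have been recycled (whether tracked by a counter like the one above or by an equivalent mechanism inside {{SourceReaderBase}}), so connectors could share the internal buffer safely instead of copying records.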