[ https://issues.apache.org/jira/browse/FLINK-35924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin updated FLINK-35924:
---------------------------------
Fix Version/s: 2.0.0

Improve the SourceReaderBase to support the RecordsWithSplitIds share internal buffer from SplitReader.
-------------------------------------------------------------------------------------------------------

Key: FLINK-35924
URL: https://issues.apache.org/jira/browse/FLINK-35924
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.20.0
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Priority: Major
Fix For: 2.0.0

Recently, we saw corrupted {{RecordsWithSplitIds}} in one of our Iceberg source implementations. The problem is caused by the following sequence:
# The {{SourceReaderBase}} does not have anything in the element queue, and the current fetch has been drained.
# The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing to consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}} call.
# A {{SplitFetcher}} thread finishes consuming from a split and puts the last batch of records into the element queue. The {{SplitFetcher}} becomes idle after that, i.e. it has no assigned splits and no pending tasks.
# The main thread invokes {{SplitFetcherManager.maybeShutdownFinishedFetchers()}} to shut down idle fetchers, so the {{SplitFetcher}} in (3) is shut down. As a result, its {{SplitReader}} is also closed.
# The main thread then sees a non-empty element queue containing the last batch put there by the {{SplitFetcher}} that has just been shut down.
# The main thread invokes {{SourceReaderBase.pollNext()}} again to process the last batch, but receives an exception, because the records contained in the batch have been released as part of the {{SplitReader}} closure in (4).

The problem here is that the {{SourceReaderBase}} implementation assumes that once a batch of records (i.e.
{{RecordsWithSplitIds}}) is generated, these records remain available even after the {{SplitReader}} that generated them is closed. This assumption forces some connector implementations to copy the records fetched from the {{SplitReader}}, which introduces additional overhead.

This patch aims to improve the {{SourceReaderBase}} to ensure that a {{SplitReader}} is not closed until all the records it has emitted have been processed.

There is also a somewhat orthogonal problem: the {{SplitFetchers}} are being closed too aggressively. We've seen that in most cases, a dedicated {{SplitFetcher}} is created to handle a split and is closed right after the current split is finished, before the next split arrives. I'll create a separate patch to introduce a short timeout before a {{SplitFetcher}} thread is considered idle.
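The fix described above amounts to deferring the release of a {{SplitReader}}'s resources until every batch it emitted has been consumed. The following is a minimal, self-contained sketch of that idea using a per-reader count of in-flight batches. The class and method names ({{DeferredCloseReader}}, {{onBatchEmitted}}, {{onBatchConsumed}}, {{requestClose}}) are hypothetical and are not Flink's API; the actual patch may track outstanding batches differently.

```java
/**
 * Hypothetical sketch, not Flink's API: a reader whose close is deferred
 * until every batch it emitted has been consumed by the main thread.
 */
class DeferredCloseReader {
    private int inFlightBatches = 0;        // batches emitted but not yet fully processed
    private boolean closeRequested = false; // set when the fetcher is found idle
    private boolean closed = false;         // true once resources are actually released

    /** Fetcher thread: a batch backed by this reader's buffers was enqueued. */
    synchronized void onBatchEmitted() {
        if (closeRequested) {
            throw new IllegalStateException("no new batches after close was requested");
        }
        inFlightBatches++;
    }

    /** Main thread: all records of one batch have been processed. */
    synchronized void onBatchConsumed() {
        if (inFlightBatches == 0) {
            throw new IllegalStateException("no batch in flight");
        }
        inFlightBatches--;
        maybeClose();
    }

    /** Idle-fetcher shutdown path: close now only if nothing is outstanding. */
    synchronized void requestClose() {
        closeRequested = true;
        maybeClose();
    }

    synchronized boolean isClosed() {
        return closed;
    }

    // Always called while holding this object's monitor.
    private void maybeClose() {
        if (closeRequested && inFlightBatches == 0 && !closed) {
            // A real reader would release the buffers shared with its batches here.
            closed = true;
        }
    }

    // Replays steps (3), (4) and (6) of the sequence in the issue.
    public static void main(String[] args) {
        DeferredCloseReader reader = new DeferredCloseReader();
        reader.onBatchEmitted();               // (3) last batch enqueued
        reader.requestClose();                 // (4) fetcher found idle
        System.out.println(reader.isClosed()); // false: batch still queued
        reader.onBatchConsumed();              // (6) main thread drains the batch
        System.out.println(reader.isClosed()); // true
    }
}
```

In this sketch the idle-fetcher shutdown path is unchanged; only the moment the reader's resources are released moves, so in step (6) of the sequence the last batch is still readable.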
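The follow-up idea of a short timeout before a {{SplitFetcher}} is treated as idle can be sketched as a small tracker that remembers when the fetcher last ran out of work. This is a hypothetical illustration only: {{IdleTimeoutTracker}} and its methods are made-up names, the 100 ms timeout in the example is arbitrary, and the current time is passed in explicitly (for example from {{System.nanoTime()}}) so the logic stays deterministic and testable.

```java
import java.time.Duration;

/**
 * Hypothetical sketch: a fetcher counts as idle only after it has had no
 * assigned splits and no pending tasks for at least a grace period.
 * Not thread-safe; assumes all calls come from the fetcher's own thread.
 */
class IdleTimeoutTracker {
    private final long timeoutNanos;
    private boolean idle = false;   // true while the fetcher has no work
    private long idleSinceNanos;    // valid only while idle is true

    IdleTimeoutTracker(Duration timeout) {
        this.timeoutNanos = timeout.toNanos();
    }

    /** A split or task was assigned: cancel any pending idle period. */
    void onWorkAssigned() {
        idle = false;
    }

    /** The fetcher has no assigned splits and no pending tasks. */
    void onNoWork(long nowNanos) {
        if (!idle) {
            idle = true;
            idleSinceNanos = nowNanos; // start the grace period only once
        }
    }

    /** True only if the fetcher has been without work for at least the timeout. */
    boolean shouldShutDown(long nowNanos) {
        // Subtraction keeps working even if System.nanoTime() values are negative.
        return idle && nowNanos - idleSinceNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long ms = 1_000_000L;
        IdleTimeoutTracker t = new IdleTimeoutTracker(Duration.ofMillis(100));
        t.onNoWork(0);                                  // split finished at 0 ms
        System.out.println(t.shouldShutDown(50 * ms));  // false: still in grace period
        t.onWorkAssigned();                             // next split arrives at 50 ms
        t.onNoWork(80 * ms);                            // done again at 80 ms
        System.out.println(t.shouldShutDown(150 * ms)); // false: only 70 ms idle
        System.out.println(t.shouldShutDown(180 * ms)); // true: 100 ms idle
    }
}
```

With a grace period like this, the common pattern of one split finishing shortly before the next arrives no longer tears down and recreates the fetcher thread.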