Jiangjie Qin created FLINK-35924:
------------------------------------
Summary: Improve the SourceReaderBase to support RecordsWithSplitIds sharing
the internal buffer of the SplitReader.
Key: FLINK-35924
URL: https://issues.apache.org/jira/browse/FLINK-35924
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.20.0
Reporter: Jiangjie Qin
Recently, we saw corrupted {{RecordsWithSplitIds}} in one of our Iceberg source
implementations. The problem is the following sequence:
# The {{SourceReaderBase}} has nothing in the element queue, and the current
fetch has been drained.
# The main thread invokes {{SourceReaderBase.pollNext()}}, sees nothing to
consume, and goes into the {{SourceReaderBase.finishedOrAvailableLater()}} call.
# A {{SplitFetcher}} thread has just finished consuming from a split and put
the last batch of records into the element queue. The {{SplitFetcher}} then
becomes idle, i.e. it has no assigned splits and no pending tasks.
# The main thread invokes {{SplitFetcherManager.maybeShutdownFinishedFetchers()}}
to shut down idle fetchers, so the {{SplitFetcher}} in (3) is shut down. As a
result, the {{SplitReader}} is also closed.
# The main thread then sees a non-empty element queue, containing the last
batch put there by the {{SplitFetcher}} that has just shut down.
# The main thread invokes {{SourceReaderBase.pollNext()}} again to process the
last batch, but receives an exception because the records in the batch have
been released as part of the {{SplitReader}} closure in (4). This interleaving
is sketched in code below.
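The interleaving is easier to see as code. The following is a minimal,
self-contained toy model of the sequence above; it is not Flink code, and
{{ToyReader}}, {{Batch}} and the plain {{BlockingQueue}} are hypothetical
stand-ins for the {{SplitReader}}, the {{RecordsWithSplitIds}} and the element
queue:

{code:java}
// Toy model of the race; ToyReader/Batch are stand-ins, not Flink classes.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherShutdownRace {

    /** Stands in for RecordsWithSplitIds pointing into the reader's buffer. */
    static final class Batch {
        final ToyReader owner;
        Batch(ToyReader owner) { this.owner = owner; }
        String read() {
            if (owner.closed) {
                throw new IllegalStateException("batch references a closed reader buffer");
            }
            return "records";
        }
    }

    /** Stands in for a SplitReader whose close() releases the shared buffer. */
    static final class ToyReader {
        volatile boolean closed;
        void close() { closed = true; }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Batch> elementQueue = new ArrayBlockingQueue<>(10);
        ToyReader reader = new ToyReader();

        // (3) The fetcher thread enqueues its last batch and then has no more work.
        Thread fetcher = new Thread(() -> elementQueue.add(new Batch(reader)));
        fetcher.start();
        fetcher.join(); // the fetcher is now "idle"

        // (4) The main thread shuts the idle fetcher down, which closes the reader.
        reader.close();

        // (5) + (6) The main thread polls the queue and hits the corrupted batch.
        Batch last = elementQueue.poll();
        System.out.println(last.read()); // throws IllegalStateException
    }
}
{code}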
The problem here is that the {{SourceReaderBase}} implementation assumes that
once a batch of records (i.e. {{RecordsWithSplitIds}}) has been generated, the
records remain available even after the {{SplitReader}} which generated them is
closed.
This assumption forces some connector implementations to copy the records
fetched from the {{SplitReader}}, which introduces additional overhead. The
copy-based workaround is illustrated below.
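As an illustration of that workaround (the class and field names below are made
up, not taken from the Iceberg connector), a {{SplitReader}} today may have to
copy every record out of its reusable internal buffer before handing the batch
to the {{SourceReaderBase}}, paying an extra allocation and copy per record:

{code:java}
// Hypothetical sketch of the defensive-copy workaround; not connector code.
import java.util.ArrayList;
import java.util.List;

final class CopyingFetch {
    /** Reusable buffer owned by the reader; invalid once the reader is closed. */
    private final byte[] internalBuffer = new byte[4096];

    List<byte[]> fetchBatch(int recordCount, int recordSize) {
        List<byte[]> copies = new ArrayList<>(recordCount);
        for (int i = 0; i < recordCount; i++) {
            // ... read the next record into internalBuffer ...
            byte[] copy = new byte[recordSize];
            System.arraycopy(internalBuffer, 0, copy, 0, recordSize);
            copies.add(copy); // extra allocation and copy per record
        }
        return copies;
    }
}
{code}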
This patch aims to improve the {{SourceReaderBase}} to ensure that a
{{SplitReader}} is not closed until all the records it has emitted have been
processed.
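To make the goal concrete, one possible shape of such deferred closing is
sketched below. This is only an illustration of the idea, not the actual patch;
the class and method names are hypothetical:

{code:java}
// Hypothetical sketch: defer SplitReader.close() until every emitted batch
// has been recycled by the main thread. Not the actual patch.
final class DeferredCloseTracker {
    private int outstandingBatches;
    private boolean closeRequested;
    private boolean closed;
    private final Runnable closeReader; // e.g. splitReader::close

    DeferredCloseTracker(Runnable closeReader) {
        this.closeReader = closeReader;
    }

    /** Called when the fetcher enqueues a batch into the element queue. */
    synchronized void batchEmitted() {
        outstandingBatches++;
    }

    /** Called when the SourceReaderBase has fully processed (recycled) a batch. */
    synchronized void batchRecycled() {
        outstandingBatches--;
        maybeClose();
    }

    /** Called instead of closing the reader directly when the fetcher shuts down. */
    synchronized void requestClose() {
        closeRequested = true;
        maybeClose();
    }

    private void maybeClose() {
        if (closeRequested && outstandingBatches == 0 && !closed) {
            closed = true;
            closeReader.run();
        }
    }
}
{code}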
There is also a somewhat orthogonal problem: the {{SplitFetcher}} threads are
closed too aggressively. We have seen that, in most cases, a dedicated
{{SplitFetcher}} is created to handle a split and is closed right after that
split finishes, before the next split arrives. I'll create a separate patch to
introduce a short timeout before a {{SplitFetcher}} thread is considered idle.
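A rough sketch of that idea follows (the grace period and names are made up): a
fetcher would only become eligible for shutdown after it has had no assigned
splits and no pending tasks for some grace period, so a fetcher that just
finished a split can be reused for the next one instead of being torn down and
recreated:

{code:java}
// Hypothetical sketch of an idle grace period for SplitFetcher shutdown.
import java.time.Duration;

final class IdleGracePeriod {
    private final long graceMillis;
    private long idleSinceMillis = -1; // -1 means "currently busy"

    IdleGracePeriod(Duration grace) {
        this.graceMillis = grace.toMillis();
    }

    /** Called whenever the fetcher manager checks whether a fetcher can be shut down. */
    boolean shouldShutdown(boolean hasSplitsOrTasks, long nowMillis) {
        if (hasSplitsOrTasks) {
            idleSinceMillis = -1;        // busy again, reset the idle clock
            return false;
        }
        if (idleSinceMillis < 0) {
            idleSinceMillis = nowMillis; // just became idle, start the clock
        }
        return nowMillis - idleSinceMillis >= graceMillis;
    }
}
{code}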
--
This message was sent by Atlassian Jira
(v8.20.10#820010)