Mrart commented on PR #3983: URL: https://github.com/apache/flink-cdc/pull/3983#issuecomment-2792404015
Mrart commented on PR #3983: URL: https://github.com/apache/flink-cdc/pull/3983#issuecomment-2792404015

> @Mrart @loserwang1024 I have a question. I think the performance issue is this: when a new table is added to the config, the stream split is switched to snapshot splits, and after every snapshot split finishes, a stream split is added back once per snapshot. At that point the stream split reader is set to null, so it has to be recreated, and the schema is reloaded for the configured table filter. How does the changed code improve this logic? In `IncrementalSourceSplitReader.java`, the method `pollSplitRecords` has the following code:
>
> ```java
> private ChangeEventRecords pollSplitRecords() throws InterruptedException {
>     Iterator<SourceRecords> dataIt = null;
>     if (currentFetcher == null) {
>         // (1) Reads stream split firstly and then read snapshot split
>         if (streamSplits.size() > 0) {
>             // the stream split may come from:
>             // (a) the initial stream split
>             // (b) added back stream-split in newly added table process
>             StreamSplit nextSplit = streamSplits.poll();
>             submitStreamSplit(nextSplit);
>         } else if (snapshotSplits.size() > 0) {
>             submitSnapshotSplit(snapshotSplits.poll());
>         } else {
>             LOG.info("No available split to read.");
>         }
>
>         if (currentFetcher != null) {
>             dataIt = currentFetcher.pollSplitRecords();
>         } else {
>             currentSplitId = null;
>         }
>         return dataIt == null ? finishedSplit() : forRecords(dataIt);
>     } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
>         // (2) try to switch to stream split reading until current snapshot split finished
>         dataIt = currentFetcher.pollSplitRecords();
>         if (dataIt != null) {
>             // first fetch data of snapshot split, return and emit the records of snapshot split
>             ChangeEventRecords records;
>             if (context.isHasAssignedStreamSplit()) {
>                 records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
>                 closeScanFetcher();
>                 closeStreamFetcher();
>             } else {
>                 records = forRecords(dataIt);
>                 SnapshotSplit nextSplit = snapshotSplits.poll();
>                 if (nextSplit != null) {
>                     checkState(reusedScanFetcher != null);
>                     submitSnapshotSplit(nextSplit);
>                 } else {
>                     closeScanFetcher();
>                 }
>             }
>             return records;
>         } else {
>             return finishedSplit();
>         }
>     } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
>         // (3) switch to snapshot split reading if there are newly added snapshot splits
>         dataIt = currentFetcher.pollSplitRecords();
>         if (dataIt != null) {
>             // try to switch to read snapshot split if there are new added snapshot
>             SnapshotSplit nextSplit = snapshotSplits.poll();
>             if (nextSplit != null) {
>                 closeStreamFetcher();
>                 LOG.info("It's turn to switch next fetch reader to snapshot split reader");
>                 submitSnapshotSplit(nextSplit);
>             }
>             return ChangeEventRecords.forRecords(STREAM_SPLIT_ID, dataIt);
>         } else {
>             // null will be returned after receiving suspend stream event
>             // finish current stream split reading
>             closeStreamFetcher();
>             return finishedSplit();
>         }
>     } else {
>         throw new IllegalStateException("Unsupported reader type.");
>     }
> }
>
> private void closeStreamFetcher() {
>     if (reusedStreamFetcher != null) {
>         LOG.debug("Close stream reader {}", reusedStreamFetcher.getClass().getCanonicalName());
>         reusedStreamFetcher.close();
>         if (currentFetcher == reusedStreamFetcher) {
>             currentFetcher = null;
>         }
>         reusedStreamFetcher = null;
>     }
> }
> ```

This PR prevents the fetcher from being reclaimed while it is idle, so it does not need to be recreated (and the schema reloaded) for the next stream split.
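To make the reclaiming point concrete, here is a minimal, self-contained toy sketch of the reuse idea (this is not the actual PR diff; `ToyStreamFetcher`, `suspendStreamFetcher`, and the schema-cache map are hypothetical stand-ins for the flink-cdc internals):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the fetcher-reuse pattern discussed above. The "fetcher"
// holds an expensive-to-build schema cache: closing it discards the cache,
// while suspending it keeps the cache warm for the next stream split.
public class FetcherReuseSketch {

    static class ToyStreamFetcher implements AutoCloseable {
        private Map<String, String> schemaCache; // stands in for the table-filter schema

        void open() {
            if (schemaCache == null) {
                System.out.println("Loading schema for table filter (expensive)...");
                schemaCache = new HashMap<>();
                schemaCache.put("db.orders", "schema-v1");
            }
        }

        @Override
        public void close() {
            System.out.println("Closing fetcher, discarding schema cache");
            schemaCache = null;
        }
    }

    private ToyStreamFetcher reusedStreamFetcher;
    private ToyStreamFetcher currentFetcher;

    // Old behaviour: close on every switch, forcing a schema reload next time.
    void closeStreamFetcher() {
        if (reusedStreamFetcher != null) {
            reusedStreamFetcher.close();
            if (currentFetcher == reusedStreamFetcher) {
                currentFetcher = null;
            }
            reusedStreamFetcher = null;
        }
    }

    // Reuse idea (as described in the comment above): detach the idle
    // fetcher without reclaiming it, so its schema cache survives the
    // snapshot phase. Hypothetical name, not the flink-cdc API.
    void suspendStreamFetcher() {
        if (currentFetcher == reusedStreamFetcher) {
            currentFetcher = null; // idle, but not reclaimed
        }
    }

    void submitStreamSplit() {
        if (reusedStreamFetcher == null) {
            reusedStreamFetcher = new ToyStreamFetcher();
        }
        reusedStreamFetcher.open(); // no-op if the cache is still warm
        currentFetcher = reusedStreamFetcher;
    }

    public static void main(String[] args) {
        FetcherReuseSketch reader = new FetcherReuseSketch();
        reader.submitStreamSplit();    // loads the schema once
        reader.suspendStreamFetcher(); // switch away to a snapshot split
        reader.submitStreamSplit();    // cache still warm: no reload
        // With closeStreamFetcher() instead of suspendStreamFetcher(),
        // the second submitStreamSplit() would reload the schema.
    }
}
```

In this sketch the idle fetcher keeps its expensive schema state across the snapshot phase; whether the actual PR achieves that by skipping `close()`, by caching the schema elsewhere, or by another mechanism is for the diff itself to confirm.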