Mrart commented on PR #3983:
URL: https://github.com/apache/flink-cdc/pull/3983#issuecomment-2792404015

   > @Mrart @loserwang1024 I have a question. I think the performance issue is this: when a new table is added to the config, reading switches from the stream split to snapshot splits, and each time a snapshot split finishes the stream split is added back once. The stream split reader is set to null, so it has to be recreated, and the schema is reloaded for the configured table filter. How does the changed code improve this logic? In IncrementalSourceSplitReader.java, the method pollSplitRecords has the following code:
   > 
   > ```java
   > private ChangeEventRecords pollSplitRecords() throws InterruptedException {
   >         Iterator<SourceRecords> dataIt = null;
   >         if (currentFetcher == null) {
   >             // (1) Reads stream split firstly and then read snapshot split
   >             if (streamSplits.size() > 0) {
   >                 // the stream split may come from:
   >                 // (a) the initial stream split
   >                 // (b) added back stream-split in newly added table process
   >                 StreamSplit nextSplit = streamSplits.poll();
   >                 submitStreamSplit(nextSplit);
   >             } else if (snapshotSplits.size() > 0) {
   >                 submitSnapshotSplit(snapshotSplits.poll());
   >             } else {
   >                 LOG.info("No available split to read.");
   >             }
   > 
   >             if (currentFetcher != null) {
   >                 dataIt = currentFetcher.pollSplitRecords();
   >             } else {
   >                 currentSplitId = null;
   >             }
   >             return dataIt == null ? finishedSplit() : forRecords(dataIt);
   >         } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
   >             // (2) try to switch to stream split reading until current snapshot split finished
   >             dataIt = currentFetcher.pollSplitRecords();
   >             if (dataIt != null) {
   >                 // first fetch data of snapshot split, return and emit the records of snapshot split
   >                 ChangeEventRecords records;
   >                 if (context.isHasAssignedStreamSplit()) {
   >                     records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
   >                     closeScanFetcher();
   >                     closeStreamFetcher();
   >                 } else {
   >                     records = forRecords(dataIt);
   >                     SnapshotSplit nextSplit = snapshotSplits.poll();
   >                     if (nextSplit != null) {
   >                         checkState(reusedScanFetcher != null);
   >                         submitSnapshotSplit(nextSplit);
   >                     } else {
   >                         closeScanFetcher();
   >                     }
   >                 }
   >                 return records;
   >             } else {
   >                 return finishedSplit();
   >             }
   >         } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
   >             // (3) switch to snapshot split reading if there are newly added snapshot splits
   >             dataIt = currentFetcher.pollSplitRecords();
   >             if (dataIt != null) {
   >                 // try to switch to read snapshot split if there are newly added snapshot splits
   >                 SnapshotSplit nextSplit = snapshotSplits.poll();
   >                 if (nextSplit != null) {
   >                     closeStreamFetcher();
   >                     LOG.info("It's turn to switch next fetch reader to snapshot split reader");
   >                     submitSnapshotSplit(nextSplit);
   >                 }
   >                 return ChangeEventRecords.forRecords(STREAM_SPLIT_ID, dataIt);
   >             } else {
   >                 // null will be returned after receiving the suspend stream event
   >                 // finish current stream split reading
   >                 closeStreamFetcher();
   >                 return finishedSplit();
   >             }
   >         } else {
   >             throw new IllegalStateException("Unsupported reader type.");
   >         }
   >     }
   > 
   >     private void closeStreamFetcher() {
   >         if (reusedStreamFetcher != null) {
   >             LOG.debug("Close stream reader {}", reusedStreamFetcher.getClass().getCanonicalName());
   >             reusedStreamFetcher.close();
   >             if (currentFetcher == reusedStreamFetcher) {
   >                 currentFetcher = null;
   >             }
   >             reusedStreamFetcher = null;
   >         }
   >     }
   > ```
   This PR keeps the fetcher from being reclaimed while it is idle, so the stream split reader does not have to be recreated (and the schema reloaded) for every finished snapshot split.
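
   The idea can be sketched as follows. This is a minimal, hypothetical illustration, not the actual flink-cdc implementation: `CachedStreamFetcher`, `acquire()`, `suspend()`, and `schemaLoads` are made-up names standing in for the reused stream fetcher. Instead of closing the fetcher each time a newly added snapshot split takes over, the idle instance stays cached and is resumed later, so the expensive schema load tied to construction runs only once:

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch: cache the stream fetcher while it is idle instead of
   // closing it, so the schema reload done at construction happens only once.
   public class CachedStreamFetcher {
       // Counts how many times the (expensive) constructor-time schema load runs.
       static final AtomicInteger schemaLoads = new AtomicInteger();

       private static CachedStreamFetcher cached; // the idle, reusable instance

       private boolean running;

       private CachedStreamFetcher() {
           schemaLoads.incrementAndGet(); // stands in for reloading table schemas
           this.running = true;
       }

       /** Returns the cached fetcher if one exists, otherwise creates it. */
       static CachedStreamFetcher acquire() {
           if (cached == null) {
               cached = new CachedStreamFetcher();
           }
           cached.running = true;
           return cached;
       }

       /** Marks the fetcher idle but keeps it cached (no close, no reclaim). */
       void suspend() {
           this.running = false;
       }

       boolean isRunning() {
           return running;
       }
   }
   ```

   With the pre-PR behavior (`closeStreamFetcher()` nulling the reused fetcher), every switch back to the stream split would construct a new fetcher and reload schemas; with the cached variant above, repeated suspend/acquire cycles reuse one instance.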
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
