Ok, I see.

Hang Ruan <ruanhang1...@gmail.com> wrote on Wed, Mar 20, 2024 at 16:08:
> Hi, 3pang zhu.
>
> This `Scan Newly Added Tables` feature requires restarting the job from a
> savepoint. We cannot add new tables to a running job without restarting it
> for now.
>
> Best,
> Hang
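
For reference, the restart-based flow the linked docs describe boils down to: enable the option, stop the job with a savepoint, append the new table to the table list, and resubmit from that savepoint. The following is only a minimal sketch of that configuration, assuming the `MySqlSource` builder API of the linked legacy (2.x) connector; hostname, credentials, database, and table names are placeholders:

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NewlyAddedTableJob {
        public static void main(String[] args) throws Exception {
            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname("localhost")   // placeholder
                            .port(3306)
                            .username("cdc_user")    // placeholder
                            .password("******")      // placeholder
                            .databaseList("app_db")  // placeholder database
                            // the job originally captured only app_db.orders; the new
                            // table is appended here before resubmitting from the savepoint
                            .tableList("app_db.orders", "app_db.shipments")
                            // lets the restored job snapshot tables that were not
                            // in the previous table list
                            .scanNewlyAddedTableEnabled(true)
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(3000); // checkpointing is required by the CDC source
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
            // stop with a savepoint, e.g. `flink stop -p <savepointDir> <jobId>`, edit
            // tableList, then resubmit with `flink run -s <savepointPath> ...`
            env.execute("mysql-cdc-scan-newly-added-tables");
        }
    }
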
> 3pang zhu <zhu3p...@gmail.com> wrote on Wed, Mar 20, 2024 at 15:22:
>
>> This link describes the usage of [Scan Newly Added Tables]:
>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
>> Can we use it without restarting the job? I have tried the patch below,
>> which uses a scheduled task in MySqlSnapshotSplitAssigner#open(). When
>> tables are added more than twice, it runs into this issue:
>> https://github.com/apache/flink-cdc/issues/2282
>>
>>  .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 26 +++++++++++++++++++++++---
>>  .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java |  5 +++--
>>  2 files changed, 26 insertions(+), 5 deletions(-)
>>
>> diff --cc flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> index 0536a262,0536a262..d52acc26
>> --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
>>   import java.util.concurrent.CopyOnWriteArrayList;
>>   import java.util.concurrent.ExecutorService;
>>   import java.util.concurrent.Executors;
>> ++import java.util.concurrent.ScheduledExecutorService;
>>   import java.util.concurrent.ThreadFactory;
>> ++import java.util.concurrent.TimeUnit;
>>   import java.util.stream.Collectors;
>>
>>   import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
>> @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
>>       private MySqlChunkSplitter chunkSplitter;
>>       private boolean isTableIdCaseSensitive;
>>       private ExecutorService executor;
>> ++    private ScheduledExecutorService scheduledExecutor;
>>
>>       @Nullable private Long checkpointIdToFinish;
>>
>> @@@ -179,12 -179,12 +182,24 @@@
>>       @Override
>>       public void open() {
>>           chunkSplitter.open();
>> --        discoveryCaptureTables();
>> --        captureNewlyAddedTables();
>> --        startAsynchronouslySplit();
>> ++        if (scheduledExecutor == null) {
>> ++            ThreadFactory threadFactory =
>> ++                    new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
>> ++            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
>> ++        }
>> ++        scheduledExecutor.scheduleAtFixedRate(
>> ++                () -> {
>> ++                    discoveryCaptureTables();
>> ++                    captureNewlyAddedTables();
>> ++                    startAsynchronouslySplit();
>> ++                },
>> ++                0,
>> ++                1,
>> ++                TimeUnit.MINUTES);
>>       }
>>
>>       private void discoveryCaptureTables() {
>> ++        LOG.info("start discovery capture tables");
>>           // discovery the tables lazily
>>           if (needToDiscoveryTables()) {
>>               long start = System.currentTimeMillis();
>> @@@ -216,6 -216,6 +231,7 @@@
>>       }
>>
>>       private void captureNewlyAddedTables() {
>> ++        LOG.info("start to capture newly added tables");
>>           if (sourceConfig.isScanNewlyAddedTableEnabled()) {
>>               // check whether we got newly added tables
>>               try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
>> @@@ -282,6 -282,6 +298,7 @@@
>>       }
>>
>>       private void startAsynchronouslySplit() {
>> ++        LOG.info("start asynchronously split");
>>           if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
>>               if (executor == null) {
>>                   ThreadFactory threadFactory =
>> @@@ -497,6 -497,6 +514,9 @@@
>>           if (executor != null) {
>>               executor.shutdown();
>>           }
>> ++        if (scheduledExecutor != null) {
>> ++            scheduledExecutor.shutdown();
>> ++        }
>>       }
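
The patch above boils down to replacing the one-shot discovery in open() with a single-threaded ScheduledExecutorService that re-runs discovery, capture, and splitting once per minute, and shutting that scheduler down together with the splitting executor. A standalone sketch of just that scheduling pattern follows; the class and method names are illustrative stand-ins rather than connector code, and the empty discoverTables() body marks where the assigner's discovery/capture/split steps would go:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;

    public class PeriodicDiscoverySketch {

        private ScheduledExecutorService scheduledExecutor;

        // Mirrors the shape of the patched open(): schedule the work instead of running it once.
        public void open() {
            if (scheduledExecutor == null) {
                // a named daemon thread keeps the periodic work easy to spot in thread dumps
                ThreadFactory threadFactory = r -> {
                    Thread t = new Thread(r, "table-discovery");
                    t.setDaemon(true);
                    return t;
                };
                scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
            }
            // first run immediately, then once per minute
            scheduledExecutor.scheduleAtFixedRate(this::discoverTables, 0, 1, TimeUnit.MINUTES);
        }

        // Placeholder for the discovery/capture/split steps the patch re-runs periodically.
        private void discoverTables() {
            // must be safe to repeat: every run re-sees tables that earlier runs already handled
        }

        // Mirrors the shutdown added by the patch: stop the scheduler alongside the splitting executor.
        public void close() {
            if (scheduledExecutor != null) {
                scheduledExecutor.shutdown();
            }
        }
    }

scheduleAtFixedRate never runs executions concurrently (a slow run only delays the next one), but the body is re-entered indefinitely, so every iteration has to tolerate tables that were already captured by earlier runs.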