Hi, 3pang zhu. The `Scan Newly Added Tables` feature requires restarting the job from a savepoint. At the moment, we cannot add new tables to a running job without a restart.
Best, Hang 3pang zhu <zhu3p...@gmail.com> 于2024年3月20日周三 15:22写道: > this link has describe the usage for [Scan Newly Added Tables] > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables > . > if we can use if without restarting job. i have try this patch, use a > schedule task in MysqlSnapshotSplitAssigner#open(), when added table more > than twice, it occur this issue > https://github.com/apache/flink-cdc/issues/2282 > > > > .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java > | 26 +++++++++++++++++++++++--- > > .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java > | 5 +++-- > 2 files changed, 26 insertions(+), 5 deletions(-) > > diff --cc > flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.jav > index 0536a262,0536a262..d52acc26 > --- > a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java > +++ > b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java > @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set > import java.util.concurrent.CopyOnWriteArrayList; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > ++import java.util.concurrent.ScheduledExecutorService; > import java.util.concurrent.ThreadFactory; > ++import java.util.concurrent.TimeUnit; > import java.util.stream.Collectors; > > import static > com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; > @@@ -94,6 -94,6 +96,7 @@@ public class 
MySqlSnapshotSplitAssigne > private MySqlChunkSplitter chunkSplitter; > private boolean isTableIdCaseSensitive; > private ExecutorService executor; > ++ private ScheduledExecutorService scheduledExecutor; > > @Nullable private Long checkpointIdToFinish; > > @@@ -179,12 -179,12 +182,24 @@@ > @Override > public void open() { > chunkSplitter.open(); > -- discoveryCaptureTables(); > -- captureNewlyAddedTables(); > -- startAsynchronouslySplit(); > ++ if (scheduledExecutor == null) { > ++ ThreadFactory threadFactory = > ++ new > ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build(); > ++ this.scheduledExecutor = > Executors.newSingleThreadScheduledExecutor(threadFactory); > ++ } > ++ scheduledExecutor.scheduleAtFixedRate( > ++ () -> { > ++ discoveryCaptureTables(); > ++ captureNewlyAddedTables(); > ++ startAsynchronouslySplit(); > ++ }, > ++ 0, > ++ 1, > ++ TimeUnit.MINUTES); > } > > private void discoveryCaptureTables() { > ++ LOG.info("start discovery capture tables"); > // discovery the tables lazily > if (needToDiscoveryTables()) { > long start = System.currentTimeMillis(); > @@@ -216,6 -216,6 +231,7 @@@ > } > > private void captureNewlyAddedTables() { > ++ LOG.info("start to capture newly added tables"); > if (sourceConfig.isScanNewlyAddedTableEnabled()) { > // check whether we got newly added tables > try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) > { > @@@ -282,6 -282,6 +298,7 @@@ > } > > private void startAsynchronouslySplit() { > ++ LOG.info("start asynchronously split"); > if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) { > if (executor == null) { > ThreadFactory threadFactory = > @@@ -497,6 -497,6 +514,9 @@@ > if (executor != null) { > executor.shutdown(); > } > ++ if (scheduledExecutor != null) { > ++ scheduledExecutor.shutdown(); > ++ } > } >