Ok, I see.

Hang Ruan <ruanhang1...@gmail.com> 于2024年3月20日周三 16:08写道:

> Hi, 3pang zhu.
>
> The `Scan Newly Added Tables` feature requires restarting the job from
> a savepoint. For now, we cannot add new tables to a running job without
> restarting it.
>
> Best,
> Hang
>
> 3pang zhu <zhu3p...@gmail.com> 于2024年3月20日周三 15:22写道:
>
>> This link describes the usage of [Scan Newly Added Tables]:
>> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#scan-newly-added-tables
>> .
>> Can we use it without restarting the job? I have tried this patch, which
>> uses a scheduled task in MySqlSnapshotSplitAssigner#open(). When tables are
>> added more than twice, this issue occurs:
>> https://github.com/apache/flink-cdc/issues/2282
>>
>>
>>  
>> .../flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>>                  | 26 +++++++++++++++++++++++---
>>  
>> .../flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
>>  |  5 +++--
>>  2 files changed, 26 insertions(+), 5 deletions(-)
>>
>> diff --cc
>> flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> index 0536a262,0536a262..d52acc26
>> ---
>> a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> +++
>> b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
>> @@@ -56,7 -56,7 +56,9 @@@ import java.util.Set
>>   import java.util.concurrent.CopyOnWriteArrayList;
>>   import java.util.concurrent.ExecutorService;
>>   import java.util.concurrent.Executors;
>> ++import java.util.concurrent.ScheduledExecutorService;
>>   import java.util.concurrent.ThreadFactory;
>> ++import java.util.concurrent.TimeUnit;
>>   import java.util.stream.Collectors;
>>
>>   import static
>> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
>> @@@ -94,6 -94,6 +96,7 @@@ public class MySqlSnapshotSplitAssigne
>>       private MySqlChunkSplitter chunkSplitter;
>>       private boolean isTableIdCaseSensitive;
>>       private ExecutorService executor;
>> ++    private ScheduledExecutorService scheduledExecutor;
>>
>>       @Nullable private Long checkpointIdToFinish;
>>
>> @@@ -179,12 -179,12 +182,24 @@@
>>       @Override
>>       public void open() {
>>           chunkSplitter.open();
>> --        discoveryCaptureTables();
>> --        captureNewlyAddedTables();
>> --        startAsynchronouslySplit();
>> ++        if (scheduledExecutor == null) {
>> ++            ThreadFactory threadFactory =
>> ++                    new
>> ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
>> ++            this.scheduledExecutor =
>> Executors.newSingleThreadScheduledExecutor(threadFactory);
>> ++        }
>> ++        scheduledExecutor.scheduleAtFixedRate(
>> ++                () -> {
>> ++                    discoveryCaptureTables();
>> ++                    captureNewlyAddedTables();
>> ++                    startAsynchronouslySplit();
>> ++                },
>> ++                0,
>> ++                1,
>> ++                TimeUnit.MINUTES);
>>       }
>>
>>       private void discoveryCaptureTables() {
>> ++        LOG.info("start discovery capture tables");
>>           // discovery the tables lazily
>>           if (needToDiscoveryTables()) {
>>               long start = System.currentTimeMillis();
>> @@@ -216,6 -216,6 +231,7 @@@
>>       }
>>
>>       private void captureNewlyAddedTables() {
>> ++        LOG.info("start to capture newly added tables");
>>           if (sourceConfig.isScanNewlyAddedTableEnabled()) {
>>               // check whether we got newly added tables
>>               try (JdbcConnection jdbc =
>> openJdbcConnection(sourceConfig)) {
>> @@@ -282,6 -282,6 +298,7 @@@
>>       }
>>
>>       private void startAsynchronouslySplit() {
>> ++        LOG.info("start asynchronously split");
>>           if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty())
>> {
>>               if (executor == null) {
>>                   ThreadFactory threadFactory =
>> @@@ -497,6 -497,6 +514,9 @@@
>>           if (executor != null) {
>>               executor.shutdown();
>>           }
>> ++        if (scheduledExecutor != null) {
>> ++            scheduledExecutor.shutdown();
>> ++        }
>>       }
>>
>

Reply via email to