loserwang1024 commented on code in PR #3510:
URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883576694


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
 
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = 
remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return 
Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = 
chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new 
HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new 
ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = 
Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);
+        }
+    }
+
+    private void splitTable(TableId nextTable) {
+        LOG.info("Start splitting table {} into chunks...", nextTable);
+        long start = System.currentTimeMillis();
+        int chunkNum = 0;
+        boolean hasRecordSchema = false;
+        // split the given table into chunks (snapshot splits)
+        do {
+            synchronized (lock) {
+                Collection<SnapshotSplit> splits;
+                try {
+                    splits = chunkSplitter.generateSplits(nextTable);
+                } catch (Exception e) {
+                    throw new IllegalStateException(
+                            "Error when splitting chunks for " + nextTable, e);
+                }
+
+                if (!hasRecordSchema && !splits.isEmpty()) {
+                    hasRecordSchema = true;
+                    final Map<TableId, TableChanges.TableChange> tableSchema = 
new HashMap<>();
                     
tableSchema.putAll(splits.iterator().next().getTableSchemas());
+                    tableSchemas.putAll(tableSchema);
                 }
                 final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
                         splits.stream()
                                 .map(SnapshotSplit::toSchemalessSnapshotSplit)
                                 .collect(Collectors.toList());
+                chunkNum += splits.size();
                 remainingSplits.addAll(schemalessSnapshotSplits);
-                tableSchemas.putAll(tableSchema);
-                if (!alreadyProcessedTables.contains(nextTable)) {
-                    enumeratorMetrics.startSnapshotTables(1);
-                }
-                alreadyProcessedTables.add(nextTable);
                 List<String> splitIds =
                         schemalessSnapshotSplits.stream()
                                 .map(SchemalessSnapshotSplit::splitId)
                                 .collect(Collectors.toList());
                 
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
+
+                if (!chunkSplitter.hasNextChunk()) {
+                    remainingTables.remove(nextTable);
+                }
+                lock.notify();
+            }
+        } while (chunkSplitter.hasNextChunk());
+        long end = System.currentTimeMillis();
+        LOG.info(
+                "Split table {} into {} chunks, time cost: {}ms.",
+                nextTable,
+                chunkNum,
+                end - start);
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {

Review Comment:
   > Do we need `waitTableDiscoveryReady` here or not ?
   
   I remove it because : current table discovery is not async, it is a blocking 
operation when assigner is started. if remainingTables and  remainingSplits and 
alreadyProcessedTables are all empty, it meaning that no table is found. Thus, 
it also no use to wait here.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to