ruanhang1993 commented on code in PR #3510:
URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883771180


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
 
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = 
remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return 
Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = 
chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new 
HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new 
ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = 
Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);
+        }
+    }
+
+    private void splitTable(TableId nextTable) {
+        LOG.info("Start splitting table {} into chunks...", nextTable);
+        long start = System.currentTimeMillis();
+        int chunkNum = 0;
+        boolean hasRecordSchema = false;
+        // split the given table into chunks (snapshot splits)
+        do {
+            synchronized (lock) {
+                Collection<SnapshotSplit> splits;
+                try {
+                    splits = chunkSplitter.generateSplits(nextTable);
+                } catch (Exception e) {
+                    throw new IllegalStateException(
+                            "Error when splitting chunks for " + nextTable, e);
+                }
+
+                if (!hasRecordSchema && !splits.isEmpty()) {
+                    hasRecordSchema = true;
+                    final Map<TableId, TableChanges.TableChange> tableSchema = 
new HashMap<>();
                     
tableSchema.putAll(splits.iterator().next().getTableSchemas());
+                    tableSchemas.putAll(tableSchema);
                 }
                 final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
                         splits.stream()
                                 .map(SnapshotSplit::toSchemalessSnapshotSplit)
                                 .collect(Collectors.toList());
+                chunkNum += splits.size();
                 remainingSplits.addAll(schemalessSnapshotSplits);
-                tableSchemas.putAll(tableSchema);
-                if (!alreadyProcessedTables.contains(nextTable)) {
-                    enumeratorMetrics.startSnapshotTables(1);
-                }
-                alreadyProcessedTables.add(nextTable);
                 List<String> splitIds =
                         schemalessSnapshotSplits.stream()
                                 .map(SchemalessSnapshotSplit::splitId)
                                 .collect(Collectors.toList());
                 
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
+
+                if (!chunkSplitter.hasNextChunk()) {
+                    remainingTables.remove(nextTable);
+                }
+                lock.notify();
+            }
+        } while (chunkSplitter.hasNextChunk());
+        long end = System.currentTimeMillis();
+        LOG.info(
+                "Split table {} into {} chunks, time cost: {}ms.",
+                nextTable,
+                chunkNum,
+                end - start);
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {
+        synchronized (lock) {
+            checkSplitterErrors();
+            if (!remainingSplits.isEmpty()) {
+                // return remaining splits firstly
+                Iterator<SchemalessSnapshotSplit> iterator = 
remainingSplits.iterator();
+                SchemalessSnapshotSplit split = iterator.next();
+                iterator.remove();
+                assignedSplits.put(split.splitId(), split);
+                addAlreadyProcessedTablesIfNotExists(split.getTableId());
+                enumeratorMetrics
+                        .getTableMetrics(split.getTableId())
+                        .finishProcessSplit(split.splitId());
+                return 
Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
+            } else if (!remainingTables.isEmpty()) {
+                try {
+                    // wait for the asynchronous split to complete
+                    lock.wait();

Review Comment:
   We need to use `wait` here so that this lock is released; the lock can then 
be acquired by another thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to