ruanhang1993 commented on code in PR #3510: URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883771180
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ########## @@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() { return numTablesPendingSnapshot; } - @Override - public Optional<SourceSplitBase> getNext() { - if (!remainingSplits.isEmpty()) { - // return remaining splits firstly - Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator(); - SchemalessSnapshotSplit split = iterator.next(); - iterator.remove(); - assignedSplits.put(split.splitId(), split); - enumeratorMetrics - .getTableMetrics(split.getTableId()) - .finishProcessSplit(split.splitId()); - return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId()))); - } else { - // it's turn for new table - TableId nextTable = remainingTables.pollFirst(); - if (nextTable != null) { - // split the given table into chunks (snapshot splits) - Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable); - final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>(); - if (!splits.isEmpty()) { + private void startAsynchronouslySplit() { + if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) { + if (executor == null) { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build(); + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + executor.submit(this::splitChunksForRemainingTables); + } + } + + private void splitTable(TableId nextTable) { + LOG.info("Start splitting table {} into chunks...", nextTable); + long start = System.currentTimeMillis(); + int chunkNum = 0; + boolean hasRecordSchema = false; + // split the given table into chunks (snapshot splits) + do { + synchronized (lock) { + Collection<SnapshotSplit> splits; + try { + splits = chunkSplitter.generateSplits(nextTable); + } catch (Exception e) { + throw new IllegalStateException( + "Error when 
splitting chunks for " + nextTable, e); + } + + if (!hasRecordSchema && !splits.isEmpty()) { + hasRecordSchema = true; + final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>(); tableSchema.putAll(splits.iterator().next().getTableSchemas()); + tableSchemas.putAll(tableSchema); } final List<SchemalessSnapshotSplit> schemalessSnapshotSplits = splits.stream() .map(SnapshotSplit::toSchemalessSnapshotSplit) .collect(Collectors.toList()); + chunkNum += splits.size(); remainingSplits.addAll(schemalessSnapshotSplits); - tableSchemas.putAll(tableSchema); - if (!alreadyProcessedTables.contains(nextTable)) { - enumeratorMetrics.startSnapshotTables(1); - } - alreadyProcessedTables.add(nextTable); List<String> splitIds = schemalessSnapshotSplits.stream() .map(SchemalessSnapshotSplit::splitId) .collect(Collectors.toList()); enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds); + + if (!chunkSplitter.hasNextChunk()) { + remainingTables.remove(nextTable); + } + lock.notify(); + } + } while (chunkSplitter.hasNextChunk()); + long end = System.currentTimeMillis(); + LOG.info( + "Split table {} into {} chunks, time cost: {}ms.", + nextTable, + chunkNum, + end - start); + } + + @Override + public Optional<SourceSplitBase> getNext() { + synchronized (lock) { + checkSplitterErrors(); + if (!remainingSplits.isEmpty()) { + // return remaining splits firstly + Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator(); + SchemalessSnapshotSplit split = iterator.next(); + iterator.remove(); + assignedSplits.put(split.splitId(), split); + addAlreadyProcessedTablesIfNotExists(split.getTableId()); + enumeratorMetrics + .getTableMetrics(split.getTableId()) + .finishProcessSplit(split.splitId()); + return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId()))); + } else if (!remainingTables.isEmpty()) { + try { + // wait for the asynchronous split to complete + lock.wait(); Review Comment: We need to use `wait` to release this lock. 
Then the lock can be acquired by other threads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org