loserwang1024 commented on code in PR #3510: URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883576694
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ########## @@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() { return numTablesPendingSnapshot; } - @Override - public Optional<SourceSplitBase> getNext() { - if (!remainingSplits.isEmpty()) { - // return remaining splits firstly - Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator(); - SchemalessSnapshotSplit split = iterator.next(); - iterator.remove(); - assignedSplits.put(split.splitId(), split); - enumeratorMetrics - .getTableMetrics(split.getTableId()) - .finishProcessSplit(split.splitId()); - return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId()))); - } else { - // it's turn for new table - TableId nextTable = remainingTables.pollFirst(); - if (nextTable != null) { - // split the given table into chunks (snapshot splits) - Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable); - final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>(); - if (!splits.isEmpty()) { + private void startAsynchronouslySplit() { + if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) { + if (executor == null) { + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build(); + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + executor.submit(this::splitChunksForRemainingTables); + } + } + + private void splitTable(TableId nextTable) { + LOG.info("Start splitting table {} into chunks...", nextTable); + long start = System.currentTimeMillis(); + int chunkNum = 0; + boolean hasRecordSchema = false; + // split the given table into chunks (snapshot splits) + do { + synchronized (lock) { + Collection<SnapshotSplit> splits; + try { + splits = chunkSplitter.generateSplits(nextTable); + } catch (Exception e) { + throw new IllegalStateException( + "Error when 
splitting chunks for " + nextTable, e); + } + + if (!hasRecordSchema && !splits.isEmpty()) { + hasRecordSchema = true; + final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>(); tableSchema.putAll(splits.iterator().next().getTableSchemas()); + tableSchemas.putAll(tableSchema); } final List<SchemalessSnapshotSplit> schemalessSnapshotSplits = splits.stream() .map(SnapshotSplit::toSchemalessSnapshotSplit) .collect(Collectors.toList()); + chunkNum += splits.size(); remainingSplits.addAll(schemalessSnapshotSplits); - tableSchemas.putAll(tableSchema); - if (!alreadyProcessedTables.contains(nextTable)) { - enumeratorMetrics.startSnapshotTables(1); - } - alreadyProcessedTables.add(nextTable); List<String> splitIds = schemalessSnapshotSplits.stream() .map(SchemalessSnapshotSplit::splitId) .collect(Collectors.toList()); enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds); + + if (!chunkSplitter.hasNextChunk()) { + remainingTables.remove(nextTable); + } + lock.notify(); + } + } while (chunkSplitter.hasNextChunk()); + long end = System.currentTimeMillis(); + LOG.info( + "Split table {} into {} chunks, time cost: {}ms.", + nextTable, + chunkNum, + end - start); + } + + @Override + public Optional<SourceSplitBase> getNext() { Review Comment: > Do we need `waitTableDiscoveryReady` here or not? I removed it because the current table discovery is not async; it is a blocking operation when the assigner is started. If remainingTables, remainingSplits and alreadyProcessedTables are all empty, it means that no table was found. Thus, there is also no use in waiting here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org