yuxiqian commented on code in PR #3988: URL: https://github.com/apache/flink-cdc/pull/3988#discussion_r2053760265
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java: ########## @@ -165,39 +168,82 @@ public void close() throws IOException { } // ------------------------------------------------------------------------------------------ + protected boolean isStreamSplitAllAssigned; + protected List<SourceSplitBase> pendingStreamSplits = null; + + private Optional<SourceSplitBase> getNextStreamSplit() { + if (pendingStreamSplits == null) { + StartupOptions startupOptions = sourceConfig.getStartupOptions(); + + Offset startingOffset; + switch (startupOptions.startupMode) { + case LATEST_OFFSET: + startingOffset = dialect.displayCurrentOffset(sourceConfig); + break; + case EARLIEST_OFFSET: + startingOffset = offsetFactory.createInitialOffset(); + break; + case TIMESTAMP: + startingOffset = + offsetFactory.createTimestampOffset( + startupOptions.startupTimestampMillis); + break; + case SPECIFIC_OFFSETS: + startingOffset = + offsetFactory.newOffset( + startupOptions.specificOffsetFile, + startupOptions.specificOffsetPos.longValue()); + break; + default: + throw new IllegalStateException( + "Unsupported startup mode " + startupOptions.startupMode); + } + + pendingStreamSplits = + new ArrayList<>( + createStreamSplits( + startingOffset, + offsetFactory.createNoStoppingOffset(), + new ArrayList<>(), + new HashMap<>(), + 0, + false, + true)); + Preconditions.checkArgument( + pendingStreamSplits.size() <= enumeratorContext.currentParallelism(), + "%s stream splits generated, which is greater than current parallelism %s. 
Some splits might never be assigned.", + pendingStreamSplits.size(), + enumeratorContext.currentParallelism()); + } - public StreamSplit createStreamSplit() { - StartupOptions startupOptions = sourceConfig.getStartupOptions(); - - Offset startingOffset; - switch (startupOptions.startupMode) { - case LATEST_OFFSET: - startingOffset = dialect.displayCurrentOffset(sourceConfig); - break; - case EARLIEST_OFFSET: - startingOffset = offsetFactory.createInitialOffset(); - break; - case TIMESTAMP: - startingOffset = - offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis); - break; - case SPECIFIC_OFFSETS: - startingOffset = - offsetFactory.newOffset( - startupOptions.specificOffsetFile, - startupOptions.specificOffsetPos.longValue()); - break; - default: - throw new IllegalStateException( - "Unsupported startup mode " + startupOptions.startupMode); + if (pendingStreamSplits.isEmpty()) { + return Optional.empty(); + } else { + SourceSplitBase nextSplit = pendingStreamSplits.remove(0); + if (pendingStreamSplits.isEmpty()) { + isStreamSplitAllAssigned = true; + } + return Optional.of(nextSplit); } + } - return new StreamSplit( - STREAM_SPLIT_ID, - startingOffset, - offsetFactory.createNoStoppingOffset(), - new ArrayList<>(), - new HashMap<>(), - 0); + protected List<SourceSplitBase> createStreamSplits( + Offset minOffset, + Offset stoppingOffset, + List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos, + Map<TableId, TableChanges.TableChange> tableSchemas, + int totalFinishedSplitSize, + boolean isSuspended, + boolean isSnapshotCompleted) { + return Collections.singletonList( + new StreamSplit( + STREAM_SPLIT_ID, Review Comment: Good idea, applied it in the mocked incremental source. Shall we leave the split name unchanged for the default implementation since it has only one stream split? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org