yuxiqian commented on code in PR #3988:
URL: https://github.com/apache/flink-cdc/pull/3988#discussion_r2053760265


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java:
##########
@@ -165,39 +168,82 @@ public void close() throws IOException {
     }
 
     // 
------------------------------------------------------------------------------------------
+    protected boolean isStreamSplitAllAssigned;
+    protected List<SourceSplitBase> pendingStreamSplits = null;
+
+    private Optional<SourceSplitBase> getNextStreamSplit() {
+        if (pendingStreamSplits == null) {
+            StartupOptions startupOptions = sourceConfig.getStartupOptions();
+
+            Offset startingOffset;
+            switch (startupOptions.startupMode) {
+                case LATEST_OFFSET:
+                    startingOffset = 
dialect.displayCurrentOffset(sourceConfig);
+                    break;
+                case EARLIEST_OFFSET:
+                    startingOffset = offsetFactory.createInitialOffset();
+                    break;
+                case TIMESTAMP:
+                    startingOffset =
+                            offsetFactory.createTimestampOffset(
+                                    startupOptions.startupTimestampMillis);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    startingOffset =
+                            offsetFactory.newOffset(
+                                    startupOptions.specificOffsetFile,
+                                    
startupOptions.specificOffsetPos.longValue());
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            "Unsupported startup mode " + 
startupOptions.startupMode);
+            }
+
+            pendingStreamSplits =
+                    new ArrayList<>(
+                            createStreamSplits(
+                                    startingOffset,
+                                    offsetFactory.createNoStoppingOffset(),
+                                    new ArrayList<>(),
+                                    new HashMap<>(),
+                                    0,
+                                    false,
+                                    true));
+            Preconditions.checkArgument(
+                    pendingStreamSplits.size() <= 
enumeratorContext.currentParallelism(),
+                    "%s stream splits generated, which is greater than current 
parallelism %s. Some splits might never be assigned.",
+                    pendingStreamSplits.size(),
+                    enumeratorContext.currentParallelism());
+        }
 
-    public StreamSplit createStreamSplit() {
-        StartupOptions startupOptions = sourceConfig.getStartupOptions();
-
-        Offset startingOffset;
-        switch (startupOptions.startupMode) {
-            case LATEST_OFFSET:
-                startingOffset = dialect.displayCurrentOffset(sourceConfig);
-                break;
-            case EARLIEST_OFFSET:
-                startingOffset = offsetFactory.createInitialOffset();
-                break;
-            case TIMESTAMP:
-                startingOffset =
-                        
offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
-                break;
-            case SPECIFIC_OFFSETS:
-                startingOffset =
-                        offsetFactory.newOffset(
-                                startupOptions.specificOffsetFile,
-                                startupOptions.specificOffsetPos.longValue());
-                break;
-            default:
-                throw new IllegalStateException(
-                        "Unsupported startup mode " + 
startupOptions.startupMode);
+        if (pendingStreamSplits.isEmpty()) {
+            return Optional.empty();
+        } else {
+            SourceSplitBase nextSplit = pendingStreamSplits.remove(0);
+            if (pendingStreamSplits.isEmpty()) {
+                isStreamSplitAllAssigned = true;
+            }
+            return Optional.of(nextSplit);
         }
+    }
 
-        return new StreamSplit(
-                STREAM_SPLIT_ID,
-                startingOffset,
-                offsetFactory.createNoStoppingOffset(),
-                new ArrayList<>(),
-                new HashMap<>(),
-                0);
+    protected List<SourceSplitBase> createStreamSplits(
+            Offset minOffset,
+            Offset stoppingOffset,
+            List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
+            Map<TableId, TableChanges.TableChange> tableSchemas,
+            int totalFinishedSplitSize,
+            boolean isSuspended,
+            boolean isSnapshotCompleted) {
+        return Collections.singletonList(
+                new StreamSplit(
+                        STREAM_SPLIT_ID,

Review Comment:
   Good idea, applied it in the mocked incremental source. Shall we leave the 
split name unchanged for the default implementation since it has only one 
stream split?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to