loserwang1024 commented on code in PR #3988:
URL: https://github.com/apache/flink-cdc/pull/3988#discussion_r2055653749


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java:
##########
@@ -172,7 +181,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
             LOG.info(
                     "The enumerator receives notice from subtask {} for the stream split assignment. ",
                     subtaskId);
-            this.streamSplitTaskId = subtaskId;
+            this.streamSplitTaskIds.add(subtaskId);

Review Comment:
   1. Maybe use a set rather than a list? A map would be even better.
   2. What if the stream split's taskId changes, for example after addSplitsBack or a restart?
   3. Maybe we should use a map whose key is the stream split id, something like the sketch below.
   
   I remember @ruanhang1993 mentioned this when I introduced the newly added table framework. CC @ruanhang1993
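   A rough sketch of what I mean, with hypothetical names (not code from this PR):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Sketch only: map each stream split id to the subtask that currently owns it,
   // so a re-assignment after addSplitsBack or a restart simply overwrites the entry.
   class StreamSplitOwners {
       private final Map<String, Integer> ownerBySplitId = new HashMap<>();
   
       void onSplitAssigned(String splitId, int subtaskId) {
           ownerBySplitId.put(splitId, subtaskId); // overwrites a stale owner
       }
   
       void onSplitReturned(String splitId) {
           ownerBySplitId.remove(splitId); // drop only the returned split
       }
   }
   ```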



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java:
##########
@@ -165,39 +170,89 @@ public void close() throws IOException {
     }
 
     // ------------------------------------------------------------------------------------------
+    protected boolean isStreamSplitAllAssigned;
+    protected List<SourceSplitBase> pendingStreamSplits = null;
+
+    private Optional<SourceSplitBase> getNextStreamSplit() {
+        if (pendingStreamSplits == null) {
+            StartupOptions startupOptions = sourceConfig.getStartupOptions();
 
-    public StreamSplit createStreamSplit() {
-        StartupOptions startupOptions = sourceConfig.getStartupOptions();
-
-        Offset startingOffset;
-        switch (startupOptions.startupMode) {
-            case LATEST_OFFSET:
-                startingOffset = dialect.displayCurrentOffset(sourceConfig);
-                break;
-            case EARLIEST_OFFSET:
-                startingOffset = offsetFactory.createInitialOffset();
-                break;
-            case TIMESTAMP:
-                startingOffset =
-                        offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
-                break;
-            case SPECIFIC_OFFSETS:
-                startingOffset =
-                        offsetFactory.newOffset(
-                                startupOptions.specificOffsetFile,
-                                startupOptions.specificOffsetPos.longValue());
-                break;
-            default:
-                throw new IllegalStateException(
-                        "Unsupported startup mode " + startupOptions.startupMode);
+            Offset startingOffset;
+            switch (startupOptions.startupMode) {
+                case LATEST_OFFSET:
+                    startingOffset = dialect.displayCurrentOffset(sourceConfig);
+                    break;
+                case EARLIEST_OFFSET:
+                    startingOffset = offsetFactory.createInitialOffset();
+                    break;
+                case TIMESTAMP:
+                    startingOffset =
+                            offsetFactory.createTimestampOffset(
+                                    startupOptions.startupTimestampMillis);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    startingOffset =
+                            offsetFactory.newOffset(
+                                    startupOptions.specificOffsetFile,
+                                    startupOptions.specificOffsetPos.longValue());
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            "Unsupported startup mode " + startupOptions.startupMode);
+            }
+
+            pendingStreamSplits =
+                    new ArrayList<>(
+                            createStreamSplits(
+                                    sourceConfig,
+                                    startingOffset,
+                                    offsetFactory.createNoStoppingOffset(),
+                                    new ArrayList<>(),
+                                    new HashMap<>(),
+                                    0,
+                                    false,
+                                    true));
+            Preconditions.checkArgument(
+                    pendingStreamSplits.size() == numberOfStreamSplits,
+                    "Inconsistent number of stream splits. Reported %s, but was %s",
+                    numberOfStreamSplits,
+                    pendingStreamSplits.size());
+            Preconditions.checkArgument(
+                    pendingStreamSplits.size() <= enumeratorContext.currentParallelism(),
+                    "%s stream splits generated, which is greater than current parallelism %s. Some splits might never be assigned.",
+                    pendingStreamSplits.size(),
+                    enumeratorContext.currentParallelism());
         }
 
-        return new StreamSplit(
-                STREAM_SPLIT_ID,
-                startingOffset,
-                offsetFactory.createNoStoppingOffset(),
-                new ArrayList<>(),
-                new HashMap<>(),
-                0);
+        if (pendingStreamSplits.isEmpty()) {
+            return Optional.empty();
+        } else {
+            SourceSplitBase nextSplit = pendingStreamSplits.remove(0);

Review Comment:
   Maybe use a queue and poll from it, instead of removing from the head of a list? A rough sketch is below.
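   For instance (generic type and names are only illustrative):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.List;
   import java.util.Optional;
   
   // Sketch only: a Deque expresses "take the next pending split" directly
   // and avoids the element shifting that List#remove(0) implies.
   class PendingSplits<T> {
       private final Deque<T> pending = new ArrayDeque<>();
   
       void addAll(List<T> splits) {
           pending.addAll(splits);
       }
   
       Optional<T> next() {
           return Optional.ofNullable(pending.poll()); // poll() returns null when empty
       }
   }
   ```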



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java:
##########
@@ -165,39 +170,89 @@ public void close() throws IOException {
     }
 
     // ------------------------------------------------------------------------------------------
+    protected boolean isStreamSplitAllAssigned;
+    protected List<SourceSplitBase> pendingStreamSplits = null;
+
+    private Optional<SourceSplitBase> getNextStreamSplit() {
+        if (pendingStreamSplits == null) {

Review Comment:
   It would be easier to understand to use an explicit flag, rather than checking whether pendingStreamSplits == null, to decide whether the stream splits still need to be created. See the sketch below.
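   Roughly something like this (all names are hypothetical, not from the PR):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Supplier;
   
   // Sketch only: a dedicated flag records "splits were already created";
   // the pending collection is just storage and never doubles as state.
   class LazyStreamSplits<T> {
       private final Supplier<List<T>> splitFactory;
       private final Deque<T> pending = new ArrayDeque<>();
       private boolean splitsCreated = false;
   
       LazyStreamSplits(Supplier<List<T>> splitFactory) {
           this.splitFactory = splitFactory;
       }
   
       Optional<T> next() {
           if (!splitsCreated) {
               pending.addAll(splitFactory.get());
               splitsCreated = true;
           }
           return Optional.ofNullable(pending.poll());
       }
   }
   ```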



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -252,56 +260,106 @@ public void close() throws IOException {
     }
 
     // --------------------------------------------------------------------------------------------
-
-    public StreamSplit createStreamSplit() {
-        final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
-                snapshotSplitAssigner.getAssignedSplits().values().stream()
-                        .sorted(Comparator.comparing(SourceSplitBase::splitId))
-                        .collect(Collectors.toList());
-
-        Map<String, Offset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
-        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
-
-        Offset minOffset = null, maxOffset = null;
-        for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
-            // find the min and max offset of change log
-            Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
-            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
-                minOffset = changeLogOffset;
+    // Overridable methods

Review Comment:
   Why isn't this variable placed at the beginning of the class? And I don't understand what "Overridable methods" means here.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -238,12 +243,15 @@ public void startAssignNewlyAddedTables() {
 
     @Override
     public void onStreamSplitUpdated() {
-        snapshotSplitAssigner.onStreamSplitUpdated();
+        if (++updatedStreamSplitCount == numberOfStreamSplits) {

Review Comment:
   What if a duplicate update request arrives for the same split, rather than exactly one stream-split update request per split? The counter would then reach numberOfStreamSplits too early. Tracking distinct split ids would avoid that; see the sketch below.
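   For example, assuming the split id is (or could be made) available in the notification, something like:
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   // Sketch only: counting distinct split ids is robust against a reader
   // re-sending the update notification for the same stream split.
   class StreamSplitUpdateTracker {
       private final Set<String> updatedSplitIds = new HashSet<>();
       private final int numberOfStreamSplits;
   
       StreamSplitUpdateTracker(int numberOfStreamSplits) {
           this.numberOfStreamSplits = numberOfStreamSplits;
       }
   
       /** Returns true once every distinct stream split has reported an update. */
       boolean onStreamSplitUpdated(String splitId) {
           updatedSplitIds.add(splitId); // duplicates collapse into a single entry
           return updatedSplitIds.size() == numberOfStreamSplits;
       }
   }
   ```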



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java:
##########
@@ -129,13 +133,14 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
     @Override
     public void addSplits(Collection<SourceSplitBase> splits) {
         // we don't store the split, but will re-create stream split later
-        isStreamSplitAssigned = false;
+        isStreamSplitAllAssigned = false;

Review Comment:
   We did it this way before because we only needed one stream split. What if addSplits(Collection<SourceSplitBase> splits) receives fewer splits than numberOfStreamSplits? Resetting isStreamSplitAllAssigned would then re-create all of the stream splits, not only the ones that came back.
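   A rough alternative sketch (names are illustrative only): keep the returned splits themselves instead of resetting a global flag, so only what actually came back gets re-assigned.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Collection;
   import java.util.Deque;
   import java.util.Optional;
   
   // Sketch only: buffer the splits that were added back and hand them out again,
   // even when fewer than numberOfStreamSplits are returned.
   class ReturnedStreamSplits<T> {
       private final Deque<T> pending = new ArrayDeque<>();
   
       void addSplitsBack(Collection<T> splits) {
           pending.addAll(splits); // may be a subset of all stream splits
       }
   
       Optional<T> next() {
           return Optional.ofNullable(pending.poll());
       }
   }
   ```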



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java:
##########
@@ -124,7 +133,7 @@ public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
                 splits.stream().filter(SourceSplitBase::isStreamSplit).findAny();
         if (streamSplit.isPresent()) {
             LOG.info("The enumerator adds add stream split back: {}", streamSplit);
-            this.streamSplitTaskId = null;
+            this.streamSplitTaskIds.clear();

Review Comment:
   If only one stream split is added back, should we clear all of the recorded task ids, rather than removing just the one for the returned split?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -159,16 +168,11 @@ public Optional<SourceSplitBase> getNext() {
                 // we need to wait snapshot-assigner to be finished before
                 // assigning the stream split. Otherwise, records emitted from stream split
                 // might be out-of-order in terms of same primary key with snapshot splits.
-                isStreamSplitAssigned = true;

Review Comment:
   Why is this removed for the InitialAssigningFinished case but kept for the NewlyAddedAssigningFinished case?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java:
##########
@@ -277,13 +285,15 @@ private void requestStreamSplitUpdateIfNeed() {
                 && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
             // If enumerator knows which reader is assigned stream split, just send to this reader,
             // nor sends to all registered readers.
-            if (streamSplitTaskId != null) {
+            if (!streamSplitTaskIds.isEmpty()) {

Review Comment:
   What if one stream split task id is registered successfully while another one fails? The update should then fall through to the else branch (notify all registered readers), but with this condition it will not, because the set is already non-empty.
   
   Maybe: !streamSplitTaskIds.isEmpty() && streamSplitTaskIds.size() == numberOfStreamSplits? A sketch is below.
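   A sketch of that check (assuming the enumerator knows numberOfStreamSplits):
   
   ```java
   import java.util.Set;
   
   // Sketch only: target specific readers only when every stream split has a
   // known owner; otherwise fall back to notifying all registered readers.
   final class StreamSplitUpdateRouting {
       static boolean canTargetOwners(Set<Integer> streamSplitTaskIds, int numberOfStreamSplits) {
           return !streamSplitTaskIds.isEmpty()
                   && streamSplitTaskIds.size() == numberOfStreamSplits;
       }
   }
   ```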
   


