loserwang1024 commented on code in PR #3988: URL: https://github.com/apache/flink-cdc/pull/3988#discussion_r2055653749
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java: ##########
@@ -172,7 +181,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
             LOG.info(
                     "The enumerator receives notice from subtask {} for the stream split assignment. ",
                     subtaskId);
-            this.streamSplitTaskId = subtaskId;
+            this.streamSplitTaskIds.add(subtaskId);

Review Comment:
1. Maybe use a set rather than a list? A map would be better.
2. What if the stream split's taskId changes, for example after addSplitsBack or a restart?
3. Maybe we should use a map whose key is the stream split id. I remember @ruanhang1993 mentioned this when I introduced the newly added framework. @ruanhang1993, CC.
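As a hedged illustration of suggestion 3 above (not the PR's actual code), a map keyed by stream split id would let a re-assignment after addSplitsBack or a restart simply overwrite the stale entry; the class and method names below are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

class StreamSplitOwnerTracker {
    // splitId -> subtask that currently owns that stream split
    private final Map<String, Integer> streamSplitOwners = new HashMap<>();

    void onStreamSplitAssigned(String splitId, int subtaskId) {
        // put() overwrites any stale owner, so a changed taskId after a restart is handled naturally
        streamSplitOwners.put(splitId, subtaskId);
    }

    void onStreamSplitAddedBack(String splitId) {
        // only the split that came back loses its owner; the other splits keep theirs
        streamSplitOwners.remove(splitId);
    }

    boolean allStreamSplitsAssigned(int numberOfStreamSplits) {
        return streamSplitOwners.size() == numberOfStreamSplits;
    }
}
```

With this shape, adding one split back only needs to drop that split's owner instead of clearing every recorded task id.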
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java: ##########
@@ -165,39 +170,89 @@ public void close() throws IOException {
     }
     // ------------------------------------------------------------------------------------------
+    protected boolean isStreamSplitAllAssigned;
+    protected List<SourceSplitBase> pendingStreamSplits = null;
+
+    private Optional<SourceSplitBase> getNextStreamSplit() {
+        if (pendingStreamSplits == null) {
+            StartupOptions startupOptions = sourceConfig.getStartupOptions();
-    public StreamSplit createStreamSplit() {
-        StartupOptions startupOptions = sourceConfig.getStartupOptions();
-
-        Offset startingOffset;
-        switch (startupOptions.startupMode) {
-            case LATEST_OFFSET:
-                startingOffset = dialect.displayCurrentOffset(sourceConfig);
-                break;
-            case EARLIEST_OFFSET:
-                startingOffset = offsetFactory.createInitialOffset();
-                break;
-            case TIMESTAMP:
-                startingOffset =
-                        offsetFactory.createTimestampOffset(startupOptions.startupTimestampMillis);
-                break;
-            case SPECIFIC_OFFSETS:
-                startingOffset =
-                        offsetFactory.newOffset(
-                                startupOptions.specificOffsetFile,
-                                startupOptions.specificOffsetPos.longValue());
-                break;
-            default:
-                throw new IllegalStateException(
-                        "Unsupported startup mode " + startupOptions.startupMode);
+            Offset startingOffset;
+            switch (startupOptions.startupMode) {
+                case LATEST_OFFSET:
+                    startingOffset = dialect.displayCurrentOffset(sourceConfig);
+                    break;
+                case EARLIEST_OFFSET:
+                    startingOffset = offsetFactory.createInitialOffset();
+                    break;
+                case TIMESTAMP:
+                    startingOffset =
+                            offsetFactory.createTimestampOffset(
+                                    startupOptions.startupTimestampMillis);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    startingOffset =
+                            offsetFactory.newOffset(
+                                    startupOptions.specificOffsetFile,
+                                    startupOptions.specificOffsetPos.longValue());
+                    break;
+                default:
+                    throw new IllegalStateException(
+                            "Unsupported startup mode " + startupOptions.startupMode);
+            }
+
+            pendingStreamSplits =
+                    new ArrayList<>(
+                            createStreamSplits(
+                                    sourceConfig,
+                                    startingOffset,
+                                    offsetFactory.createNoStoppingOffset(),
+                                    new ArrayList<>(),
+                                    new HashMap<>(),
+                                    0,
+                                    false,
+                                    true));
+            Preconditions.checkArgument(
+                    pendingStreamSplits.size() == numberOfStreamSplits,
+                    "Inconsistent number of stream splits. Reported %s, but was %s",
+                    numberOfStreamSplits,
+                    pendingStreamSplits.size());
+            Preconditions.checkArgument(
+                    pendingStreamSplits.size() <= enumeratorContext.currentParallelism(),
+                    "%s stream splits generated, which is greater than current parallelism %s. Some splits might never be assigned.",
+                    pendingStreamSplits.size(),
+                    enumeratorContext.currentParallelism());
         }
-        return new StreamSplit(
-                STREAM_SPLIT_ID,
-                startingOffset,
-                offsetFactory.createNoStoppingOffset(),
-                new ArrayList<>(),
-                new HashMap<>(),
-                0);
+        if (pendingStreamSplits.isEmpty()) {
+            return Optional.empty();
+        } else {
+            SourceSplitBase nextSplit = pendingStreamSplits.remove(0);

Review Comment:
Maybe use a queue and poll from it?

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java: ##########
@@ -165,39 +170,89 @@ public void close() throws IOException {
     }
     // ------------------------------------------------------------------------------------------
+    protected boolean isStreamSplitAllAssigned;
+    protected List<SourceSplitBase> pendingStreamSplits = null;
+
+    private Optional<SourceSplitBase> getNextStreamSplit() {
+        if (pendingStreamSplits == null) {

Review Comment:
It's easier to understand if we use a flag, rather than checking whether pendingStreamSplits == null, to decide whether we need to call createStreamSplits.

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java: ##########
@@ -252,56 +260,106 @@ public void close() throws IOException {
     }
     // --------------------------------------------------------------------------------------------
-
-    public StreamSplit createStreamSplit() {
-        final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
-                snapshotSplitAssigner.getAssignedSplits().values().stream()
-                        .sorted(Comparator.comparing(SourceSplitBase::splitId))
-                        .collect(Collectors.toList());
-
-        Map<String, Offset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
-        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
-
-        Offset minOffset = null, maxOffset = null;
-        for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
-            // find the min and max offset of change log
-            Offset changeLogOffset = splitFinishedOffsets.get(split.splitId());
-            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
-                minOffset = changeLogOffset;
+    // Overridable methods

Review Comment:
Why is the variable not put at the beginning? And I don't understand what "Overridable methods" means here.

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java: ##########
@@ -238,12 +243,15 @@ public void startAssignNewlyAddedTables() {
     @Override
     public void onStreamSplitUpdated() {
-        snapshotSplitAssigner.onStreamSplitUpdated();
+        if (++updatedStreamSplitCount == numberOfStreamSplits) {

Review Comment:
What if a duplicate update request arrives for the same split, instead of exactly one stream split update request per split?

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java: ##########
@@ -129,13 +133,14 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
     @Override
     public void addSplits(Collection<SourceSplitBase> splits) {
         // we don't store the split, but will re-create stream split later
-        isStreamSplitAssigned = false;
+        isStreamSplitAllAssigned = false;

Review Comment:
We did it this way because we only needed one stream split. What if addSplits(Collection<SourceSplitBase> splits) receives fewer splits than numberOfStreamSplits?
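A minimal sketch of the two suggestions above on getNextStreamSplit: an explicit flag instead of the pendingStreamSplits == null check, and a queue that is polled instead of List.remove(0). The class name, the generic parameter T standing in for SourceSplitBase, and the abstract createStreamSplits() hook are assumptions, not the PR's actual code.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

abstract class PendingStreamSplitAssigner<T> {
    // explicit flag instead of relying on "pendingStreamSplits == null"
    private boolean streamSplitsCreated = false;
    // a queue so the next split is handed out with poll() instead of List.remove(0)
    private final Queue<T> pendingStreamSplits = new ArrayDeque<>();

    Optional<T> getNextStreamSplit() {
        if (!streamSplitsCreated) {
            // stand-in for the PR's createStreamSplits(sourceConfig, startingOffset, ...)
            pendingStreamSplits.addAll(createStreamSplits());
            streamSplitsCreated = true;
        }
        // poll() removes the head and returns null when the queue is empty
        return Optional.ofNullable(pendingStreamSplits.poll());
    }

    protected abstract Collection<T> createStreamSplits();
}
```

ArrayDeque.poll() is O(1), whereas List.remove(0) on an ArrayList shifts every remaining element.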
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java: ##########
@@ -124,7 +133,7 @@ public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
                 splits.stream().filter(SourceSplitBase::isStreamSplit).findAny();
         if (streamSplit.isPresent()) {
             LOG.info("The enumerator adds add stream split back: {}", streamSplit);
-            this.streamSplitTaskId = null;
+            this.streamSplitTaskIds.clear();

Review Comment:
If only one split is added back, should we clear all of the task ids?

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java: ##########
@@ -159,16 +168,11 @@ public Optional<SourceSplitBase> getNext() {
                 // we need to wait snapshot-assigner to be finished before
                 // assigning the stream split. Otherwise, records emitted from stream split
                 // might be out-of-order in terms of same primary key with snapshot splits.
-                isStreamSplitAssigned = true;

Review Comment:
Why is it removed when the status is InitialAssigningFinished but kept when the status is NewlyAddedAssigningFinished?

########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java: ##########
@@ -277,13 +285,15 @@ private void requestStreamSplitUpdateIfNeed() {
                 && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
             // If enumerator knows which reader is assigned stream split, just send to this reader,
             // nor sends to all registered readers.
-            if (streamSplitTaskId != null) {
+            if (!streamSplitTaskIds.isEmpty()) {

Review Comment:
What if one streamSplitTaskId is assigned successfully while another fails? The code should go to the else logic in that case. Maybe: !streamSplitTaskIds.isEmpty() && streamSplitTaskIds.size() == numberOfStreamSplits?
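A minimal sketch of the condition suggested in the last comment, assuming the enumerator knows the expected numberOfStreamSplits; the helper class and the sendRequest callback are placeholders, not the actual IncrementalSourceEnumerator API.

```java
import java.util.Set;
import java.util.function.Consumer;

class StreamSplitUpdateDispatch {
    // sendRequest stands in for sending a stream split update request to one subtask,
    // e.g. the enumerator's context.sendEventToSourceReader(subtaskId, request).
    static void requestStreamSplitUpdate(
            Set<Integer> streamSplitTaskIds,
            int numberOfStreamSplits,
            Set<Integer> registeredReaders,
            Consumer<Integer> sendRequest) {
        if (!streamSplitTaskIds.isEmpty()
                && streamSplitTaskIds.size() == numberOfStreamSplits) {
            // every stream split has a known owner: notify only those readers
            streamSplitTaskIds.forEach(sendRequest);
        } else {
            // some owners are unknown (e.g. one assignment failed): fall back to broadcasting
            registeredReaders.forEach(sendRequest);
        }
    }
}
```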