JunRuiLee commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1805761682
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { Review Comment: JavaDoc is required ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ########## @@ -85,6 +88,10 @@ public ResultPartitionType getResultType() { return resultType; } + public List<StreamEdge> getConsumerStreamEdges() { Review Comment: getOutputStreamEdges ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { Review Comment: Could we introduce an interface named ForwardGroup and rename the original ForwardGroup class to JobVerticesForwardGroup? The JobVerticesForwardGroup and StreamNodeForwardGroup class would then implement this interface. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java: ########## @@ -31,4 +31,12 @@ public interface StreamGraphHasher { * didn't change. */ Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph); + + /** + * Generates a hash for the specified {@link StreamNode} within the {@link StreamGraph}. This + * hash is stored in the provided map and can be used to uniquely identify the {@link + * StreamNode} across job submissions, assuming its configuration remains unchanged. + */ + boolean generateHashesByStreamNode( + StreamNode streamNode, StreamGraph streamGraph, Map<Integer, byte[]> hashes); Review Comment: int streamNodeId, StreamGraph streamGraph, Map<Integer, byte[]> hashes ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamNodeUpdateRequestInfo.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; + +/** + * This class implements the {@link StreamGraphUpdateRequestInfo} interface and carries the data + * required to updates a stream node. + */ +@Internal +public class StreamNodeUpdateRequestInfo implements StreamGraphUpdateRequestInfo {} Review Comment: As mentioned in another comment. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import java.util.HashMap; +import java.util.Map; + +/** Helper class that provides read-only StreamGraph. */ +@Internal +public class ImmutableStreamGraph { Review Comment: unit tests are required. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ########## @@ -85,6 +88,10 @@ public ResultPartitionType getResultType() { return resultType; } + public List<StreamEdge> getConsumerStreamEdges() { + return streamEdges; + } + // -------------------------------------------------------------------------------------------- public void addConsumer(JobEdge edge) { Review Comment: ``` public void addConsumer(JobEdge edge) { // sanity check checkState(id.equals(edge.getSourceId()), "Incompatible dataset id."); if (consumers.isEmpty() && streamEdges.isEmpty()) { distributionPattern = edge.getDistributionPattern(); isBroadcast = edge.isBroadcast(); } else { checkState( distributionPattern == edge.getDistributionPattern(), "Incompatible distribution pattern."); checkState(isBroadcast == edge.isBroadcast(), "Incompatible broadcast type."); } consumers.add(edge); } ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ########## @@ -43,6 +44,8 @@ public class IntermediateDataSet implements java.io.Serializable { // All consumers must have the same partitioner and parallelism private final List<JobEdge> consumers = new ArrayList<>(); + private final List<StreamEdge> streamEdges = new ArrayList<>(); Review Comment: maybe outputStreamEdges? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java: ########## @@ -44,4 +44,14 @@ public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph str return hashResult; } + Review Comment: traverseStreamGraphAndGenerateHashes could call the method generateHashesByStreamNode ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ########## @@ -103,6 +110,22 @@ public void addConsumer(JobEdge edge) { consumers.add(edge); } + public void addStreamEdge(StreamEdge edge) { Review Comment: addOutputStreamEdge ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { + + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + + private int maxParallelism = JobVertex.MAX_PARALLELISM_DEFAULT; + + private final Set<Integer> startNodeIds = new HashSet<>(); + + private final Map<Integer, List<Integer>> chainedNodeIdMap = new HashMap<>(); + + public StreamNodeForwardGroup(final Set<StreamNode> startNodes) { + checkNotNull(startNodes); + + Set<Integer> configuredParallelisms = + startNodes.stream() + .filter( + startNode -> { + startNodeIds.add(startNode.getId()); + return startNode.getParallelism() > 0; + }) + .map(StreamNode::getParallelism) + .collect(Collectors.toSet()); + + checkState(configuredParallelisms.size() <= 1); + + if (configuredParallelisms.size() == 1) { + this.parallelism = configuredParallelisms.iterator().next(); + } + + Set<Integer> configuredMaxParallelisms = + startNodes.stream() + .map(StreamNode::getMaxParallelism) + .filter(val -> val > 0) + .collect(Collectors.toSet()); + + if (!configuredMaxParallelisms.isEmpty()) { + this.maxParallelism = Collections.min(configuredMaxParallelisms); + checkState( + parallelism == ExecutionConfig.PARALLELISM_DEFAULT + || maxParallelism >= parallelism, + "There is a start node in the forward group whose maximum parallelism is smaller than the group's parallelism"); + } + } + + public void setParallelism(int parallelism) { + checkState(this.parallelism == ExecutionConfig.PARALLELISM_DEFAULT); + this.parallelism = parallelism; + } + + public boolean isParallelismDecided() { + return parallelism > 0; + } + + public int getParallelism() { + checkState(isParallelismDecided()); + return parallelism; + } + + public boolean isMaxParallelismDecided() { + return maxParallelism > 0; + } + + public int getMaxParallelism() { + checkState(isMaxParallelismDecided()); + return maxParallelism; + } + + public int size() { + return startNodeIds.size(); + } + + public Set<Integer> getStartNodeIds() { + return startNodeIds; + } + + public Map<Integer, List<Integer>> getChainedNodeIdMap() { + return chainedNodeIdMap; + } + + public void putChainedNodeIds(Integer startNodeId, List<Integer> chainedNodeIds) { + this.chainedNodeIdMap.put(startNodeId, chainedNodeIds); + } + + public void mergeForwardGroup(StreamNodeForwardGroup forwardGroup) { + if (forwardGroup == null) { + return; + } + this.startNodeIds.addAll(forwardGroup.getStartNodeIds()); + this.chainedNodeIdMap.putAll(forwardGroup.getChainedNodeIdMap()); + } + + public static Map<Integer, StreamNodeForwardGroup> computeForwardGroup( Review Comment: this method should move to ForwardGroupComputeUtil and add an UT case. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.forwardgroup; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class StreamNodeForwardGroup { + + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + + private int maxParallelism = JobVertex.MAX_PARALLELISM_DEFAULT; + + private final Set<Integer> startNodeIds = new HashSet<>(); + + private final Map<Integer, List<Integer>> chainedNodeIdMap = new HashMap<>(); + + public StreamNodeForwardGroup(final Set<StreamNode> startNodes) { Review Comment: In my view, StreamNodeForwardGroup should represent a group of StreamNode objects connected by forward edges. I'm not very sure about the meaning of the field startNodes. Could you add some JavaDoc? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph; +import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo; +import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo; + +import java.util.List; + +/** + * Defines a context for optimizing and working with a read-only view of a StreamGraph. It provides + * methods to modify StreamEdges and StreamNodes within the StreamGraph. + */ +@Internal +public interface StreamGraphContext { + + /** + * Returns a read-only view of the StreamGraph. + * + * @return a read-only view of the StreamGraph. + */ + ImmutableStreamGraph getStreamGraph(); + + /** + * Modifies stream edges within the StreamGraph. + * + * @param requestInfos the stream edges to be modified. + * @return true if the modification was successful, false otherwise. + */ + boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos); + + /** + * Modifies a StreamNode within the StreamGraph. + * + * @param requestInfo the StreamNode to be modified. + * @return true if the modification was successful, false otherwise. + */ + boolean modifyStreamNode(StreamNodeUpdateRequestInfo requestInfo); Review Comment: It seems we have no plans to modify StreamNode, right? If that's the case, I prefer to remove this method and the StreamNodeUpdateRequestInfo class. Otherwise, please refactor the StreamNodeUpdateRequestInfo class by adding useful methods and fields. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; + +/** + * This class implements the {@link StreamGraphUpdateRequestInfo} interface and carries the data + * required to updates a stream edge. + */ +@Internal +public class StreamEdgeUpdateRequestInfo implements StreamGraphUpdateRequestInfo { + private final String edgeId; + private final Integer sourceId; + private final Integer targetId; + + private StreamPartitioner<?> outputPartitioner; + + public StreamEdgeUpdateRequestInfo(String edgeId, Integer sourceId, Integer targetId) { + this.edgeId = edgeId; Review Comment: I think sourceId and targetId are enough. The related changes can also be removed. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamGraphUpdateRequestInfo.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; + +/** Interface for different implementations of updating a stream graph. */ +@Internal +public interface StreamGraphUpdateRequestInfo {} Review Comment: useless interface, can be removed. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java: ########## @@ -238,4 +238,8 @@ public String toString() { public IntermediateDataSetID getIntermediateDatasetIdToProduce() { return intermediateDatasetIdToProduce; } + + public String getId() { Review Comment: useless ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamEdge; + +/** Helper class that provides read-only StreamEdge. */ +@Internal +public class ImmutableStreamEdge { Review Comment: unit tests are required. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Helper class that provides read-only StreamNode. */ +@Internal +public class ImmutableStreamNode { Review Comment: unit tests are required. ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphContextTest.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode; +import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; +import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner; +import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StreamGraphContextTest { Review Comment: DefaultStreamGraphContextTest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org