JunRuiLee commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1839827521


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroupComputeUtil.java:
##########
@@ -101,6 +104,103 @@ public static Map<JobVertexID, ForwardGroup> computeForwardGroups(
         return ret;
     }
 
+    /**
+     * We calculate forward group by a set of chained stream nodes, and use the start node to
+     * identify the chain group.
+     *
+     * @param topologicallySortedChainedStreamNodesMap Topologically sorted chained stream nodes.
+     * @param forwardProducersRetriever Records all upstream chain groups which connected to the
+     *     given chain group with forward edge.
+     * @return a map of forward groups, with the start node id as the key.
+     */
+    public static Map<Integer, StreamNodeForwardGroup> computeStreamNodeForwardGroup(
+            final Map<StreamNode, List<StreamNode>> topologicallySortedChainedStreamNodesMap,
+            final Function<StreamNode, Set<StreamNode>> forwardProducersRetriever) {
+        // In the forwardProducersRetriever, only the upstream nodes connected to the given start
+        // node by the forward edge are saved. We need to calculate the chain groups that can be
+        // accessed with consecutive forward edges and put them in the same forward group.
+        final Map<StreamNode, Set<StreamNode>> nodeToGroup = new IdentityHashMap<>();
+        for (StreamNode currentNode : topologicallySortedChainedStreamNodesMap.keySet()) {
+            Set<StreamNode> currentGroup = new HashSet<>();
+            currentGroup.add(currentNode);
+            nodeToGroup.put(currentNode, currentGroup);
+            for (StreamNode producerNode : forwardProducersRetriever.apply(currentNode)) {
+                // Merge nodes from the current group and producer group.
+                final Set<StreamNode> producerGroup = nodeToGroup.get(producerNode);
+                // The producerGroup cannot be null unless the topological order is incorrect.
+                if (producerGroup == null) {
+                    throw new IllegalStateException(
+                            "Producer task "
+                                    + producerNode.getId()
+                                    + " forward group is null"
+                                    + " while calculating forward group for the consumer task "
+                                    + currentNode.getId()
+                                    + ". This should be a forward group building bug.");
+                }
+                // Merge the forward group groups where the upstream and downstream are connected by
+                // forward edge
+                if (currentGroup != producerGroup) {
+                    currentGroup =
+                            VertexGroupComputeUtil.mergeVertexGroups(
+                                    currentGroup, producerGroup, nodeToGroup);
+                }
+            }
+        }
+        final Map<Integer, StreamNodeForwardGroup> result = new HashMap<>();
+        for (Set<StreamNode> nodeGroup : VertexGroupComputeUtil.uniqueVertexGroups(nodeToGroup)) {
+            Map<StreamNode, List<StreamNode>> chainedStreamNodeGroupsByStartNode = new HashMap<>();
+            nodeGroup.forEach(
+                    startNode -> {
+                        chainedStreamNodeGroupsByStartNode.put(
+                                startNode, topologicallySortedChainedStreamNodesMap.get(startNode));
+                    });
+            StreamNodeForwardGroup streamNodeForwardGroup =
+                    new StreamNodeForwardGroup(chainedStreamNodeGroupsByStartNode);
+            for (StreamNode startNode : streamNodeForwardGroup.getStartNodes()) {
+                result.put(startNode.getId(), streamNodeForwardGroup);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Determines whether the target forward group can be merged into the source forward group.
+     *
+     * @param sourceForwardGroup The source forward group.
+     * @param targetForwardGroup The forward group needs to be merged.
+     * @return whether the merge is valid.
+     */
+    public static boolean canTargetMergeIntoSourceForwardGroup(
+            ForwardGroup sourceForwardGroup, ForwardGroup targetForwardGroup) {
+        if (sourceForwardGroup == null || targetForwardGroup == null) {
+            return false;
+        }
+
+        if (sourceForwardGroup == targetForwardGroup) {
+            return true;
+        }
+
+        if (sourceForwardGroup.isParallelismDecided()
+                && targetForwardGroup.isParallelismDecided()
+                && sourceForwardGroup.getParallelism() != targetForwardGroup.getParallelism()) {
+            return false;
+        }
+
+        // When the maximum parallelism of both forward groups is determined, the maximum
+        // parallelism of the targetForwardGroup should not be less than that of the
+        // sourceForwardGroup. This is because when the source has finished deciding its
+        // parallelism, we need to ensure that the targetForwardGroup can also achieve the same
+        // parallelism.
+        if (sourceForwardGroup.isMaxParallelismDecided()
+                && targetForwardGroup.isMaxParallelismDecided()
+                && sourceForwardGroup.getMaxParallelism()
+                        > targetForwardGroup.getMaxParallelism()) {
+            return false;
+        }

Review Comment:
   I think this part could be:
   ```
   if (sourceForwardGroup.isParallelismDecided()
           && targetForwardGroup.isMaxParallelismDecided()
           && sourceForwardGroup.getParallelism() > targetForwardGroup.getMaxParallelism()) {
       return false;
   }
   ```
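
   To make the difference between the two checks concrete, here is a minimal sketch. `MergeCheckSketch` and its nested `Group` class are hypothetical stand-ins invented for illustration (they are not Flink's `ForwardGroup` API), and an empty `OptionalInt` is assumed to mean "not decided yet". It only models the last condition, not the rest of `canTargetMergeIntoSourceForwardGroup`.

   ```java
   import java.util.OptionalInt;

   /** Hypothetical model of the final check only; not Flink code. */
   final class MergeCheckSketch {

       /** Minimal group model: an empty OptionalInt means "not decided yet" (assumption). */
       static final class Group {
           final OptionalInt parallelism;
           final OptionalInt maxParallelism;

           Group(OptionalInt parallelism, OptionalInt maxParallelism) {
               this.parallelism = parallelism;
               this.maxParallelism = maxParallelism;
           }
       }

       /** Condition as written in the diff: compares the two max parallelisms. */
       static boolean passesMaxVsMaxCheck(Group source, Group target) {
           return !(source.maxParallelism.isPresent()
                   && target.maxParallelism.isPresent()
                   && source.maxParallelism.getAsInt() > target.maxParallelism.getAsInt());
       }

       /** Condition as suggested above: compares source parallelism with target max parallelism. */
       static boolean passesParallelismVsMaxCheck(Group source, Group target) {
           return !(source.parallelism.isPresent()
                   && target.maxParallelism.isPresent()
                   && source.parallelism.getAsInt() > target.maxParallelism.getAsInt());
       }

       public static void main(String[] args) {
           // Source has decided parallelism 4 with max parallelism 128;
           // target has not decided its parallelism and has max parallelism 8.
           Group source = new Group(OptionalInt.of(4), OptionalInt.of(128));
           Group target = new Group(OptionalInt.empty(), OptionalInt.of(8));

           System.out.println(passesMaxVsMaxCheck(source, target));         // false: 128 > 8
           System.out.println(passesParallelismVsMaxCheck(source, target)); // true: 4 <= 8
       }
   }
   ```

   In this example the target can still reach the source's decided parallelism of 4, so the suggested condition lets the merge proceed while the max-versus-max condition rejects it. Note that the suggested condition does not reject anything while the source parallelism is still undecided.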



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java:
##########
@@ -0,0 +1,2347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.CheckpointingMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
+import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.CachedDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
+import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
+import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases.
+ */
+abstract class JobGraphGeneratorTestBase {
+    abstract JobGraph createJobGraph(StreamGraph streamGraph);
+
+    @Test
+    void testParallelismOneNotChained() {
+
+        // --------- the program ---------
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<Tuple2<String, String>> input =
+                env.fromData("a", "b", "c", "d", "e", "f")
+                        .map(
+                                new MapFunction<String, Tuple2<String, String>>() {
+
+                                    @Override
+                                    public Tuple2<String, String> map(String value) {
+                                        return new Tuple2<>(value, value);
+                                    }
+                                });
+
+        DataStream<Tuple2<String, String>> result =
+                input.keyBy(x -> x.f0)
+                        .map(
+                                new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
+
+                                    @Override
+                                    public Tuple2<String, String> map(
+                                            Tuple2<String, String> value) {
+                                        return value;
+                                    }
+                                });
+
+        result.addSink(
+                new SinkFunction<Tuple2<String, String>>() {
+
+                    @Override
+                    public void invoke(Tuple2<String, String> value) {}
+                });
+
+        // --------- the job graph ---------
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+        assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
+        assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
+
+        JobVertex sourceVertex = verticesSorted.get(0);
+        JobVertex mapSinkVertex = verticesSorted.get(1);
+
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+        assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+    }
+
+    /**
+     * Tests that disabled checkpointing sets the checkpointing interval to Long.MAX_VALUE and the
+     * checkpoint mode to {@link CheckpointingMode#AT_LEAST_ONCE}.
+     */
+    @Test
+    void testDisabledCheckpointing() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromData(0).print();
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+                .withFailMessage("Checkpointing enabled")
+                .isFalse();
+
+        JobGraph jobGraph = createJobGraph(streamGraph);
+
+        JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
+        assertThat(
+                        snapshottingSettings
+                                .getCheckpointCoordinatorConfiguration()
+                                .getCheckpointInterval())
+                .isEqualTo(Long.MAX_VALUE);
+        assertThat(snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce())
+                .isFalse();
+
+        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+        StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+    }
+
+    @Test
+    void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromData(0).print();
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+                .withFailMessage("Checkpointing enabled")
+                .isFalse();
+        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+        JobGraph jobGraph = createJobGraph(streamGraph);
+
+        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+        StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+    }
+
+    @Test
+    void testTransformationSetParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // The default parallelism of the environment (that is inherited by the source)
+        // and the parallelism of the map operator needs to be different for this test
+        env.setParallelism(4);
+        env.fromSequence(1L, 3L).map(i -> i).setParallelism(10).print().setParallelism(20);
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph parallelism configured
+        final List<StreamNode> streamNodes =
+                streamGraph.getStreamNodes().stream()
+                        .sorted(Comparator.comparingInt(StreamNode::getId))
+                        .collect(Collectors.toList());
+        assertThat(streamNodes.get(0).isParallelismConfigured()).isFalse();
+        assertThat(streamNodes.get(1).isParallelismConfigured()).isTrue();
+        assertThat(streamNodes.get(2).isParallelismConfigured()).isTrue();
+
+        // check the jobGraph parallelism configured
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3);
+        assertThat(vertices.get(0).isParallelismConfigured()).isFalse();
+        assertThat(vertices.get(1).isParallelismConfigured()).isTrue();
+        assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
+    }
+
+    @Test
+    void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // The max parallelism of the environment (that is inherited by the source)
+        // and the parallelism of the map operator needs to be different for this test
+        env.setMaxParallelism(4);
+
+        DataStreamSource<Long> source =
+                env.fromSequence(1L, 3L); // no explicit max parallelism set, grab from environment.
+        SingleOutputStreamOperator<Long> map = source.map(i -> i).setMaxParallelism(10);
+        DataStreamSink<Long> sink = map.print().setMaxParallelism(20);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph max parallelism is configured correctly
+        assertThat(streamGraph.getStreamNode(source.getId()).getMaxParallelism()).isEqualTo(4);
+        assertThat(streamGraph.getStreamNode(map.getId()).getMaxParallelism()).isEqualTo(10);
+        assertThat(streamGraph.getStreamNode(sink.getTransformation().getId()).getMaxParallelism())
+                .isEqualTo(20);
+    }
+
+    @Test
+    void testChainNodeSetParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(1L, 3L).map(value -> value).print().setParallelism(env.getParallelism());
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph parallelism configured
+        final List<StreamNode> streamNodes =
+                streamGraph.getStreamNodes().stream()
+                        .sorted(Comparator.comparingInt(StreamNode::getId))
+                        .collect(Collectors.toList());
+        assertThat(streamNodes.get(0).isParallelismConfigured()).isFalse();
+        assertThat(streamNodes.get(1).isParallelismConfigured()).isFalse();
+        assertThat(streamNodes.get(2).isParallelismConfigured()).isTrue();
+
+        // check the jobGraph parallelism configured
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(vertices.get(0).isParallelismConfigured()).isTrue();
+    }
+
+    @Test
+    void testChainedSourcesSetParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        MultipleInputTransformation<Long> transform =
+                new MultipleInputTransformation<>(
+                        "mit",
+                        new UnusedOperatorFactory(),
+                        Types.LONG,
+                        env.getParallelism(),
+                        false);
+        DataStreamSource<Long> source1 =
+                env.fromSource(
+                        new NumberSequenceSource(1, 2),
+                        WatermarkStrategy.noWatermarks(),
+                        "source1");
+        DataStreamSource<Long> source2 =
+                env.fromSource(
+                        new NumberSequenceSource(1, 2),
+                        WatermarkStrategy.noWatermarks(),
+                        "source2");
+        transform.addInput(source1.getTransformation());
+        transform.addInput(source2.getTransformation());
+        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+        source1.setParallelism(env.getParallelism());
+        env.addOperator(transform);
+
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        // check the streamGraph parallelism configured
+        final List<StreamNode> streamNodes =
+                streamGraph.getStreamNodes().stream()
+                        .sorted(Comparator.comparingInt(StreamNode::getId))
+                        .collect(Collectors.toList());
+        assertThat(streamNodes.get(0).isParallelismConfigured()).isFalse();
+        assertThat(streamNodes.get(1).isParallelismConfigured()).isTrue();
+        assertThat(streamNodes.get(2).isParallelismConfigured()).isFalse();
+
+        // check the jobGraph parallelism configured
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(vertices.get(0).isParallelismConfigured()).isTrue();
+    }
+
+    @Test
+    void testDynamicGraphVertexParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        int defaultParallelism = 20;
+        env.setParallelism(defaultParallelism);
+        env.fromSequence(1L, 3L).map(value -> value).print();
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+            assertThat(streamNode.getParallelism()).isEqualTo(defaultParallelism);
+        }
+        streamGraph.setDynamic(false);
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        for (JobVertex vertex : vertices) {
+            assertThat(vertex.getParallelism()).isEqualTo(defaultParallelism);
+        }
+
+        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+            assertThat(streamNode.getParallelism()).isEqualTo(defaultParallelism);
+        }
+        streamGraph.setDynamic(true);
+        jobGraph = createJobGraph(streamGraph);
+        vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+        for (JobVertex vertex : vertices) {
+            assertThat(vertex.getParallelism()).isEqualTo(ExecutionConfig.PARALLELISM_DEFAULT);
+        }
+    }
+
+    @Test
+    void testUnalignedCheckAndAtLeastOnce() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromData(0).print();
+        StreamGraph streamGraph = env.getStreamGraph();
+        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+
+        JobGraph jobGraph = createJobGraph(streamGraph);
+
+        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+        StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+    }
+
+    @Test
+    void generatorForwardsSavepointRestoreSettings() {
+        StreamGraph streamGraph =
+                new StreamGraph(
+                        new Configuration(),
+                        new ExecutionConfig(),
+                        new CheckpointConfig(),
+                        SavepointRestoreSettings.forPath("hello"));
+
+        JobGraph jobGraph = createJobGraph(streamGraph);
+
+        SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+        assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo("hello");
+    }
+
+    /** Verifies that the chain start/end is correctly set. */
+    @Test
+    void testChainStartEndSetting() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // set parallelism to 2 to avoid chaining with source in case when available processors is
+        // 1.
+        env.setParallelism(2);
+
+        // fromElements -> CHAIN(Map -> Print)
+        env.fromData(1, 2, 3)
+                .map(
+                        new MapFunction<Integer, Integer>() {
+                            @Override
+                            public Integer map(Integer value) throws Exception {
+                                return value;
+                            }
+                        })
+                .print();
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+        JobVertex sourceVertex = verticesSorted.get(0);
+        JobVertex mapPrintVertex = verticesSorted.get(1);
+
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+        assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+
+        StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
+        StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
+        Map<Integer, StreamConfig> chainedConfigs =
+                mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
+        StreamConfig printConfig = chainedConfigs.values().iterator().next();
+
+        assertThat(sourceConfig.isChainStart()).isTrue();
+        assertThat(sourceConfig.isChainEnd()).isTrue();
+
+        assertThat(mapConfig.isChainStart()).isTrue();
+        assertThat(mapConfig.isChainEnd()).isFalse();
+
+        assertThat(printConfig.isChainStart()).isFalse();
+        assertThat(printConfig.isChainEnd()).isTrue();
+    }
+
+    @Test
+    void testOperatorCoordinatorAddedToJobVertex() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Integer> stream =
+                env.fromSource(
+                        new MockSource(Boundedness.BOUNDED, 1),
+                        WatermarkStrategy.noWatermarks(),
+                        "TestingSource");
+
+        OneInputTransformation<Integer, Integer> resultTransform =
+                new OneInputTransformation<Integer, Integer>(
+                        stream.getTransformation(),
+                        "AnyName",
+                        new CoordinatedTransformOperatorFactory(),
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        env.getParallelism());
+
+        new TestingSingleOutputStreamOperator<>(env, resultTransform).print();
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
+    }
+
+    /**
+     * Verifies that the resources are merged correctly for chained operators (covers source and
+     * sink cases) when generating job graph.
+     */
+    @Test
+    void testResourcesForChainedSourceSink() throws Exception {
+        ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
+        ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
+        ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
+        ResourceSpec resource4 = ResourceSpec.newBuilder(0.4, 400).build();
+        ResourceSpec resource5 = ResourceSpec.newBuilder(0.5, 500).build();
+
+        Method opMethod = 
getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
+        Method sinkMethod = 
getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple2<Integer, Integer>> source =
+                env.addSource(
+                        new ParallelSourceFunction<Tuple2<Integer, Integer>>() 
{
+                            @Override
+                            public void run(SourceContext<Tuple2<Integer, 
Integer>> ctx)
+                                    throws Exception {}
+
+                            @Override
+                            public void cancel() {}
+                        });
+        opMethod.invoke(source, resource1);
+
+        DataStream<Tuple2<Integer, Integer>> map =
+                source.map(
+                        new MapFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>() {
+                            @Override
+                            public Tuple2<Integer, Integer> 
map(Tuple2<Integer, Integer> value)
+                                    throws Exception {
+                                return value;
+                            }
+                        });
+        opMethod.invoke(map, resource2);
+
+        // CHAIN(Source -> Map -> Filter)
+        DataStream<Tuple2<Integer, Integer>> filter =
+                map.filter(
+                        new FilterFunction<Tuple2<Integer, Integer>>() {
+                            @Override
+                            public boolean filter(Tuple2<Integer, Integer> 
value) throws Exception {
+                                return false;
+                            }
+                        });
+        opMethod.invoke(filter, resource3);
+
+        DataStream<Tuple2<Integer, Integer>> reduce =
+                filter.keyBy(x -> x.f0)
+                        .reduce(
+                                new ReduceFunction<Tuple2<Integer, Integer>>() 
{
+                                    @Override
+                                    public Tuple2<Integer, Integer> reduce(
+                                            Tuple2<Integer, Integer> value1,
+                                            Tuple2<Integer, Integer> value2)
+                                            throws Exception {
+                                        return new Tuple2<>(value1.f0, 
value1.f1 + value2.f1);
+                                    }
+                                });
+        opMethod.invoke(reduce, resource4);
+
+        DataStreamSink<Tuple2<Integer, Integer>> sink =
+                reduce.addSink(
+                        new SinkFunction<Tuple2<Integer, Integer>>() {
+                            @Override
+                            public void invoke(Tuple2<Integer, Integer> value) 
throws Exception {}
+                        });
+        sinkMethod.invoke(sink, resource5);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        JobVertex sourceMapFilterVertex =
+                jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
+        JobVertex reduceSinkVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+
+        assertThat(sourceMapFilterVertex.getMinResources())
+                .isEqualTo(resource3.merge(resource2).merge(resource1));
+
+        
assertThat(reduceSinkVertex.getPreferredResources()).isEqualTo(resource4.merge(resource5));
+    }
+
+    @Test
+    void testInputOutputFormat(@TempDir java.nio.file.Path outputPath) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<String> source =
+                env.addSource(
+                                new InputFormatSourceFunction<>(
+                                        new TextInputFormat(new 
Path("FakePath")),
+                                        TypeInformation.of(String.class)))
+                        .returns(TypeInformation.of(String.class))
+                        .name("source");
+
+        java.nio.file.Path outputFile1 = outputPath.resolve("outputFile1");
+        java.nio.file.Path outputFile2 = outputPath.resolve("outputFile2");
+        source.writeUsingOutputFormat(new TextOutputFormat<>(new 
Path(outputFile1.toUri())))
+                .name("sink1");
+        source.writeUsingOutputFormat(new TextOutputFormat<>(new 
Path(outputFile2.toUri())))
+                .name("sink2");
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+
+        JobVertex jobVertex = jobGraph.getVertices().iterator().next();
+        assertThat(jobVertex).isInstanceOf(InputOutputFormatVertex.class);
+
+        InputOutputFormatContainer formatContainer =
+                new InputOutputFormatContainer(
+                        new TaskConfig(jobVertex.getConfiguration()),
+                        Thread.currentThread().getContextClassLoader());
+        Map<OperatorID, UserCodeWrapper<? extends InputFormat<?, ?>>> 
inputFormats =
+                formatContainer.getInputFormats();
+        Map<OperatorID, UserCodeWrapper<? extends OutputFormat<?>>> 
outputFormats =
+                formatContainer.getOutputFormats();
+        assertThat(inputFormats).hasSize(1);
+        assertThat(outputFormats).hasSize(2);
+
+        Map<String, OperatorID> nameToOperatorIds = new HashMap<>();
+        StreamConfig headConfig = new 
StreamConfig(jobVertex.getConfiguration());
+        nameToOperatorIds.put(headConfig.getOperatorName(), 
headConfig.getOperatorID());
+
+        Map<Integer, StreamConfig> chainedConfigs =
+                headConfig.getTransitiveChainedTaskConfigs(
+                        Thread.currentThread().getContextClassLoader());
+        for (StreamConfig config : chainedConfigs.values()) {
+            nameToOperatorIds.put(config.getOperatorName(), 
config.getOperatorID());
+        }
+
+        InputFormat<?, ?> sourceFormat =
+                inputFormats.get(nameToOperatorIds.get("Source: 
source")).getUserCodeObject();
+        assertThat(sourceFormat).isInstanceOf(TextInputFormat.class);
+
+        OutputFormat<?> sinkFormat1 =
+                outputFormats.get(nameToOperatorIds.get("Sink: 
sink1")).getUserCodeObject();
+        assertThat(sinkFormat1).isInstanceOf(TextOutputFormat.class);
+
+        OutputFormat<?> sinkFormat2 =
+                outputFormats.get(nameToOperatorIds.get("Sink: 
sink2")).getUserCodeObject();
+        assertThat(sinkFormat2).isInstanceOf(TextOutputFormat.class);
+    }
+
+    @Test
+    void testCoordinatedOperator() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Integer> source =
+                env.fromSource(
+                        new MockSource(Boundedness.BOUNDED, 1),
+                        WatermarkStrategy.noWatermarks(),
+                        "TestSource");
+        source.sinkTo(new DiscardingSink<>());
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        // There should be only one job vertex.
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+
+        JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
+        List<SerializedValue<OperatorCoordinator.Provider>> 
coordinatorProviders =
+                jobVertex.getOperatorCoordinators();
+        // There should be only one coordinator provider.
+        assertThat(coordinatorProviders).hasSize(1);
+        // The invokable class should be SourceOperatorStreamTask.
+        final ClassLoader classLoader = getClass().getClassLoader();
+        assertThat(jobVertex.getInvokableClass(classLoader))
+                .isEqualTo(SourceOperatorStreamTask.class);
+        StreamOperatorFactory operatorFactory =
+                new StreamConfig(jobVertex.getConfiguration())
+                        .getStreamOperatorFactory(classLoader);
+        assertThat(operatorFactory).isInstanceOf(SourceOperatorFactory.class);
+    }
+
+    /** Test setting exchange mode to {@link StreamExchangeMode#PIPELINED}. */
+    @Test
+    void testExchangeModePipelined() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // fromData -> Map -> Print
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+
+        DataStream<Integer> partitionAfterSourceDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                sourceDataStream.getTransformation(),
+                                new ForwardPartitioner<>(),
+                                StreamExchangeMode.PIPELINED));
+        DataStream<Integer> mapDataStream =
+                partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
+
+        DataStream<Integer> partitionAfterMapDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                mapDataStream.getTransformation(),
+                                new RescalePartitioner<>(),
+                                StreamExchangeMode.PIPELINED));
+        partitionAfterMapDataStream.print().setParallelism(2);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(verticesSorted).hasSize(2);
+
+        // it can be chained with PIPELINED exchange mode
+        JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+        // PIPELINED exchange mode is translated into PIPELINED_BOUNDED result partition
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+    }
+
+    /** Test setting exchange mode to {@link StreamExchangeMode#BATCH}. */
+    @Test
+    void testExchangeModeBatch() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setBufferTimeout(-1);
+        // fromData -> Map -> Print
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+
+        DataStream<Integer> partitionAfterSourceDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                sourceDataStream.getTransformation(),
+                                new ForwardPartitioner<>(),
+                                StreamExchangeMode.BATCH));
+        DataStream<Integer> mapDataStream =
+                partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
+
+        DataStream<Integer> partitionAfterMapDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                mapDataStream.getTransformation(),
+                                new RescalePartitioner<>(),
+                                StreamExchangeMode.BATCH));
+        partitionAfterMapDataStream.print().setParallelism(2);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(verticesSorted).hasSize(3);
+
+        // it cannot be chained with BATCH exchange mode
+        JobVertex sourceVertex = verticesSorted.get(0);
+        JobVertex mapVertex = verticesSorted.get(1);
+
+        // BATCH exchange mode is translated into BLOCKING result partition
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.BLOCKING);
+        assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.BLOCKING);
+    }
+
+    /** Test setting exchange mode to {@link StreamExchangeMode#UNDEFINED}. */
+    @Test
+    void testExchangeModeUndefined() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // fromData -> Map -> Print
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+
+        DataStream<Integer> partitionAfterSourceDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                sourceDataStream.getTransformation(),
+                                new ForwardPartitioner<>(),
+                                StreamExchangeMode.UNDEFINED));
+        DataStream<Integer> mapDataStream =
+                partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
+
+        DataStream<Integer> partitionAfterMapDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                mapDataStream.getTransformation(),
+                                new RescalePartitioner<>(),
+                                StreamExchangeMode.UNDEFINED));
+        partitionAfterMapDataStream.print().setParallelism(2);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(verticesSorted).hasSize(2);
+
+        // it can be chained with UNDEFINED exchange mode
+        JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+        // UNDEFINED exchange mode is translated into PIPELINED_BOUNDED result partition by default
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
+    }
+
+    /** Test setting exchange mode to {@link StreamExchangeMode#HYBRID_FULL}. */
+    @Test
+    void testExchangeModeHybridFull() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        // fromData -> Map -> Print
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+
+        DataStream<Integer> partitionAfterSourceDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                sourceDataStream.getTransformation(),
+                                new ForwardPartitioner<>(),
+                                StreamExchangeMode.HYBRID_FULL));
+        DataStream<Integer> mapDataStream =
+                partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
+
+        DataStream<Integer> partitionAfterMapDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                mapDataStream.getTransformation(),
+                                new RescalePartitioner<>(),
+                                StreamExchangeMode.HYBRID_FULL));
+        partitionAfterMapDataStream.print().setParallelism(2);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(verticesSorted).hasSize(2);
+
+        // it can be chained with HYBRID_FULL exchange mode
+        JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+        // HYBRID_FULL exchange mode is translated into HYBRID_FULL result partition
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.HYBRID_FULL);
+    }
+
+    /** Test setting exchange mode to {@link StreamExchangeMode#HYBRID_SELECTIVE}. */
+    @Test
+    void testExchangeModeHybridSelective() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        // fromData -> Map -> Print
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+
+        DataStream<Integer> partitionAfterSourceDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                sourceDataStream.getTransformation(),
+                                new ForwardPartitioner<>(),
+                                StreamExchangeMode.HYBRID_SELECTIVE));
+        DataStream<Integer> mapDataStream =
+                partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
+
+        DataStream<Integer> partitionAfterMapDataStream =
+                new DataStream<>(
+                        env,
+                        new PartitionTransformation<>(
+                                mapDataStream.getTransformation(),
+                                new RescalePartitioner<>(),
+                                StreamExchangeMode.HYBRID_SELECTIVE));
+        partitionAfterMapDataStream.print().setParallelism(2);
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertThat(verticesSorted).hasSize(2);
+
+        // it can be chained with HYBRID_SELECTIVE exchange mode
+        JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+        // HYBRID_SELECTIVE exchange mode is translated into HYBRID_SELECTIVE result partition
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+                .isEqualTo(ResultPartitionType.HYBRID_SELECTIVE);
+    }
+
+    @Test
+    void testStreamingJobTypeByDefault() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromData("test").sinkTo(new DiscardingSink<>());
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+    }
+
+    @Test
+    void testBatchJobType() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.fromData("test").sinkTo(new DiscardingSink<>());
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
+    }
+
+    @Test
+    void testPartitionTypesInBatchMode() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(4);
+        env.disableOperatorChaining();
+        DataStream<Integer> source = env.fromData(1);
+        source
+                // set the same parallelism as the source to make it a FORWARD exchange
+                .map(value -> value)
+                .setParallelism(1)
+                .rescale()
+                .map(value -> value)
+                .rebalance()
+                .map(value -> value)
+                .keyBy(value -> value)
+                .map(value -> value)
+                .sinkTo(new DiscardingSink<>());
+
+        JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+        List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+        assertHasOutputPartitionType(
+                verticesSorted.get(0) /* source - forward */, 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(1) /* rescale */, 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(2) /* rebalance */, 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(3) /* keyBy */, 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(4) /* forward - sink */, 
ResultPartitionType.BLOCKING);
+    }
+
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        
assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
+    }
+
+    @Test
+    void testNormalExchangeModeWithBufferTimeout() {
+        
testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
+    }
+
+    private void 
testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode exchangeMode) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setBufferTimeout(100);
+
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+        PartitionTransformation<Integer> transformation =
+                new PartitionTransformation<>(
+                        sourceDataStream.getTransformation(),
+                        new RebalancePartitioner<>(),
+                        exchangeMode);
+
+        DataStream<Integer> partitionStream = new DataStream<>(env, 
transformation);
+        partitionStream.map(value -> value).print();
+
+        createJobGraph(env.getStreamGraph());
+    }
+
+    @Test
+    void testDisablingBufferTimeoutWithPipelinedExchanges() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.setBufferTimeout(-1);
+
+        env.fromData(1, 2, 3).map(value -> value).print();
+
+        final JobGraph jobGraph = createJobGraph(env.getStreamGraph());
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            final StreamConfig streamConfig = new 
StreamConfig(vertex.getConfiguration());
+            for (NonChainedOutput output :
+                    
streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
+                assertThat(output.getBufferTimeout()).isEqualTo(-1L);
+            }
+        }
+    }
+
+    /** Test default job type. */
+    @Test
+    void testDefaultJobType() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        StreamGraph streamGraph =
+                new StreamGraphGenerator(
+                                Collections.emptyList(), env.getConfig(), 
env.getCheckpointConfig())
+                        .generate();
+        JobGraph jobGraph = createJobGraph(streamGraph);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+    }
+
+    @Test
+    void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
+        // TODO: this test can be removed when the legacy SourceFunction API gets removed
+        StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+        chainEnv.addSource(new LegacySource())
+                .map((x) -> x)
+                // not chainable because of YieldingOperatorFactory and legacy source
+                .transform(
+                        "test", BasicTypeInfo.INT_TYPE_INFO, new 
YieldingTestOperatorFactory<>());
+
+        final StreamGraph streamGraph = chainEnv.getStreamGraph();
+
+        final List<StreamNode> streamNodes =
+                streamGraph.getStreamNodes().stream()
+                        .sorted(Comparator.comparingInt(StreamNode::getId))
+                        .collect(Collectors.toList());
+        assertThat(areOperatorsChainable(streamNodes.get(0), 
streamNodes.get(1), streamGraph))
+                .isTrue();
+        assertThat(areOperatorsChainable(streamNodes.get(1), 
streamNodes.get(2), streamGraph))
+                .isFalse();
+    }
+
+    @Test
+    void testJobGraphGenerationWithManyYieldingOperatorsDoesNotHang() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+        SingleOutputStreamOperator<Long> operator =
+                env.fromSource(
+                                new NumberSequenceSource(0, 10),
+                                WatermarkStrategy.noWatermarks(),
+                                "input")
+                        .map((x) -> x);
+
+        // add 40 YieldingOperatorFactory
+        for (int i = 0; i < 40; i++) {
+            operator =
+                    operator.transform(
+                            "test",
+                            BasicTypeInfo.LONG_TYPE_INFO,
+                            new YieldingTestOperatorFactory<>());
+        }
+
+        operator.sinkTo(new DiscardingSink<>());
+
+        assertThat(CompletableFuture.runAsync(() -> 
env.getStreamGraph().getJobGraph()))

Review Comment:
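   Please use `createJobGraph(env.getStreamGraph())` here instead of `env.getStreamGraph().getJobGraph()`, as the other tests in this class do.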
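
A minimal, self-contained sketch of the pattern this test relies on: run graph generation on another thread and check that it finishes within a timeout. `createJobGraph` here is a hypothetical string-based stand-in for the test class's helper, and `finishesWithin` and the 5-second timeout are assumptions for illustration, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncGenerationCheck {

    /** Hypothetical stand-in for the test helper createJobGraph(streamGraph). */
    static String createJobGraph(String streamGraph) {
        return "JobGraph(" + streamGraph + ")";
    }

    /**
     * Runs the task asynchronously and returns true only if it completes
     * normally before the timeout elapses.
     */
    static boolean finishesWithin(Runnable task, long timeoutMillis) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            // Either the task hung past the timeout or it threw an exception.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Route generation through the shared helper, as the other tests do.
        boolean finished = finishesWithin(() -> createJobGraph("streamGraph"), 5_000);
        System.out.println("finished in time: " + finished);
    }
}
```

Routing the call through the shared helper keeps every test in the class on the same job graph generation path, so a change to that path only needs to be made in one place.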



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.*;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.*;
+
+/** Default implementation for {@link AdaptiveGraphGenerator}. */
+@Internal
+public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
+
+    private final StreamGraph streamGraph;
+
+    private final JobGraph jobGraph;
+
+    private final StreamGraphHasher defaultStreamGraphHasher;
+
+    private final List<StreamGraphHasher> legacyStreamGraphHasher;
+
+    private final Executor serializationExecutor;
+
+    private final AtomicInteger vertexIndexId;
+
+    private final StreamGraphContext streamGraphContext;
+
+    private final Map<Integer, byte[]> hashes;
+
+    private final List<Map<Integer, byte[]>> legacyHashes;
+
+    // Records the ids of stream nodes whose job vertex has been created, mapped to the id of the
+    // start node of their chain.
+    private final Map<Integer, Integer> frozenNodeToStartNodeMap;
+
+    // When the downstream vertex is not created, we need to cache the output.
+    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
intermediateOutputsCaches;
+
+    // Records the id of the stream node that produces the IntermediateDataSet.
+    private final Map<IntermediateDataSetID, Integer> 
intermediateDataSetIdToProducerMap;
+
+    // Records the ids of the start and end nodes for chained groups in the StreamNodeForwardGroup.
+    // When a stream edge's partitioner is modified to forward, we need to get the forward groups by
+    // source and target node id.
+    private final Map<Integer, StreamNodeForwardGroup> 
startAndEndNodeIdToForwardGroupMap;
+
+    // Records the chain info that is not ready to create the job vertex, the key is the start node
+    // in this chain.
+    private final Map<Integer, OperatorChainInfo> pendingChainEntryPoints;
+
+    // The value is the id of the start stream node of that job vertex.
+    private final Map<JobVertexID, Integer> jobVertexToStartNodeMap;
+
+    // The value is the stream node ids belonging to that job vertex.
+    private final Map<JobVertexID, List<Integer>> 
jobVertexToChainedStreamNodeIdsMap;
+
+    // We need to cache all job vertices to create the JobEdge for downstream vertices.
+    private final Map<Integer, JobVertex> jobVerticesCache;
+
+    // Records the IDs of the job vertices that have completed execution.
+    private final Set<JobVertexID> finishedJobVertices;
+
+    private final AtomicBoolean hasHybridResultPartition;
+
+    public AdaptiveGraphManager(
+            ClassLoader userClassloader,
+            StreamGraph streamGraph,
+            Executor serializationExecutor,
+            @Nullable JobID jobId) {
+        preValidate(streamGraph, userClassloader);
+        this.streamGraph = streamGraph;
+        this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
+
+        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
+        this.legacyStreamGraphHasher = Collections.singletonList(new 
StreamGraphUserHashHasher());
+
+        this.hashes = new HashMap<>();
+        this.legacyHashes = Collections.singletonList(new HashMap<>());
+
+        this.jobVerticesCache = new LinkedHashMap<>();
+        this.pendingChainEntryPoints = new TreeMap<>();
+
+        this.frozenNodeToStartNodeMap = new HashMap<>();
+        this.intermediateOutputsCaches = new HashMap<>();
+        this.intermediateDataSetIdToProducerMap = new HashMap<>();
+        this.startAndEndNodeIdToForwardGroupMap = new HashMap<>();
+
+        this.vertexIndexId = new AtomicInteger(0);
+
+        this.hasHybridResultPartition = new AtomicBoolean(false);
+
+        this.jobVertexToStartNodeMap = new HashMap<>();
+        this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>();
+
+        this.finishedJobVertices = new HashSet<>();
+
+        this.streamGraphContext =
+                new DefaultStreamGraphContext(
+                        streamGraph,
+                        startAndEndNodeIdToForwardGroupMap,
+                        frozenNodeToStartNodeMap,
+                        intermediateOutputsCaches);
+
+        this.jobGraph = createAndInitializeJobGraph(streamGraph, jobId);
+
+        initialization();
+    }
+
+    @Override
+    public JobGraph getJobGraph() {
+        return this.jobGraph;
+    }
+
+    @Override
+    public StreamGraphContext getStreamGraphContext() {
+        return streamGraphContext;
+    }
+
+    @Override
+    public List<JobVertex> onJobVertexFinished(JobVertexID 
finishedJobVertexId) {
+        this.finishedJobVertices.add(finishedJobVertexId);
+        List<StreamNode> streamNodes = new ArrayList<>();
+        for (StreamEdge outEdge : 
getOutputEdgesByVertexId(finishedJobVertexId)) {
+            streamNodes.add(streamGraph.getStreamNode(outEdge.getTargetId()));
+        }
+        return createJobVerticesAndUpdateGraph(streamNodes);
+    }
+
+    /**
+     * Retrieves the StreamNodeForwardGroup which provides a stream node level 
ForwardGroup.
+     *
+     * @param jobVertexId The ID of the JobVertex.
+     * @return An instance of {@link StreamNodeForwardGroup}.
+     */
+    public StreamNodeForwardGroup 
getStreamNodeForwardGroupByVertexId(JobVertexID jobVertexId) {
+        Integer startNodeId = jobVertexToStartNodeMap.get(jobVertexId);
+        return startAndEndNodeIdToForwardGroupMap.get(startNodeId);
+    }
+
+    /**
+     * Retrieves the number of operators that have not yet been converted to 
job vertex.
+     *
+     * @return The number of unconverted operators.
+     */
+    public int getPendingOperatorsCount() {
+        return streamGraph.getStreamNodes().size() - 
frozenNodeToStartNodeMap.size();
+    }
+
+    /**
+     * Retrieves the IDs of stream nodes that belong to the given job vertex.
+     *
+     * @param jobVertexId The ID of the JobVertex.
+     * @return A list of IDs of stream nodes that belong to the job vertex.
+     */
+    public List<Integer> getStreamNodeIdsByJobVertexId(JobVertexID 
jobVertexId) {
+        return jobVertexToChainedStreamNodeIdsMap.get(jobVertexId);
+    }
+
+    /**
+     * Retrieves the ID of the stream node that produces the 
IntermediateDataSet.
+     *
+     * @param intermediateDataSetID The ID of the IntermediateDataSet.
+     * @return The ID of the stream node that produces the IntermediateDataSet.
+     */
+    public Integer getProducerStreamNodeId(IntermediateDataSetID 
intermediateDataSetID) {
+        return intermediateDataSetIdToProducerMap.get(intermediateDataSetID);
+    }
+
+    private Optional<JobVertexID> findVertexByStreamNodeId(int streamNodeId) {
+        if (frozenNodeToStartNodeMap.containsKey(streamNodeId)) {
+            Integer startNodeId = frozenNodeToStartNodeMap.get(streamNodeId);
+            return Optional.of(jobVerticesCache.get(startNodeId).getID());
+        }
+        return Optional.empty();
+    }
+
+    private List<StreamEdge> getOutputEdgesByVertexId(JobVertexID jobVertexId) 
{
+        JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+        List<StreamEdge> outputEdges = new ArrayList<>();
+        for (IntermediateDataSet result : jobVertex.getProducedDataSets()) {
+            outputEdges.addAll(result.getOutputStreamEdges());
+        }
+        return outputEdges;
+    }
+
+    private void initialization() {
+        List<StreamNode> sourceNodes = new ArrayList<>();
+        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
+            sourceNodes.add(streamGraph.getStreamNode(sourceNodeId));
+        }
+        if (jobGraph.isDynamic()) {
+            setVertexParallelismsForDynamicGraphIfNecessary(sourceNodes);
+        }
+        createJobVerticesAndUpdateGraph(sourceNodes);
+    }
+
+    private List<JobVertex> createJobVerticesAndUpdateGraph(List<StreamNode> 
streamNodes) {
+        final JobVertexBuildContext jobVertexBuildContext =
+                new JobVertexBuildContext(
+                        jobGraph, streamGraph, hasHybridResultPartition, 
hashes, legacyHashes);
+
+        createOperatorChainInfos(streamNodes, jobVertexBuildContext);
+
+        recordCreatedJobVerticesInfo(jobVertexBuildContext);
+
+        generateConfigForJobVertices(jobVertexBuildContext);
+
+        return new 
ArrayList<>(jobVertexBuildContext.getJobVerticesInOrder().values());
+    }
+
+    private void generateConfigForJobVertices(JobVertexBuildContext 
jobVertexBuildContext) {
+        // this may be used by uncreated down stream vertex.
+        final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs =
+                new HashMap<>();
+
+        setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, 
jobVertexBuildContext);
+
+        setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs, 
jobVertexBuildContext);
+
+        connectToFinishedUpStreamVertex(jobVertexBuildContext);
+
+        setPhysicalEdges(jobVertexBuildContext);
+
+        markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
+
+        validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
+
+        // When generating in a single step, there may be differences between 
the results and the
+        // full image generation. We consider this difference to be normal 
because they do not need
+        // to be in the same shared group.
+        setSlotSharingAndCoLocation(jobVertexBuildContext);
+
+        setManagedMemoryFraction(jobVertexBuildContext);
+
+        addVertexIndexPrefixInVertexName(jobVertexBuildContext, vertexIndexId);
+
+        setVertexDescription(jobVertexBuildContext);
+
+        serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, 
jobVertexBuildContext);
+    }
+
+    private void setAllVertexNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
+        jobVertexBuildContext
+                .getJobVerticesInOrder()
+                .keySet()
+                .forEach(
+                        startNodeId ->
+                                setVertexNonChainedOutputsConfig(
+                                        startNodeId, opIntermediateOutputs, 
jobVertexBuildContext));
+    }
+
+    private void setVertexNonChainedOutputsConfig(
+            Integer startNodeId,
+            Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
+
+        OperatorChainInfo chainInfo = 
jobVertexBuildContext.getChainInfo(startNodeId);
+        StreamConfig config = 
chainInfo.getOperatorInfo(startNodeId).getVertexConfig();
+        List<StreamEdge> transitiveOutEdges = 
chainInfo.getTransitiveOutEdges();
+        LinkedHashSet<NonChainedOutput> transitiveOutputs = new 
LinkedHashSet<>();
+
+        for (StreamEdge edge : transitiveOutEdges) {
+            NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+            transitiveOutputs.add(output);
+            // When a downstream vertex has been created, a connection to the 
downstream will be
+            // created, otherwise only an IntermediateDataSet will be created 
for it.
+            if 
(jobVertexBuildContext.getJobVerticesInOrder().containsKey(edge.getTargetId())) 
{
+                connect(startNodeId, edge, output, jobVerticesCache, 
jobVertexBuildContext);
+            } else {
+                JobVertex jobVertex =
+                        
jobVertexBuildContext.getJobVerticesInOrder().get(startNodeId);
+                IntermediateDataSet dataSet =
+                        jobVertex.getOrCreateResultDataSet(
+                                output.getDataSetId(), 
output.getPartitionType());
+                dataSet.addOutputStreamEdge(edge);
+                // we cache the output here for downstream vertex to create 
jobEdge.
+                intermediateOutputsCaches
+                        .computeIfAbsent(edge.getSourceId(), k -> new 
HashMap<>())
+                        .put(edge, output);
+            }
+            intermediateDataSetIdToProducerMap.put(output.getDataSetId(), 
edge.getSourceId());
+        }
+        config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
+    }
+
+    /**
+     * Connects to the upstream job vertices that have finished execution.
+     *
+     * @param jobVertexBuildContext the job vertex build context.
+     */
+    private void connectToFinishedUpStreamVertex(JobVertexBuildContext 
jobVertexBuildContext) {
+        Map<Integer, OperatorChainInfo> chainInfos = 
jobVertexBuildContext.getChainInfosInOrder();
+        for (OperatorChainInfo chainInfo : chainInfos.values()) {
+            List<StreamEdge> transitiveInEdges = 
chainInfo.getTransitiveInEdges();
+            for (StreamEdge transitiveInEdge : transitiveInEdges) {
+                NonChainedOutput output =
+                        intermediateOutputsCaches
+                                .get(transitiveInEdge.getSourceId())
+                                .get(transitiveInEdge);
+                Integer sourceStartNodeId =
+                        
frozenNodeToStartNodeMap.get(transitiveInEdge.getSourceId());
+                connect(
+                        sourceStartNodeId,
+                        transitiveInEdge,
+                        output,
+                        jobVerticesCache,
+                        jobVertexBuildContext);
+            }
+        }
+    }
+
+    private void recordCreatedJobVerticesInfo(JobVertexBuildContext 
jobVertexBuildContext) {
+        Map<Integer, OperatorChainInfo> chainInfos = 
jobVertexBuildContext.getChainInfosInOrder();
+        for (OperatorChainInfo chainInfo : chainInfos.values()) {
+            JobVertex jobVertex = 
jobVertexBuildContext.getJobVertex(chainInfo.getStartNodeId());
+            jobVerticesCache.put(chainInfo.getStartNodeId(), jobVertex);
+            jobVertexToStartNodeMap.put(jobVertex.getID(), 
chainInfo.getStartNodeId());
+            chainInfo
+                    .getAllChainedNodes()
+                    .forEach(
+                            node -> {
+                                frozenNodeToStartNodeMap.put(
+                                        node.getId(), 
chainInfo.getStartNodeId());
+                                jobVertexToChainedStreamNodeIdsMap
+                                        .computeIfAbsent(
+                                                jobVertex.getID(), key -> new 
ArrayList<>())
+                                        .add(node.getId());
+                            });
+        }
+    }
+
+    private void createOperatorChainInfos(
+            List<StreamNode> streamNodes, JobVertexBuildContext 
jobVertexBuildContext) {
+        final Map<Integer, OperatorChainInfo> chainEntryPoints =
+                buildAndGetChainEntryPoints(streamNodes, 
jobVertexBuildContext);
+
+        final Collection<OperatorChainInfo> initialEntryPoints =
+                chainEntryPoints.entrySet().stream()
+                        .sorted(Map.Entry.comparingByKey())
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+
+        for (OperatorChainInfo info : initialEntryPoints) {
+            createChain(
+                    info.getStartNodeId(),
+                    1,
+                    info,
+                    chainEntryPoints,
+                    false,
+                    serializationExecutor,
+                    jobVertexBuildContext,
+                    this::generateHashesByStreamNodeId);
+
+            // We need to record in edges connect to the finished job vertex 
and create the
+            // connection later.
+            StreamNode startNode = 
streamGraph.getStreamNode(info.getStartNodeId());
+            for (StreamEdge inEdge : startNode.getInEdges()) {
+                if 
(frozenNodeToStartNodeMap.containsKey(inEdge.getSourceId())) {
+                    info.addTransitiveInEdge(inEdge);
+                }
+            }
+        }
+    }
+
+    private Map<Integer, OperatorChainInfo> buildAndGetChainEntryPoints(
+            List<StreamNode> streamNodes, JobVertexBuildContext 
jobVertexBuildContext) {
+        for (StreamNode streamNode : streamNodes) {
+            int streamNodeId = streamNode.getId();
+            if (isSourceChainable(streamNode, streamGraph)) {
+                generateHashesByStreamNodeId(streamNodeId);
+                createSourceChainInfo(streamNode, pendingChainEntryPoints, 
jobVertexBuildContext);
+            } else {
+                pendingChainEntryPoints.computeIfAbsent(
+                        streamNodeId, ignored -> new 
OperatorChainInfo(streamNodeId));
+            }
+        }
+        return getChainEntryPoints();
+    }
+
+    private Map<Integer, OperatorChainInfo> getChainEntryPoints() {
+
+        final Map<Integer, OperatorChainInfo> chainEntryPoints = new 
HashMap<>();
+        Iterator<Map.Entry<Integer, OperatorChainInfo>> iterator =
+                pendingChainEntryPoints.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, OperatorChainInfo> entry = iterator.next();
+            Integer startNodeId = entry.getKey();
+            OperatorChainInfo chainInfo = entry.getValue();
+            if (!isReadyToCreateJobVertex(chainInfo)) {
+                continue;
+            }
+            chainEntryPoints.put(startNodeId, chainInfo);
+            iterator.remove();
+        }
+        return chainEntryPoints;
+    }
+
+    /**
+     * Responds to calculating the forward group and reset the parallelism of 
all nodes in the same
+     * forward group to make them the same.
+     *
+     * @param sourceNodes the source nodes.
+     */
+    private void 
setVertexParallelismsForDynamicGraphIfNecessary(List<StreamNode> sourceNodes) {
+        List<StreamEdge> chainableOutputsCache = new ArrayList<>();
+        Map<Integer, List<StreamEdge>> transitiveNonChainableOutEdgesMap = new 
HashMap<>();
+        Map<StreamNode, List<StreamNode>> 
topologicallySortedChainedStreamNodesMap =
+                new LinkedHashMap<>();
+        Set<Integer> visitedStartNodes = new HashSet<>();
+
+        List<StreamNode> enterPoints =
+                getEnterPoints(sourceNodes, 
topologicallySortedChainedStreamNodesMap);
+
+        for (StreamNode streamNode : enterPoints) {
+            traverseFullGraph(
+                    streamNode.getId(),
+                    streamNode.getId(),
+                    chainableOutputsCache,
+                    transitiveNonChainableOutEdgesMap,
+                    topologicallySortedChainedStreamNodesMap,
+                    visitedStartNodes);
+        }
+        computeForwardGroupAndSetNodeParallelisms(
+                transitiveNonChainableOutEdgesMap,
+                topologicallySortedChainedStreamNodesMap,
+                chainableOutputsCache);
+    }
+
+    private List<StreamNode> getEnterPoints(
+            List<StreamNode> sourceNodes, Map<StreamNode, List<StreamNode>> 
chainedStreamNodesMap) {
+        List<StreamNode> enterPoints = new ArrayList<>();
+        for (StreamNode sourceNode : sourceNodes) {
+            if (isSourceChainable(sourceNode, streamGraph)) {
+                StreamEdge outEdge = sourceNode.getOutEdges().get(0);
+                StreamNode startNode = 
streamGraph.getStreamNode(outEdge.getTargetId());
+                chainedStreamNodesMap
+                        .computeIfAbsent(startNode, k -> new ArrayList<>())
+                        .add(sourceNode);
+                enterPoints.add(startNode);
+            } else {
+                chainedStreamNodesMap.computeIfAbsent(sourceNode, k -> new 
ArrayList<>());
+                enterPoints.add(sourceNode);
+            }
+        }
+        return enterPoints;
+    }
+
+    private void traverseFullGraph(
+            Integer startNodeId,
+            Integer currentNodeId,
+            List<StreamEdge> chainableOutputsCache,
+            Map<Integer, List<StreamEdge>> transitiveNonChainableOutEdgesMap,
+            Map<StreamNode, List<StreamNode>> 
topologicallySortedChainedStreamNodesMap,
+            Set<Integer> visitedStartNodes) {
+        if (visitedStartNodes.contains(startNodeId)) {
+            return;
+        }
+        List<StreamEdge> chainableOutputs = new ArrayList<>();
+        List<StreamEdge> nonChainableOutputs = new ArrayList<>();
+        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
+        StreamNode startNode = streamGraph.getStreamNode(startNodeId);
+        for (StreamEdge streamEdge : currentNode.getOutEdges()) {
+            if (isChainable(streamEdge, streamGraph)) {
+                chainableOutputs.add(streamEdge);
+            } else {
+                nonChainableOutputs.add(streamEdge);
+                topologicallySortedChainedStreamNodesMap.computeIfAbsent(
+                        streamGraph.getStreamNode(streamEdge.getTargetId()),
+                        k -> new ArrayList<>());
+            }
+        }
+
+        chainableOutputsCache.addAll(chainableOutputs);
+
+        transitiveNonChainableOutEdgesMap
+                .computeIfAbsent(startNodeId, k -> new ArrayList<>())
+                .addAll(nonChainableOutputs);
+
+        
topologicallySortedChainedStreamNodesMap.get(startNode).add(currentNode);
+
+        for (StreamEdge chainable : chainableOutputs) {
+            traverseFullGraph(
+                    startNodeId,
+                    chainable.getTargetId(),
+                    chainableOutputsCache,
+                    transitiveNonChainableOutEdgesMap,
+                    topologicallySortedChainedStreamNodesMap,
+                    visitedStartNodes);
+        }
+        for (StreamEdge nonChainable : nonChainableOutputs) {
+            traverseFullGraph(
+                    nonChainable.getTargetId(),
+                    nonChainable.getTargetId(),
+                    chainableOutputsCache,
+                    transitiveNonChainableOutEdgesMap,
+                    topologicallySortedChainedStreamNodesMap,
+                    visitedStartNodes);
+        }
+        if (currentNodeId.equals(startNodeId)) {
+            visitedStartNodes.add(startNodeId);
+        }
+    }
+
+    private void computeForwardGroupAndSetNodeParallelisms(
+            Map<Integer, List<StreamEdge>> transitiveNonChainableOutEdgesMap,
+            Map<StreamNode, List<StreamNode>> 
topologicallySortedChainedStreamNodesMap,
+            List<StreamEdge> chainableOutputsCache) {
+        // Reset parallelism for chained stream nodes whose parallelism is not 
configured.
+        for (List<StreamNode> chainedStreamNodes :
+                topologicallySortedChainedStreamNodesMap.values()) {
+            boolean isParallelismConfigured =
+                    
chainedStreamNodes.stream().anyMatch(StreamNode::isParallelismConfigured);
+            if (!isParallelismConfigured && 
streamGraph.isAutoParallelismEnabled()) {
+                chainedStreamNodes.forEach(
+                        n -> 
n.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT, false));
+            }
+        }
+        // We calculate forward group by a set of chained stream nodes, and 
use the start node to
+        // identify the chain group.
+
+        // The values are the start nodes of upstream chain groups that connect to the target
+        // chain group by a forward edge.
+        final Map<StreamNode, Set<StreamNode>> chainGroupToProducers = new 
LinkedHashMap<>();
+
+        for (StreamNode startNode : 
topologicallySortedChainedStreamNodesMap.keySet()) {
+            int startNodeId = startNode.getId();
+            // All downstream node connect to current chain group with forward 
partitioner.
+            Set<StreamNode> forwardConsumers =
+                    transitiveNonChainableOutEdgesMap.get(startNodeId).stream()
+                            .filter(
+                                    edge ->
+                                            edge.getPartitioner()
+                                                    .getClass()
+                                                    
.equals(ForwardPartitioner.class))
+                            .map(StreamEdge::getTargetId)
+                            .map(streamGraph::getStreamNode)
+                            .collect(Collectors.toSet());
+            for (StreamNode forwardConsumer : forwardConsumers) {
+                chainGroupToProducers
+                        .computeIfAbsent(forwardConsumer, ignored -> new 
HashSet<>())
+                        .add(streamGraph.getStreamNode(startNodeId));
+            }
+        }
+
+        final Map<Integer, StreamNodeForwardGroup> forwardGroupsByStartNodeId =
+                ForwardGroupComputeUtil.computeStreamNodeForwardGroup(
+                        topologicallySortedChainedStreamNodesMap,
+                        startNode ->
+                                chainGroupToProducers.getOrDefault(
+                                        startNode, Collections.emptySet()));
+
+        forwardGroupsByStartNodeId.forEach(
+                (startNodeId, forwardGroup) -> {
+                    this.startAndEndNodeIdToForwardGroupMap.put(startNodeId, 
forwardGroup);
+                    // Gets end node of current chain group by 
transitiveNonChainableOutEdgesMap
+                    transitiveNonChainableOutEdgesMap
+                            .get(startNodeId)
+                            .forEach(
+                                    streamEdge -> {
+                                        
this.startAndEndNodeIdToForwardGroupMap.put(
+                                                streamEdge.getSourceId(), 
forwardGroup);

Review Comment:
   This block lambda can be simplified to an expression lambda:
   
   streamEdge -> this.startAndEndNodeIdToForwardGroupMap.put(
           streamEdge.getSourceId(), forwardGroup)
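
A minimal sketch of why the two lambda forms are interchangeable here, assuming the lambda is passed to `forEach` as a `Consumer`. The `Edge` class and the `String` group value are hypothetical stand-ins, not Flink types; only the lambda shape mirrors the reviewed code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-ins only: Edge and the String "group" are not Flink types.
class LambdaSimplificationSketch {

    static final class Edge {
        final int sourceId;
        final int targetId;

        Edge(int sourceId, int targetId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
        }
    }

    // Block-bodied lambda, in the shape of the code under review.
    static Map<Integer, String> registerWithBlockLambda(List<Edge> edges, String group) {
        Map<Integer, String> groupBySourceId = new HashMap<>();
        edges.forEach(
                edge -> {
                    groupBySourceId.put(edge.sourceId, group);
                });
        return groupBySourceId;
    }

    // Expression lambda: forEach takes a Consumer, so the value returned by put() is discarded.
    static Map<Integer, String> registerWithExpressionLambda(List<Edge> edges, String group) {
        Map<Integer, String> groupBySourceId = new HashMap<>();
        edges.forEach(edge -> groupBySourceId.put(edge.sourceId, group));
        return groupBySourceId;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(new Edge(1, 2), new Edge(1, 3), new Edge(4, 5));
        Map<Integer, String> fromBlock = registerWithBlockLambda(edges, "group-a");
        Map<Integer, String> fromExpression = registerWithExpressionLambda(edges, "group-a");
        System.out.println(fromBlock.equals(fromExpression));
    }
}
```

Both methods build the same map, so the change is purely stylistic and does not alter behavior.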



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AdaptiveGraphManager}. */
+public class AdaptiveGraphManagerTest extends JobGraphGeneratorTestBase {
+    @Override
+    JobGraph createJobGraph(StreamGraph streamGraph) {
+        return generateJobGraphInLazilyMode(streamGraph);
+    }
+
+    @Test
+    void testCreateJobVertexLazily() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<Tuple2<String, String>> input =
+                env.fromData("a", "b", "c", "d", "e", "f")
+                        .map(
+                                new MapFunction<String, Tuple2<String, 
String>>() {
+
+                                    @Override
+                                    public Tuple2<String, String> map(String 
value) {
+                                        return new Tuple2<>(value, value);
+                                    }
+                                });
+
+        DataStream<Tuple2<String, String>> result =
+                input.keyBy(x -> x.f0)
+                        .map(
+                                new MapFunction<Tuple2<String, String>, 
Tuple2<String, String>>() {
+
+                                    @Override
+                                    public Tuple2<String, String> map(
+                                            Tuple2<String, String> value) {
+                                        return value;
+                                    }
+                                });
+
+        result.addSink(

Review Comment:
   `SinkFunction` is deprecated; we should use the Sink V2 API instead:
   
   ```
   result.sinkTo(new DiscardingSink<>());
   ```



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java:
##########
@@ -180,4 +182,18 @@ public void addTransitiveInEdge(StreamEdge streamEdge) {
     public List<StreamEdge> getTransitiveInEdges() {
         return transitiveInEdges;
     }
+
+    public OperatorInfo getOperatorInfo(Integer nodeId) {
+        return chainedOperatorInfos.get(nodeId);
+    }
+
+    public OperatorInfo createAndGetOperatorInfo(Integer nodeId, OperatorID 
operatorId) {
+        OperatorInfo operatorInfo = new OperatorInfo(operatorId);
+        chainedOperatorInfos.put(nodeId, operatorInfo);
+        return operatorInfo;
+    }
+
+    public Map<Integer, OperatorInfo> getOperatorInfos() {
+        return chainedOperatorInfos;

Review Comment:
   Return `Collections.unmodifiableMap(chainedOperatorInfos)` here so that callers cannot modify the internal map.
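
A small sketch of the suggested getter, under the assumption that callers only need read access. `UnmodifiableViewSketch` and its `String` values are hypothetical stand-ins for `OperatorChainInfo` and `OperatorInfo`; only `Collections.unmodifiableMap` is a real JDK call.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only: String replaces the real OperatorInfo type.
class UnmodifiableViewSketch {

    private final Map<Integer, String> chainedOperatorInfos = new HashMap<>();

    String createAndGetOperatorInfo(Integer nodeId, String info) {
        chainedOperatorInfos.put(nodeId, info);
        return info;
    }

    // Returns a read-only view; later internal updates remain visible through it.
    Map<Integer, String> getOperatorInfos() {
        return Collections.unmodifiableMap(chainedOperatorInfos);
    }

    public static void main(String[] args) {
        UnmodifiableViewSketch chainInfo = new UnmodifiableViewSketch();
        chainInfo.createAndGetOperatorInfo(1, "source");
        Map<Integer, String> view = chainInfo.getOperatorInfos();
        try {
            // Writes through the view are rejected.
            view.put(2, "map");
        } catch (UnsupportedOperationException e) {
            System.out.println("external write rejected");
        }
        // Writes through the owning class still work and show up in the view.
        chainInfo.createAndGetOperatorInfo(2, "map");
        System.out.println(view.size());
    }
}
```

The wrapper is a view rather than a copy, so it adds no per-call copying cost, and all mutation stays behind `createAndGetOperatorInfo`.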



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/StreamNodeForwardGroup.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.forwardgroup;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Stream node level implementation of {@link ForwardGroup}. */
+public class StreamNodeForwardGroup implements ForwardGroup {
+
+    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+    private int maxParallelism = JobVertex.MAX_PARALLELISM_DEFAULT;
+
+    private final Map<StreamNode, List<StreamNode>> 
chainedStreamNodeGroupsByStartNode =
+            new HashMap<>();
+
+    // For a group of chained stream nodes, their parallelism is consistent. 
In order to make
+    // calculation and usage easier, we only use the start node to calculate 
forward group.
+    public StreamNodeForwardGroup(
+            final Map<StreamNode, List<StreamNode>> 
chainedStreamNodeGroupsByStartNode) {
+        checkNotNull(chainedStreamNodeGroupsByStartNode);
+
+        Set<Integer> configuredParallelisms =
+                chainedStreamNodeGroupsByStartNode.keySet().stream()
+                        .map(StreamNode::getParallelism)
+                        .filter(val -> val > 0)
+                        .collect(Collectors.toSet());
+
+        checkState(configuredParallelisms.size() <= 1);
+
+        if (configuredParallelisms.size() == 1) {
+            this.parallelism = configuredParallelisms.iterator().next();
+        }
+
+        Set<Integer> configuredMaxParallelisms =
+                chainedStreamNodeGroupsByStartNode.keySet().stream()
+                        .map(StreamNode::getMaxParallelism)
+                        .filter(val -> val > 0)
+                        .collect(Collectors.toSet());
+
+        if (!configuredMaxParallelisms.isEmpty()) {
+            this.maxParallelism = Collections.min(configuredMaxParallelisms);
+            checkState(
+                    parallelism == ExecutionConfig.PARALLELISM_DEFAULT
+                            || maxParallelism >= parallelism,
+                    "There is a start node in the forward group whose maximum 
parallelism is smaller than the group's parallelism");
+        }
+
+        
this.chainedStreamNodeGroupsByStartNode.putAll(chainedStreamNodeGroupsByStartNode);
+    }
+
+    @Override
+    public void setParallelism(int parallelism) {
+        checkState(this.parallelism == ExecutionConfig.PARALLELISM_DEFAULT);
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public boolean isParallelismDecided() {
+        return parallelism > 0;
+    }
+
+    @Override
+    public int getParallelism() {
+        checkState(isParallelismDecided());
+        return parallelism;
+    }
+
+    @Override
+    public boolean isMaxParallelismDecided() {
+        return maxParallelism > 0;
+    }
+
+    @Override
+    public int getMaxParallelism() {
+        checkState(isMaxParallelismDecided());
+        return maxParallelism;
+    }
+
+    @VisibleForTesting
+    public int size() {
+        return 
chainedStreamNodeGroupsByStartNode.values().stream().mapToInt(List::size).sum();
+    }
+
+    public Iterable<StreamNode> getStartNodes() {
+        return chainedStreamNodeGroupsByStartNode.keySet();
+    }
+
+    public Iterable<List<StreamNode>> getChainedStreamNodeGroups() {
+        return chainedStreamNodeGroupsByStartNode.values();
+    }
+
+    /**
+     * Merges targetForwardGroup into this group and updates the parallelism information of the
+     * stream nodes in the merged forward group.
+     *
+     * @param targetForwardGroup The forward group to be merged.
+     * @return whether the merge was successful.
+     */
+    public boolean mergeForwardGroup(StreamNodeForwardGroup 
targetForwardGroup) {
+        checkNotNull(targetForwardGroup);
+        if (!ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup(
+                this, targetForwardGroup)) {
+            return false;
+        }
+
+        if (targetForwardGroup == this) {
+            return true;
+        }

Review Comment:
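   Check the identity case first, so that merging a group with itself returns before the mergeability check:
   
   ```
           if (targetForwardGroup == this) {
               return true;
           }
   
           if (!ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup(
                   this, targetForwardGroup)) {
               return false;
           }
   ```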
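
A rough sketch of the suggested ordering, assuming the point is to return early for a self-merge before the compatibility check runs. `MergeOrderSketch`, its `canMerge` rule, and the check counter are hypothetical; they do not reproduce `ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup`.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for a forward group; the compatibility rule is an assumption.
class MergeOrderSketch {

    private final int parallelism; // a value <= 0 means "not decided yet"
    private final Set<Integer> startNodeIds = new HashSet<>();
    int compatibilityChecks = 0; // counts how often the (hypothetical) check runs

    MergeOrderSketch(int parallelism, int startNodeId) {
        this.parallelism = parallelism;
        this.startNodeIds.add(startNodeId);
    }

    // Hypothetical rule: groups are compatible if either is undecided or both agree.
    private boolean canMerge(MergeOrderSketch target) {
        compatibilityChecks++;
        return parallelism <= 0
                || target.parallelism <= 0
                || parallelism == target.parallelism;
    }

    boolean merge(MergeOrderSketch target) {
        // Identity check first: merging a group with itself is trivially done.
        if (target == this) {
            return true;
        }
        if (!canMerge(target)) {
            return false;
        }
        startNodeIds.addAll(target.startNodeIds);
        return true;
    }

    int size() {
        return startNodeIds.size();
    }

    public static void main(String[] args) {
        MergeOrderSketch group = new MergeOrderSketch(4, 1);
        System.out.println(group.merge(group) + " after " + group.compatibilityChecks + " checks");
        System.out.println(group.merge(new MergeOrderSketch(8, 2)));
        System.out.println(group.merge(new MergeOrderSketch(-1, 3)) + " size=" + group.size());
    }
}
```

With this ordering the self-merge path never depends on what the compatibility check returns for identical arguments.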



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -18,2596 +19,380 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.BatchShuffleMode;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
-import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.SupportsCommitter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
-import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.execution.CheckpointingMode;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
-import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
-import org.apache.flink.streaming.api.datastream.CachedDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSink;
-import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
-import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
-import org.apache.flink.streaming.api.legacy.io.TextOutputFormat;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory;
-import org.apache.flink.streaming.api.transformations.CacheTransformation;
-import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
-import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
-import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
 
 import org.assertj.core.api.Assertions;
 import org.assertj.core.data.Offset;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.lang.reflect.Method;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
-import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
-import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link StreamingJobGraphGenerator}. */
 @SuppressWarnings("serial")
-class StreamingJobGraphGeneratorTest {
+class StreamingJobGraphGeneratorTest extends JobGraphGeneratorTestBase {

Review Comment:
   I think the tests for incremental generation need to cover more than the
   existing StreamingJobGraphGenerator tests do, and should exercise the
   incremental parts in more detail. Therefore, all tests for the
   StreamingJobGraphGenerator should live in JobGraphGeneratorTestBase.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java:
##########
@@ -103,6 +110,22 @@ public void addConsumer(JobEdge edge) {
         consumers.add(edge);
     }
 
+    public void addOutputStreamEdge(StreamEdge edge) {
+        DistributionPattern pattern =

Review Comment:
   Currently, we do not allow adding a stream edge once output job edges have
   already been added. Consider adding a sanity check here to ensure that the
   job edges are empty.
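
   A minimal sketch of the kind of check meant here, using a simplified
   stand-in rather than the real `IntermediateDataSet`. The class
   `DataSetSketch`, the string-typed edges, and the field `outputStreamEdges`
   are assumptions for illustration only. In the actual class the same guard
   could presumably be written as `Preconditions.checkState(consumers.isEmpty(), ...)`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical, simplified stand-in for IntermediateDataSet; not the Flink class. */
   class DataSetSketch {
       // Plain strings stand in for JobEdge and StreamEdge to keep the sketch self-contained.
       private final List<String> consumers = new ArrayList<>();
       private final List<String> outputStreamEdges = new ArrayList<>();

       void addConsumer(String jobEdge) {
           consumers.add(jobEdge);
       }

       void addOutputStreamEdge(String streamEdge) {
           // Sanity check: a stream edge may only be added while no job edge exists.
           if (!consumers.isEmpty()) {
               throw new IllegalStateException(
                       "Cannot add a stream edge after job edges have been added.");
           }
           outputStreamEdges.add(streamEdge);
       }

       int numOutputStreamEdges() {
           return outputStreamEdges.size();
       }
   }
   ```

   With this guard, a late `addOutputStreamEdge` call fails fast instead of
   silently leaving the data set with both kinds of edges.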



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.*;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.*;
+
+/** Default implementation for {@link AdaptiveGraphGenerator}. */
+@Internal
+public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
+
+    private final StreamGraph streamGraph;
+
+    private final JobGraph jobGraph;
+
+    private final StreamGraphHasher defaultStreamGraphHasher;
+
+    private final List<StreamGraphHasher> legacyStreamGraphHasher;
+
+    private final Executor serializationExecutor;
+
+    private final AtomicInteger vertexIndexId;
+
+    private final StreamGraphContext streamGraphContext;
+
+    private final Map<Integer, byte[]> hashes;
+
+    private final List<Map<Integer, byte[]>> legacyHashes;
+
+    // Records the id of stream node which job vertex is created.
+    private final Map<Integer, Integer> frozenNodeToStartNodeMap;
+
+    // When the downstream vertex is not created, we need to cache the output.
+    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> intermediateOutputsCaches;
+
+    // Records the id of the stream node that produces the IntermediateDataSet.
+    private final Map<IntermediateDataSetID, Integer> intermediateDataSetIdToProducerMap;
+
+    // Records the ids of the start and end nodes for chained groups in the StreamNodeForwardGroup.
+    // When stream edge's partitioner is modified to forward, we need get forward groups by source
+    // and target node id.
+    private final Map<Integer, StreamNodeForwardGroup> startAndEndNodeIdToForwardGroupMap;
+
+    // Records the chain info that is not ready to create the job vertex, the key is the start node
+    // in this chain.
+    private final Map<Integer, OperatorChainInfo> pendingChainEntryPoints;
+
+    // The value is the stream node ids belonging to that job vertex.
+    private final Map<JobVertexID, Integer> jobVertexToStartNodeMap;
+
+    // The value is the stream node ids belonging to that job vertex.
+    private final Map<JobVertexID, List<Integer>> jobVertexToChainedStreamNodeIdsMap;
+
+    // We need cache all job vertices to create JobEdge for downstream vertex.
+    private final Map<Integer, JobVertex> jobVerticesCache;
+
+    // Records the ID of the job vertex that has completed execution.
+    private final Set<JobVertexID> finishedJobVertices;
+
+    private final AtomicBoolean hasHybridResultPartition;
+
+    public AdaptiveGraphManager(
+            ClassLoader userClassloader,
+            StreamGraph streamGraph,
+            Executor serializationExecutor,
+            @Nullable JobID jobId) {

Review Comment:
   I think the job id should be obtained from the StreamGraph instead of being
   passed as a parameter.


