zhuzhurk commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1847688952
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java: ########## @@ -43,6 +44,8 @@ public class IntermediateDataSet implements java.io.Serializable { // All consumers must have the same partitioner and parallelism private final List<JobEdge> consumers = new ArrayList<>(); + private final List<StreamEdge> outputStreamEdges = new ArrayList<>(); Review Comment: `IntermediateDataSet` is a `JobGraph`-level component. I prefer not to bind it to `StreamEdges`. How about maintaining the relationship between an `IntermediateDataSet` and its output `StreamEdges` in `AdaptiveGraphManager`, and refactoring the method `addOutputStreamEdge()` into `configure(distributionPattern, broadcast)`? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java: ########## @@ -148,6 +148,18 @@ private static byte[] generateUserSpecifiedHash(String operatorUid, Hasher hashe return hasher.hash().asBytes(); } + @Override + public boolean generateHashesByStreamNodeId( + int streamNodeId, StreamGraph streamGraph, Map<Integer, byte[]> hashes) { + StreamNode streamNode = streamGraph.getStreamNode(streamNodeId); + return generateNodeHash( + streamNode, + Hashing.murmur3_128(0), Review Comment: -> getHashFunction() ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1655,6 +1703,21 @@ && arePartitionerAndExchangeModeChainable( return true; } + public static boolean isSourceChainable(StreamNode streamNode, StreamGraph streamGraph) { + if (!streamNode.getInEdges().isEmpty() Review Comment: Why require the inputs to be empty? 
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -673,18 +601,32 @@ private void setChaining() { info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, - chainEntryPoints); + chainEntryPoints, + true, + serializationExecutor, + jobVertexBuildContext, + null); } } - private List<StreamEdge> createChain( + public static List<StreamEdge> createChain( final Integer currentNodeId, final int chainIndex, final OperatorChainInfo chainInfo, - final Map<Integer, OperatorChainInfo> chainEntryPoints) { + final Map<Integer, OperatorChainInfo> chainEntryPoints, + final boolean canCreateNewChain, + final Executor serializationExecutor, + final JobVertexBuildContext jobVertexBuildContext, + final @Nullable Consumer<Integer> hashGenerator) { Integer startNodeId = chainInfo.getStartNodeId(); - if (!builtVertices.contains(startNodeId)) { + if (!jobVertexBuildContext.getJobVerticesInOrder().containsKey(startNodeId)) { + StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); + + // Progressive hash generation is required in adaptive graph generator. + if (hashGenerator != null) { + hashGenerator.accept(currentNodeId); Review Comment: From the perspective of this method, it is not a generator because it generates nothing. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -731,13 +677,20 @@ private List<StreamEdge> createChain( for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); - createChain( - nonChainable.getTargetId(), - 1, // operators start at position 1 because 0 is for chained source inputs - chainEntryPoints.computeIfAbsent( - nonChainable.getTargetId(), - (k) -> chainInfo.newChain(nonChainable.getTargetId())), - chainEntryPoints); + if (canCreateNewChain) { Review Comment: Comments are needed to explain what it is for. 
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1969,8 +2032,12 @@ private static void setManagedMemoryFractionForSlotSharingGroup( final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs = jobVertexBuildContext.getChainedConfigs(); - final Set<Integer> groupOperatorIds = + final Set<JobVertexID> jobVertexIds = slotSharingGroup.getJobVertexIds().stream() + .filter(vertexOperators::containsKey) Review Comment: In which case will the `slotSharingGroup` contain a job vertex that is not included in `vertexOperators`? Could you add some comments to explain it, as it may not be obvious to other developers? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1655,6 +1703,21 @@ && arePartitionerAndExchangeModeChainable( return true; } + public static boolean isSourceChainable(StreamNode streamNode, StreamGraph streamGraph) { Review Comment: -> isChainableSource ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java: ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext; +import org.apache.flink.streaming.api.graph.util.OperatorChainInfo; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.connect; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createAndInitializeJobGraph; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain; +import 
static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createSourceChainInfo; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isSourceChainable; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setManagedMemoryFraction; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setPhysicalEdges; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharingAndCoLocation; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexDescription; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode; + +/** Default implementation for {@link AdaptiveGraphGenerator}. */ +@Internal +public class AdaptiveGraphManager implements AdaptiveGraphGenerator { + + private final StreamGraph streamGraph; + + private final JobGraph jobGraph; + + private final StreamGraphHasher defaultStreamGraphHasher; + + private final List<StreamGraphHasher> legacyStreamGraphHasher; + + private final Executor serializationExecutor; + + private final AtomicInteger vertexIndexId; + + private final StreamGraphContext streamGraphContext; + + private final Map<Integer, byte[]> hashes; + + private final List<Map<Integer, byte[]>> legacyHashes; + + // Records the id of stream node which job vertex is created. 
Review Comment: -> Records the ids of stream nodes of which the job vertices are already created ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -777,18 +730,28 @@ private List<StreamEdge> createChain( .addOutputFormat(currentOperatorId, currentNode.getOutputFormat()); } OperatorInfo operatorInfo = - jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId); + chainInfo.createAndGetOperatorInfo(currentNodeId, currentOperatorId); - StreamConfig config = - currentNodeId.equals(startNodeId) - ? createJobVertex(startNodeId, chainInfo) - : new StreamConfig(new Configuration()); + StreamConfig config; + if (currentNodeId.equals(startNodeId)) { + JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId); + config = + jobVertex != null + ? new StreamConfig(jobVertex.getConfiguration()) + : new StreamConfig( + createJobVertex( + chainInfo, + serializationExecutor, + jobVertexBuildContext) + .getConfiguration()); Review Comment: Maybe ``` JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId); if (jobVertex == null) { jobVertex = createJobVertex( chainInfo, serializationExecutor, jobVertexBuildContext); } config = new StreamConfig(jobVertex.getConfiguration()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org