zhuzhurk commented on code in PR #25366: URL: https://github.com/apache/flink/pull/25366#discussion_r1794972823
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; + +import java.util.ArrayList; +import java.util.List; + +/** Helper class to help maintain the information of an operator. */ +@Internal +public class OperatorInfo { + + private StreamConfig vertexConfig; + + // This is used to cache the chainable outputs, to set the chainable outputs config after + // all job vertices are created. + private final List<StreamEdge> chainableOutputs; + + // This is used to cache the non-chainable outputs, to set the non-chainable outputs config + // after all job vertices are created. 
+ private final List<StreamEdge> nonChainableOutputs; + + public OperatorInfo() { + this.chainableOutputs = new ArrayList<>(); + this.nonChainableOutputs = new ArrayList<>(); + } + + public List<StreamEdge> getChainableOutputs() { + return chainableOutputs; + } + + public void setChainableOutputs(List<StreamEdge> chainableOutputs) { Review Comment: Looks to me the behavior is actually `addChainableOutputs` ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; + +import java.util.ArrayList; +import java.util.List; + +/** Helper class to help maintain the information of an operator. */ +@Internal +public class OperatorInfo { + + private StreamConfig vertexConfig; + + // This is used to cache the chainable outputs, to set the chainable outputs config after + // all job vertices are created. 
+ private final List<StreamEdge> chainableOutputs; + + // This is used to cache the non-chainable outputs, to set the non-chainable outputs config + // after all job vertices are created. + private final List<StreamEdge> nonChainableOutputs; + + public OperatorInfo() { + this.chainableOutputs = new ArrayList<>(); + this.nonChainableOutputs = new ArrayList<>(); + } + + public List<StreamEdge> getChainableOutputs() { + return chainableOutputs; + } + + public void setChainableOutputs(List<StreamEdge> chainableOutputs) { + this.chainableOutputs.addAll(chainableOutputs); + } + + public List<StreamEdge> getNonChainableOutputs() { + return nonChainableOutputs; + } + + public void setNonChainableOutputs(List<StreamEdge> nonChainableOutEdges) { Review Comment: Looks to me the behavior is actually `addNonChainableOutputs` ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Helper class encapsulates all necessary information and configurations required during the + * construction of job vertices. + */ +@Internal +public class JobVertexBuildContext { + + private final StreamGraph streamGraph; + + /** The {@link OperatorChainInfo}s, key is the start node id of the chain. */ + private final Map<Integer, OperatorChainInfo> chainInfos; + + /** The {@link OperatorInfo}s, key is the id of the stream node. */ + private final Map<Integer, OperatorInfo> operatorInfosInorder; + + // This map's key represents the starting node id of each chain. Note that this includes not + // only the usual head node of the chain but also the ids of chain sources which are used by + // multi-input. + private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; + + // The created JobVertex, the key is start node id. 
+ private final Map<Integer, JobVertex> jobVerticesInorder; Review Comment: jobVerticesInorder -> jobVerticesInOrder ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1773,7 +1890,8 @@ private Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups() final boolean allRegionsInSameSlotSharingGroup = streamGraph.isAllVerticesInSameSlotSharingGroupByDefault(); - + // Perhaps we need to make modifications in the future, we don't have to obtain the regions + // of the entire image every time Review Comment: It's better to create a JIRA instead of writing a comment to track follow-up tasks. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Helper class to help maintain the information of an operator chain. */ +@Internal +public class OperatorChainInfo { + private final Integer startNodeId; + private final Map<Integer, byte[]> hashes; + private final List<Map<Integer, byte[]>> legacyHashes; + private final Map<Integer, List<ChainedOperatorHashInfo>> chainedOperatorHashes; + private final Map<Integer, ChainedSourceInfo> chainedSources; + private final Map<Integer, ResourceSpec> chainedMinResources; + private final Map<Integer, ResourceSpec> chainedPreferredResources; + private final Map<Integer, String> chainedNames; + private final List<OperatorCoordinator.Provider> coordinatorProviders; + private final StreamGraph streamGraph; Review Comment: Can these fields be shared globally: streamGraph, hashes, legacyHashes, chainedOperatorHashes, chainedSources, chainedMinResources, chainedPreferredResources, chainedNames? How about putting them into the build context, providing the necessary getters and setters, and just passing the context directly to `OperatorChainInfo`? Maybe that way we can have a simpler `OperatorChainInfo`. Moreover, if we do it that way, would it work to let the generator call `context#getHash()` instead of `chainInfo#getHash()`?
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -1830,10 +1950,10 @@ private void setCoLocation() { } } - private static void setManagedMemoryFraction( + public static void setManagedMemoryFraction( final Map<Integer, JobVertex> jobVertices, - final Map<Integer, StreamConfig> operatorConfigs, final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs, + final java.util.function.Function<Integer, StreamConfig> operatorConfigRetriever, Review Comment: How about directly passing in the context, which can provide `jobVertices`, `operatorConfigs` and `vertexChainedConfigs`? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -220,24 +197,15 @@ private StreamingJobGraphGenerator( this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); - this.jobVertices = new LinkedHashMap<>(); this.builtVertices = new HashSet<>(); - this.chainedConfigs = new HashMap<>(); - this.vertexConfigs = new HashMap<>(); - this.chainedNames = new HashMap<>(); - this.chainedMinResources = new HashMap<>(); - this.chainedPreferredResources = new HashMap<>(); - this.chainedInputOutputFormats = new HashMap<>(); - this.physicalEdgesInOrder = new ArrayList<>(); this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor); - this.chainInfos = new HashMap<>(); - this.opNonChainableOutputsCache = new LinkedHashMap<>(); - + this.jobVertexBuildContext = + new JobVertexBuildContext(streamGraph, new AtomicBoolean(false)); Review Comment: Is it necessary to pass in the default value for `hasHybridResultPartition` through the ctor?
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamNode; + +/** Helper class to help maintain the hash info of an operator chain. */ +@Internal +public final class ChainedOperatorHashInfo { + private final byte[] generatedOperatorId; + private final byte[] userDefinedOperatorId; + private final StreamNode streamNode; + + ChainedOperatorHashInfo( + final byte[] generatedOperatorId, + final byte[] userDefinedOperatorId, + final StreamNode streamNode) { + this.generatedOperatorId = generatedOperatorId; Review Comment: `this.generatedOperatorId = checkNotNull(generatedOperatorId);` (and likewise for the other fields). Such checks are needed unless it is a private method, in which case all the callers are under our control.
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -727,64 +759,75 @@ private List<StreamEdge> createChain( chainEntryPoints); } - chainedNames.put( + chainInfo.addChainedName( currentNodeId, createChainedName( currentNodeId, chainableOutputs, - Optional.ofNullable(chainEntryPoints.get(currentNodeId)))); - chainedMinResources.put( - currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); - chainedPreferredResources.put( + Optional.ofNullable(chainEntryPoints.get(currentNodeId)), + chainInfo.getChainedNames(), Review Comment: If we can maintain all the chained names in the context, this parameter is not needed. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ########## @@ -355,32 +337,46 @@ private void waitForSerializationFuturesAndUpdateJobVertices() } } - private void addVertexIndexPrefixInVertexName() { - if (!streamGraph.isVertexNameIncludeIndexPrefix()) { + public static void addVertexIndexPrefixInVertexName( + JobVertexBuildContext jobVertexBuildContext, + AtomicInteger vertexIndexId, + JobGraph jobGraph) { + if (!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) { return; } - final AtomicInteger vertexIndexId = new AtomicInteger(0); + Set<JobVertexID> jobVertexIds = + jobVertexBuildContext.getJobVertices().values().stream() + .map(JobVertex::getID) + .collect(Collectors.toSet()); + // JobVertexBuildContext only contains incrementally generated jobVertex instances. The Review Comment: Does the `JobVertexBuildContext` not contain all the generated job vertices at that time? ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Helper class encapsulates all necessary information and configurations required during the + * construction of job vertices. + */ +@Internal +public class JobVertexBuildContext { + + private final StreamGraph streamGraph; + + /** The {@link OperatorChainInfo}s, key is the start node id of the chain. */ + private final Map<Integer, OperatorChainInfo> chainInfos; + + /** The {@link OperatorInfo}s, key is the id of the stream node. 
*/ + private final Map<Integer, OperatorInfo> operatorInfosInorder; + + // This map's key represents the starting node id of each chain. Note that this includes not + // only the usual head node of the chain but also the ids of chain sources which are used by + // multi-input. + private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; + + // The created JobVertex, the key is start node id. + private final Map<Integer, JobVertex> jobVerticesInorder; + + // Futures for the serialization of operator coordinators. + private final Map< + JobVertexID, + List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>> + coordinatorSerializationFuturesPerJobVertex; + + // The order of StreamEdge connected to other vertices should be consistent with the order in + // which JobEdge was created. + private final List<StreamEdge> physicalEdgesInOrder; + + // We use AtomicBoolean to track the existence of HybridResultPartition during the incremental + // JobGraph generation process introduced by AdaptiveGraphManager. It is essential to globally + // monitor changes to this variable, thus necessitating the use of a Boolean object instead of a + // primitive boolean. 
+ private final AtomicBoolean hasHybridResultPartition; + + public JobVertexBuildContext(StreamGraph streamGraph, AtomicBoolean hasHybridResultPartition) { + this.streamGraph = streamGraph; + this.chainInfos = new LinkedHashMap<>(); + this.operatorInfosInorder = new LinkedHashMap<>(); + this.jobVerticesInorder = new LinkedHashMap<>(); + this.physicalEdgesInOrder = new ArrayList<>(); + this.hasHybridResultPartition = hasHybridResultPartition; + this.coordinatorSerializationFuturesPerJobVertex = new HashMap<>(); + this.chainedConfigs = new HashMap<>(); + } + + public void addChainInfo(Integer startNodeId, OperatorChainInfo chainInfo) { + chainInfos.put(startNodeId, chainInfo); + } + + public OperatorChainInfo getChainInfo(Integer startNodeId) { + return chainInfos.get(startNodeId); + } + + public Map<Integer, OperatorChainInfo> getChainInfos() { + return chainInfos; + } + + public OperatorInfo getOperatorInfo(Integer nodeId) { + return operatorInfosInorder.get(nodeId); + } + + public OperatorInfo createAndGetOperatorInfo(Integer nodeId) { + OperatorInfo operatorInfo = new OperatorInfo(); + operatorInfosInorder.put(nodeId, operatorInfo); + return operatorInfo; + } + + public Map<Integer, OperatorInfo> getOperatorInfos() { + return operatorInfosInorder; + } + + public StreamGraph getStreamGraph() { + return streamGraph; + } + + public boolean hasHybridResultPartition() { + return hasHybridResultPartition.get(); + } + + public void setHasHybridResultPartition(boolean hasHybridResultPartition) { + this.hasHybridResultPartition.set(hasHybridResultPartition); + } + + public void addPhysicalEdgesInOrder(StreamEdge edge) { + physicalEdgesInOrder.add(edge); + } + + public List<StreamEdge> getPhysicalEdgesInOrder() { + return physicalEdgesInOrder; + } + + public void addJobVertex(Integer startNodeId, JobVertex jobVertex) { + jobVerticesInorder.put(startNodeId, jobVertex); + } + + public Map<Integer, JobVertex> getJobVertices() { Review Comment: maybe `getJobVerticesInOrder` 
because the order is required in some cases. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Helper class encapsulates all necessary information and configurations required during the + * construction of job vertices. 
+ */ +@Internal +public class JobVertexBuildContext { + + private final StreamGraph streamGraph; + + /** The {@link OperatorChainInfo}s, key is the start node id of the chain. */ + private final Map<Integer, OperatorChainInfo> chainInfos; + + /** The {@link OperatorInfo}s, key is the id of the stream node. */ + private final Map<Integer, OperatorInfo> operatorInfosInorder; Review Comment: `operatorInfosInorder` -> `operatorInfosInOrder`. Are the operators in topological order? Is this order important? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org