noorall commented on code in PR #25366: URL: https://github.com/apache/flink/pull/25366#discussion_r1772524678
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1221,52 +1304,68 @@ private void setVertexNonChainedOutputsConfig(
         for (StreamEdge edge : transitiveOutEdges) {
             NonChainedOutput output = opIntermediateOutputs.get(edge.getSourceId()).get(edge);
             transitiveOutputs.add(output);
-            connect(startNodeId, edge, output);
+            connect(
+                    startNodeId,
+                    edge,
+                    output,
+                    jobVertexBuildContext.getJobVertices(),
+                    jobVertexBuildContext);
         }

         config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
     }

-    private void setAllOperatorNonChainedOutputsConfigs(
-            final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs) {
+    public static void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        opNonChainableOutputsCache.forEach(
-                (vertexId, nonChainableOutputs) -> {
-                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
-                            opIntermediateOutputs.computeIfAbsent(
-                                    vertexId, ignored -> new HashMap<>());
-                    setOperatorNonChainedOutputsConfig(
-                            vertexId,
-                            vertexConfigs.get(vertexId),
-                            nonChainableOutputs,
-                            outputsConsumedByEdge);
-                });
+        jobVertexBuildContext
+                .getOperatorInfos()

Review Comment:
   > After this change, all operator info will be looped and set setOperatorNonChainedOutputsConfig, while the original behavior only looped opNonChainableOutputsCache. Is this approach safe because nonChainableOutputs is guaranteed to be empty, right?

   Yes, this is consistent with the original logic. The original opNonChainableOutputsCache field held a list for every streamNode, and that list was empty when the node had no nonChainableOutputs. Looping over all operator infos therefore visits the same nodes as before.
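
   As a minimal sketch of why the two loops are equivalent (all names here are hypothetical stand-ins, not the actual Flink classes or the code in this PR): if the cache always receives one entry per node, possibly with an empty list, then iterating the cache and iterating the full list of nodes visit the same ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; node ids stand in for stream nodes
// and strings stand in for non-chained outputs.
class NonChainedOutputsSketch {

    // Old shape: a cache keyed by node id that gets an entry for EVERY node,
    // with an empty list when the node has no non-chainable outputs.
    static Map<Integer, List<String>> buildCache(
            List<Integer> nodeIds, Map<Integer, List<String>> outputsByNode) {
        Map<Integer, List<String>> cache = new LinkedHashMap<>();
        for (Integer id : nodeIds) {
            cache.put(id, new ArrayList<>(outputsByNode.getOrDefault(id, List.of())));
        }
        return cache;
    }

    // Old loop: visit every cache entry.
    static List<Integer> visitViaCache(Map<Integer, List<String>> cache) {
        List<Integer> visited = new ArrayList<>();
        cache.forEach((id, outputs) -> visited.add(id));
        return visited;
    }

    // New loop: visit every operator info (modeled here as the node id list).
    static List<Integer> visitViaOperatorInfos(List<Integer> nodeIds) {
        return new ArrayList<>(nodeIds);
    }

    public static void main(String[] args) {
        List<Integer> nodeIds = List.of(1, 2, 3);
        // Only node 2 has a non-chainable output in this example.
        Map<Integer, List<String>> cache = buildCache(nodeIds, Map.of(2, List.of("out-a")));

        // Both loops visit the same nodes; nodes 1 and 3 simply see an empty list.
        System.out.println(visitViaCache(cache).equals(visitViaOperatorInfos(nodeIds)));
        System.out.println(cache.get(1).isEmpty());
    }
}
```

   The sketch only models the iteration set. Whether each node's config ends up identical still depends on setOperatorNonChainedOutputsConfig handling an empty list the same way in both versions.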