noorall commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1772524678


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1221,52 +1304,68 @@ private void setVertexNonChainedOutputsConfig(
         for (StreamEdge edge : transitiveOutEdges) {
             NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
             transitiveOutputs.add(output);
-            connect(startNodeId, edge, output);
+            connect(
+                    startNodeId,
+                    edge,
+                    output,
+                    jobVertexBuildContext.getJobVertices(),
+                    jobVertexBuildContext);
         }
 
         config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
     }
 
-    private void setAllOperatorNonChainedOutputsConfigs(
-            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+    public static void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        opNonChainableOutputsCache.forEach(
-                (vertexId, nonChainableOutputs) -> {
-                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
-                            opIntermediateOutputs.computeIfAbsent(
-                                    vertexId, ignored -> new HashMap<>());
-                    setOperatorNonChainedOutputsConfig(
-                            vertexId,
-                            vertexConfigs.get(vertexId),
-                            nonChainableOutputs,
-                            outputsConsumedByEdge);
-                });
+        jobVertexBuildContext
+                .getOperatorInfos()

Review Comment:
   Yes, this is consistent with the original logic. In the original 
opNonChainableOutputsCache field, a list will be created for each streamNode. 
When there are no nonChainableOutputs, the list will be emtpy.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1221,52 +1304,68 @@ private void setVertexNonChainedOutputsConfig(
         for (StreamEdge edge : transitiveOutEdges) {
             NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
             transitiveOutputs.add(output);
-            connect(startNodeId, edge, output);
+            connect(
+                    startNodeId,
+                    edge,
+                    output,
+                    jobVertexBuildContext.getJobVertices(),
+                    jobVertexBuildContext);
         }
 
         config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
     }
 
-    private void setAllOperatorNonChainedOutputsConfigs(
-            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+    public static void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        opNonChainableOutputsCache.forEach(
-                (vertexId, nonChainableOutputs) -> {
-                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
-                            opIntermediateOutputs.computeIfAbsent(
-                                    vertexId, ignored -> new HashMap<>());
-                    setOperatorNonChainedOutputsConfig(
-                            vertexId,
-                            vertexConfigs.get(vertexId),
-                            nonChainableOutputs,
-                            outputsConsumedByEdge);
-                });
+        jobVertexBuildContext
+                .getOperatorInfos()

Review Comment:
   > After this change, all operator info will be looped and set 
setOperatorNonChainedOutputsConfig, while the original behavior only looped 
opNonChainableOutputsCache. Is this approach safe because nonChainableOutputs 
is guaranteed to be empty, right ?
   
   Yes, this is consistent with the original logic. In the original 
opNonChainableOutputsCache field, a list will be created for each streamNode. 
When there are no nonChainableOutputs, the list will be emtpy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to