noorall commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1851601660


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1969,8 +2032,12 @@ private static void 
setManagedMemoryFractionForSlotSharingGroup(
         final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs =
                 jobVertexBuildContext.getChainedConfigs();
-        final Set<Integer> groupOperatorIds =
+        final Set<JobVertexID> jobVertexIds =
                 slotSharingGroup.getJobVertexIds().stream()
+                        .filter(vertexOperators::containsKey)

Review Comment:
   > In which case the `slotSharingGroup` will contain a job vertex which is 
not included in the `vertexOperators`? Could you add some comments to explain 
it as it may not be obvious to other developers.
   
   In the progressive job graph generation algorithm, if the user specified the 
`SlotSharingGroupResource` or the `AllVerticesInSameSlotSharingGroupByDefault` 
is set to true, job vertices generated in different phase may be assigned to 
the same `slotSharingGroup`. Therefore, we need to filter out the job vertices 
that belong to the current phase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to