noorall commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1772518738


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -172,43 +176,18 @@ public static JobGraph createJobGraph(
     private final ClassLoader userClassloader;
     private final StreamGraph streamGraph;
 
-    private final Map<Integer, JobVertex> jobVertices;
     private final JobGraph jobGraph;
     private final Collection<Integer> builtVertices;
 
-    private final List<StreamEdge> physicalEdgesInOrder;
-
-    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
-    private final Map<Integer, StreamConfig> vertexConfigs;
-    private final Map<Integer, String> chainedNames;
-
-    private final Map<Integer, ResourceSpec> chainedMinResources;
-    private final Map<Integer, ResourceSpec> chainedPreferredResources;
-
-    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats;
-
     private final StreamGraphHasher defaultStreamGraphHasher;
     private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-    private boolean hasHybridResultPartition = false;
-
     private final Executor serializationExecutor;
 
-    // Futures for the serialization of operator coordinators
-    private final Map<
-                    JobVertexID,
-                    List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
-            coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
-
-    /** The {@link OperatorChainInfo}s, key is the start node id of the chain. */
-    private final Map<Integer, OperatorChainInfo> chainInfos;
+    /** We save all the context needed to create the JobVertex in this structure */
+    private final JobVertexBuildContext jobVertexBuildContext;
 
-    /**
-     * This is used to cache the non-chainable outputs, to set the non-chainable outputs config
-     * after all job vertices are created.
-     */
-    private final Map<Integer, List<StreamEdge>> opNonChainableOutputsCache;
+    private final AtomicBoolean hasHybridResultPartition;

Review Comment:
   > why we need change this field to Atomic?
   
   This change supports incrementally generating the JobGraph from the
   StreamGraph, a feature introduced by AdaptiveGraphManager.
   AdaptiveGraphManager needs to observe changes to this flag globally, so the
   primitive boolean is replaced with a mutable boolean object (AtomicBoolean)
   that can be shared by reference.
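
To illustrate the point about observing the flag globally, here is a minimal, self-contained sketch. It is not the Flink code: `SharedFlagSketch`, `PrimitiveContext`, `SharedContext`, and `markHybrid` are hypothetical names chosen for this example. It only shows that a primitive `boolean` is copied when handed to another object, while an `AtomicBoolean` is shared by reference, so an update made through one holder is visible to every other holder.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SharedFlagSketch {

    /** Hypothetical context that receives a primitive flag (a copy). */
    static final class PrimitiveContext {
        private boolean hasHybridResultPartition;

        PrimitiveContext(boolean initial) {
            this.hasHybridResultPartition = initial;
        }

        void markHybrid() {
            // Only this object's private copy changes.
            hasHybridResultPartition = true;
        }
    }

    /** Hypothetical context that receives a shared AtomicBoolean. */
    static final class SharedContext {
        private final AtomicBoolean hasHybridResultPartition;

        SharedContext(AtomicBoolean flag) {
            this.hasHybridResultPartition = flag;
        }

        void markHybrid() {
            // Every holder of the same AtomicBoolean sees this update.
            hasHybridResultPartition.set(true);
        }
    }

    /** Returns what the owner sees after the context sets its primitive copy. */
    static boolean primitiveCopyObservedByOwner() {
        boolean ownerFlag = false;
        PrimitiveContext context = new PrimitiveContext(ownerFlag);
        context.markHybrid();
        return ownerFlag; // the owner's variable was never touched
    }

    /** Returns what the owner sees after the context sets the shared flag. */
    static boolean sharedFlagObservedByOwner() {
        AtomicBoolean ownerFlag = new AtomicBoolean(false);
        SharedContext context = new SharedContext(ownerFlag);
        context.markHybrid();
        return ownerFlag.get(); // the owner reads the same object the context wrote
    }

    public static void main(String[] args) {
        System.out.println("primitive copy observed by owner: " + primitiveCopyObservedByOwner());
        System.out.println("shared flag observed by owner: " + sharedFlagObservedByOwner());
    }
}
```

`AtomicBoolean` additionally gives thread-safe reads and writes; whether that is needed here depends on how AdaptiveGraphManager accesses the flag, which this sketch does not assume.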



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
