zhuzhurk commented on code in PR #25472:
URL: https://github.com/apache/flink/pull/25472#discussion_r1808402242


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java:
##########
@@ -80,6 +83,14 @@ public SchedulerNG createInstance(
             final Collection<FailureEnricher> failureEnrichers,
             final BlocklistOperations blocklistOperations)
             throws Exception {
+        JobGraph jobGraph;
+
+        if (executionPlan instanceof JobGraph) {
+            jobGraph = (JobGraph) executionPlan;
+        } else {
+            checkState(executionPlan instanceof StreamGraph, "Unsupported 
execution plan.");

Review Comment:
   maybe
   ```
   } else if (executionPlan instanceof StreamGraph) {
   ...
   } else {
       throw new FlinkException("Unsupported execution plan " + 
executionPlan.getClass().getCanonicalName());
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1066,6 +1067,14 @@ public CompletableFuture<JobSubmissionResult> 
submitJob(ExecutionPlan executionP
         // When MiniCluster uses the local RPC, the provided ExecutionPlan is 
passed directly to the
         // Dispatcher. This means that any mutations to the JG can affect the 
Dispatcher behaviour,
         // so we rather clone it to guard against this.

Review Comment:
   The code comments above apply to the original line, not to the newly added code.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -169,10 +225,15 @@ public ExecutionConfig getExecutionConfig() {
         return executionConfig;
     }
 
+    @Override
     public Configuration getJobConfiguration() {
         return jobConfiguration;
     }
 
+    public void setJobConfiguration(Configuration configuration) {

Review Comment:
   Is this method required? If not, we can keep `jobConfiguration` final.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute 
attribute) {
             getStreamNode(vertexId).setAttribute(attribute);
         }
     }
+
+    public void setJobId(JobID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public JobID getJobID() {
+        return jobId;
+    }
+
+    /**
+     * Sets the classpath required to run the job on a task manager.
+     *
+     * @param paths paths of the directories/JAR files required to run the job 
on a task manager
+     */
+    public void setClasspath(List<URL> paths) {
+        classpath = paths;
+    }
+
+    public List<URL> getClasspath() {
+        return classpath;
+    }
+
+    /**
+     * Adds the given jar files to the {@link JobGraph} via {@link 
JobGraph#addJar}.
+     *
+     * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files 
to attach to the
+     *     jobgraph.
+     * @throws RuntimeException if a jar URL is not valid.
+     */
+    public void addJars(final List<URL> jarFilesToAttach) {
+        for (URL jar : jarFilesToAttach) {
+            try {
+                addJar(new Path(jar.toURI()));
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("URL is invalid. This should not 
happen.", e);
+            }
+        }
+    }
+
+    /**
+     * Returns a list of BLOB keys referring to the JAR files required to run 
this job.
+     *
+     * @return list of BLOB keys referring to the JAR files required to run 
this job
+     */
+    @Override
+    public List<PermanentBlobKey> getUserJarBlobKeys() {
+        return this.userJarBlobKeys;
+    }
+
+    @Override
+    public List<URL> getClasspaths() {
+        return classpath;
+    }
+
+    public void addUserArtifact(String name, 
DistributedCache.DistributedCacheEntry file) {
+        if (file == null) {
+            throw new IllegalArgumentException();
+        }
+
+        userArtifacts.putIfAbsent(name, file);
+    }
+
+    @Override
+    public Map<String, DistributedCache.DistributedCacheEntry> 
getUserArtifacts() {
+        return userArtifacts;
+    }
+
+    @Override
+    public void addUserJarBlobKey(PermanentBlobKey key) {
+        if (key == null) {
+            throw new IllegalArgumentException();
+        }
+
+        if (!userJarBlobKeys.contains(key)) {
+            userJarBlobKeys.add(key);
+        }
+    }
+
+    @Override
+    public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey)
+            throws IOException {
+        byte[] serializedBlobKey;
+        serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+        userArtifacts.computeIfPresent(
+                entryName,
+                (key, originalEntry) ->
+                        new DistributedCache.DistributedCacheEntry(
+                                originalEntry.filePath,
+                                originalEntry.isExecutable,
+                                serializedBlobKey,
+                                originalEntry.isZipped));
+    }
+
+    @Override
+    public void writeUserArtifactEntriesToConfiguration() {
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> 
userArtifact :
+                userArtifacts.entrySet()) {
+            DistributedCache.writeFileInfoToConfig(
+                    userArtifact.getKey(), userArtifact.getValue(), 
jobConfiguration);
+        }
+    }
+
+    @Override
+    public int getMaximumParallelism() {
+        int maxParallelism = -1;
+        for (StreamNode node : streamNodes.values()) {
+            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+        }
+        return maxParallelism;
+    }
+
+    public void setInitialClientHeartbeatTimeout(long 
initialClientHeartbeatTimeout) {
+        this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public long getInitialClientHeartbeatTimeout() {
+        return initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public boolean isPartialResourceConfigured() {
+        return isPartialResourceConfigured;
+    }
+
+    public void serializeUserDefinedInstances() throws IOException {
+        final ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        Math.max(
+                                1,
+                                Math.min(
+                                        Hardware.getNumberCPUCores(),
+                                        
getExecutionConfig().getParallelism())),
+                        new 
ExecutorThreadFactory("flink-operator-serialization-io"));
+        try {
+            this.userDefinedObjectsHolder =
+                    new UserDefinedObjectsHolder(
+                            streamNodes,
+                            virtualSideOutputNodes,
+                            virtualPartitionNodes,
+                            executionConfig,
+                            stateBackend,
+                            checkpointStorage,
+                            serializationExecutor);
+            this.isPartialResourceConfigured = 
isPartialResourceConfiguredGraph();
+            this.isEmpty = streamNodes.isEmpty();
+        } finally {
+            serializationExecutor.shutdown();
+        }
+    }
+
+    public void deserializeUserDefinedInstances(
+            ClassLoader userClassLoader, Executor serializationExecutor) 
throws Exception {
+        this.userDefinedObjectsHolder.deserialize(userClassLoader, 
serializationExecutor);
+    }
+
+    private boolean isPartialResourceConfiguredGraph() {
+        boolean hasVerticesWithUnknownResource = false;
+        boolean hasVerticesWithConfiguredResource = false;
+
+        for (StreamNode streamNode : this.getStreamNodes()) {
+            if (streamNode.getMinResources() == ResourceSpec.UNKNOWN) {
+                hasVerticesWithUnknownResource = true;
+            } else {
+                hasVerticesWithConfiguredResource = true;
+            }
+
+            if (hasVerticesWithUnknownResource && 
hasVerticesWithConfiguredResource) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamGraph(jobId: " + jobId + ")";
+    }
+
+    /**
+     * A static inner class designed to hold user-defined objects for 
serialization and
+     * deserialization in the stream graph.
+     */
+    private class UserDefinedObjectsHolder implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final SerializedValue<
+                        Map<Integer, Tuple3<Integer, StreamPartitioner<?>, 
StreamExchangeMode>>>
+                serializedVirtualPartitionNodes;
+
+        private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
+
+        private SerializedValue<Map<Integer, StreamNode>> 
serializedStreamNodes;
+
+        /**
+         * This collection stores operator factories serialized separately 
from the {@link
+         * StreamGraph}. This separation allows for the parallel serialization 
of operator
+         * factories, improving the overall performance of the serialization 
process.
+         *
+         * <p>Each tuple in this collection consists of an integer key that 
identifies the stream
+         * node, and a value that wraps the serialized representation of the 
associated {@link
+         * StreamOperatorFactory} instance.
+         */
+        private Collection<Tuple2<Integer, 
SerializedValue<StreamOperatorFactory<?>>>>
+                streamNodeToSerializedOperatorFactories;
+
+        private final SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>>
+                serializedVirtualSideOutputNodes;
+
+        public UserDefinedObjectsHolder(
+                Map<Integer, StreamNode> streamNodes,
+                Map<Integer, Tuple2<Integer, OutputTag>> 
virtualSideOutputNodes,
+                Map<Integer, Tuple3<Integer, StreamPartitioner<?>, 
StreamExchangeMode>>
+                        virtualPartitionNodes,
+                ExecutionConfig executionConfig,
+                @Nullable StateBackend stateBackend,
+                @Nullable CheckpointStorage checkpointStorage,
+                Executor serializationExecutor)
+                throws IOException {
+            serializeStreamNodes(streamNodes, serializationExecutor);
+
+            this.serializedVirtualSideOutputNodes = new 
SerializedValue<>(virtualSideOutputNodes);
+            this.serializedVirtualPartitionNodes = new 
SerializedValue<>(virtualPartitionNodes);
+            this.serializedExecutionConfig = new 
SerializedValue<>(executionConfig);
+
+            if (stateBackend != null && serializedStateBackend == null) {
+                serializedStateBackend = new SerializedValue<>(stateBackend);
+            }
+
+            if (checkpointStorage != null && serializedCheckpointStorage == 
null) {
+                serializedCheckpointStorage = new 
SerializedValue<>(checkpointStorage);
+            }

Review Comment:
   How about ensuring that `JobCheckpointingSettings` is created right after 
`stateBackend` and `checkpointStorage` are set, and letting it be the holder of 
`serializedStateBackend` and `serializedCheckpointStorage`? It looks to me like 
this could simplify things a lot.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute 
attribute) {
             getStreamNode(vertexId).setAttribute(attribute);
         }
     }
+
+    public void setJobId(JobID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public JobID getJobID() {
+        return jobId;
+    }
+
+    /**
+     * Sets the classpath required to run the job on a task manager.
+     *
+     * @param paths paths of the directories/JAR files required to run the job 
on a task manager
+     */
+    public void setClasspath(List<URL> paths) {
+        classpath = paths;
+    }
+
+    public List<URL> getClasspath() {
+        return classpath;
+    }
+
+    /**
+     * Adds the given jar files to the {@link JobGraph} via {@link 
JobGraph#addJar}.
+     *
+     * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files 
to attach to the
+     *     jobgraph.
+     * @throws RuntimeException if a jar URL is not valid.
+     */
+    public void addJars(final List<URL> jarFilesToAttach) {
+        for (URL jar : jarFilesToAttach) {
+            try {
+                addJar(new Path(jar.toURI()));
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("URL is invalid. This should not 
happen.", e);
+            }
+        }
+    }
+
+    /**
+     * Returns a list of BLOB keys referring to the JAR files required to run 
this job.
+     *
+     * @return list of BLOB keys referring to the JAR files required to run 
this job
+     */
+    @Override
+    public List<PermanentBlobKey> getUserJarBlobKeys() {
+        return this.userJarBlobKeys;
+    }
+
+    @Override
+    public List<URL> getClasspaths() {
+        return classpath;
+    }
+
+    public void addUserArtifact(String name, 
DistributedCache.DistributedCacheEntry file) {
+        if (file == null) {
+            throw new IllegalArgumentException();
+        }
+
+        userArtifacts.putIfAbsent(name, file);
+    }
+
+    @Override
+    public Map<String, DistributedCache.DistributedCacheEntry> 
getUserArtifacts() {
+        return userArtifacts;
+    }
+
+    @Override
+    public void addUserJarBlobKey(PermanentBlobKey key) {
+        if (key == null) {
+            throw new IllegalArgumentException();
+        }
+
+        if (!userJarBlobKeys.contains(key)) {
+            userJarBlobKeys.add(key);
+        }
+    }
+
+    @Override
+    public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey)
+            throws IOException {
+        byte[] serializedBlobKey;
+        serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+        userArtifacts.computeIfPresent(
+                entryName,
+                (key, originalEntry) ->
+                        new DistributedCache.DistributedCacheEntry(
+                                originalEntry.filePath,
+                                originalEntry.isExecutable,
+                                serializedBlobKey,
+                                originalEntry.isZipped));
+    }
+
+    @Override
+    public void writeUserArtifactEntriesToConfiguration() {
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> 
userArtifact :
+                userArtifacts.entrySet()) {
+            DistributedCache.writeFileInfoToConfig(
+                    userArtifact.getKey(), userArtifact.getValue(), 
jobConfiguration);
+        }
+    }
+
+    @Override
+    public int getMaximumParallelism() {
+        int maxParallelism = -1;
+        for (StreamNode node : streamNodes.values()) {
+            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+        }
+        return maxParallelism;
+    }
+
+    public void setInitialClientHeartbeatTimeout(long 
initialClientHeartbeatTimeout) {
+        this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public long getInitialClientHeartbeatTimeout() {
+        return initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public boolean isPartialResourceConfigured() {
+        return isPartialResourceConfigured;
+    }
+
+    public void serializeUserDefinedInstances() throws IOException {
+        final ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        Math.max(
+                                1,
+                                Math.min(
+                                        Hardware.getNumberCPUCores(),
+                                        
getExecutionConfig().getParallelism())),
+                        new 
ExecutorThreadFactory("flink-operator-serialization-io"));
+        try {
+            this.userDefinedObjectsHolder =
+                    new UserDefinedObjectsHolder(
+                            streamNodes,
+                            virtualSideOutputNodes,
+                            virtualPartitionNodes,
+                            executionConfig,
+                            stateBackend,
+                            checkpointStorage,
+                            serializationExecutor);
+            this.isPartialResourceConfigured = 
isPartialResourceConfiguredGraph();
+            this.isEmpty = streamNodes.isEmpty();

Review Comment:
   It looks to me like the two lines above are unrelated to the serialization work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to