zhuzhurk commented on code in PR #25472:
URL: https://github.com/apache/flink/pull/25472#discussion_r1808402242
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java:
##########
@@ -80,6 +83,14 @@ public SchedulerNG createInstance(
             final Collection<FailureEnricher> failureEnrichers,
             final BlocklistOperations blocklistOperations)
             throws Exception {
+        JobGraph jobGraph;
+
+        if (executionPlan instanceof JobGraph) {
+            jobGraph = (JobGraph) executionPlan;
+        } else {
+            checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");

Review Comment:
   maybe
   ```
   } else if (executionPlan instanceof StreamGraph) {
       ...
   } else {
       throw new FlinkException(
               "Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
   }
   ```
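As a sketch of the suggested shape once merged with the added lines (illustrative only, not the PR's actual code): the StreamGraph branch is left open, exactly as the `...` in the suggestion above, and `FlinkException` refers to `org.apache.flink.util.FlinkException`, which `createInstance` can propagate since it already declares `throws Exception`.

```java
// Sketch only: combines the PR's first branch with the suggested else-if/else fallthrough.
if (executionPlan instanceof JobGraph) {
    jobGraph = (JobGraph) executionPlan;
} else if (executionPlan instanceof StreamGraph) {
    // StreamGraph handling (must assign jobGraph); left open in the suggestion above
} else {
    // fail loudly on any other ExecutionPlan implementation
    throw new FlinkException(
            "Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
}
```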
##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1066,6 +1067,14 @@ public CompletableFuture<JobSubmissionResult> submitJob(ExecutionPlan executionP
         // When MiniCluster uses the local RPC, the provided ExecutionPlan is passed directly to the
         // Dispatcher. This means that any mutations to the JG can affect the Dispatcher behaviour,
         // so we rather clone it to guard against this.

Review Comment:
   The comments above were written for the original line; they should be updated to match the new code.


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -169,10 +225,15 @@ public ExecutionConfig getExecutionConfig() {
         return executionConfig;
     }

+    @Override
     public Configuration getJobConfiguration() {
         return jobConfiguration;
     }

+    public void setJobConfiguration(Configuration configuration) {

Review Comment:
   Is this method required? If not, we can keep `jobConfiguration` final.
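If the setter does turn out to be unnecessary, a minimal sketch of the alternative the comment points at; the constructor shown here is illustrative only and not StreamGraph's actual signature.

```java
// Illustrative fragment, not the real StreamGraph constructor: without a setter,
// jobConfiguration can stay final and be supplied when the graph is created.
private final Configuration jobConfiguration;

public StreamGraph(Configuration jobConfiguration /* , other ctor args */) {
    this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration);
}

@Override
public Configuration getJobConfiguration() {
    return jobConfiguration;
}
```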
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute attribute) {
             getStreamNode(vertexId).setAttribute(attribute);
         }
     }
+
+    public void setJobId(JobID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public JobID getJobID() {
+        return jobId;
+    }
+
+    /**
+     * Sets the classpath required to run the job on a task manager.
+     *
+     * @param paths paths of the directories/JAR files required to run the job on a task manager
+     */
+    public void setClasspath(List<URL> paths) {
+        classpath = paths;
+    }
+
+    public List<URL> getClasspath() {
+        return classpath;
+    }
+
+    /**
+     * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+     *
+     * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the
+     *     jobgraph.
+     * @throws RuntimeException if a jar URL is not valid.
+     */
+    public void addJars(final List<URL> jarFilesToAttach) {
+        for (URL jar : jarFilesToAttach) {
+            try {
+                addJar(new Path(jar.toURI()));
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("URL is invalid. This should not happen.", e);
+            }
+        }
+    }
+
+    /**
+     * Returns a list of BLOB keys referring to the JAR files required to run this job.
+     *
+     * @return list of BLOB keys referring to the JAR files required to run this job
+     */
+    @Override
+    public List<PermanentBlobKey> getUserJarBlobKeys() {
+        return this.userJarBlobKeys;
+    }
+
+    @Override
+    public List<URL> getClasspaths() {
+        return classpath;
+    }
+
+    public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+        if (file == null) {
+            throw new IllegalArgumentException();
+        }
+
+        userArtifacts.putIfAbsent(name, file);
+    }
+
+    @Override
+    public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+        return userArtifacts;
+    }
+
+    @Override
+    public void addUserJarBlobKey(PermanentBlobKey key) {
+        if (key == null) {
+            throw new IllegalArgumentException();
+        }
+
+        if (!userJarBlobKeys.contains(key)) {
+            userJarBlobKeys.add(key);
+        }
+    }
+
+    @Override
+    public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey)
+            throws IOException {
+        byte[] serializedBlobKey;
+        serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+        userArtifacts.computeIfPresent(
+                entryName,
+                (key, originalEntry) ->
+                        new DistributedCache.DistributedCacheEntry(
+                                originalEntry.filePath,
+                                originalEntry.isExecutable,
+                                serializedBlobKey,
+                                originalEntry.isZipped));
+    }
+
+    @Override
+    public void writeUserArtifactEntriesToConfiguration() {
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
+                userArtifacts.entrySet()) {
+            DistributedCache.writeFileInfoToConfig(
+                    userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
+        }
+    }
+
+    @Override
+    public int getMaximumParallelism() {
+        int maxParallelism = -1;
+        for (StreamNode node : streamNodes.values()) {
+            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+        }
+        return maxParallelism;
+    }
+
+    public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) {
+        this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public long getInitialClientHeartbeatTimeout() {
+        return initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public boolean isPartialResourceConfigured() {
+        return isPartialResourceConfigured;
+    }
+
+    public void serializeUserDefinedInstances() throws IOException {
+        final ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        Math.max(
+                                1,
+                                Math.min(
+                                        Hardware.getNumberCPUCores(),
+                                        getExecutionConfig().getParallelism())),
+                        new ExecutorThreadFactory("flink-operator-serialization-io"));
+        try {
+            this.userDefinedObjectsHolder =
+                    new UserDefinedObjectsHolder(
+                            streamNodes,
+                            virtualSideOutputNodes,
+                            virtualPartitionNodes,
+                            executionConfig,
+                            stateBackend,
+                            checkpointStorage,
+                            serializationExecutor);
+            this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
+            this.isEmpty = streamNodes.isEmpty();
+        } finally {
+            serializationExecutor.shutdown();
+        }
+    }
+
+    public void deserializeUserDefinedInstances(
+            ClassLoader userClassLoader, Executor serializationExecutor) throws Exception {
+        this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
+    }
+
+    private boolean isPartialResourceConfiguredGraph() {
+        boolean hasVerticesWithUnknownResource = false;
+        boolean hasVerticesWithConfiguredResource = false;
+
+        for (StreamNode streamNode : this.getStreamNodes()) {
+            if (streamNode.getMinResources() == ResourceSpec.UNKNOWN) {
+                hasVerticesWithUnknownResource = true;
+            } else {
+                hasVerticesWithConfiguredResource = true;
+            }
+
+            if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamGraph(jobId: " + jobId + ")";
+    }
+
+    /**
+     * A static inner class designed to hold user-defined objects for serialization and
+     * deserialization in the stream graph.
+     */
+    private class UserDefinedObjectsHolder implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final SerializedValue<
+                        Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>>
+                serializedVirtualPartitionNodes;
+
+        private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
+
+        private SerializedValue<Map<Integer, StreamNode>> serializedStreamNodes;
+
+        /**
+         * This collection stores operator factories serialized separately from the {@link
+         * StreamGraph}. This separation allows for the parallel serialization of operator
+         * factories, improving the overall performance of the serialization process.
+         *
+         * <p>Each tuple in this collection consists of an integer key that identifies the stream
+         * node, and a value that wraps the serialized representation of the associated {@link
+         * StreamOperatorFactory} instance.
+         */
+        private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>>
+                streamNodeToSerializedOperatorFactories;
+
+        private final SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>>
+                serializedVirtualSideOutputNodes;
+
+        public UserDefinedObjectsHolder(
+                Map<Integer, StreamNode> streamNodes,
+                Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes,
+                Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>
+                        virtualPartitionNodes,
+                ExecutionConfig executionConfig,
+                @Nullable StateBackend stateBackend,
+                @Nullable CheckpointStorage checkpointStorage,
+                Executor serializationExecutor)
+                throws IOException {
+            serializeStreamNodes(streamNodes, serializationExecutor);
+
+            this.serializedVirtualSideOutputNodes = new SerializedValue<>(virtualSideOutputNodes);
+            this.serializedVirtualPartitionNodes = new SerializedValue<>(virtualPartitionNodes);
+            this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+            if (stateBackend != null && serializedStateBackend == null) {
+                serializedStateBackend = new SerializedValue<>(stateBackend);
+            }
+
+            if (checkpointStorage != null && serializedCheckpointStorage == null) {
+                serializedCheckpointStorage = new SerializedValue<>(checkpointStorage);
+            }

Review Comment:
   How about ensuring `JobCheckpointingSettings` is created right after `stateBackend` and `checkpointStorage` are set, and letting it be the holder of `serializedStateBackend` and `serializedCheckpointStorage`? It looks to me like this could simplify things a lot.
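To make the suggestion more concrete, a rough sketch of one way the holder could look. The `checkpointingSettings` field and the `setJobCheckpointingSettings` hook are hypothetical names introduced only for illustration; `getDefaultStateBackend()` and `getDefaultCheckpointStorage()` are assumed to be the existing `JobCheckpointingSettings` accessors, and how this is wired into the translation path is left open.

```java
// Sketch under the assumption that StreamGraph can keep the JobCheckpointingSettings around
// once stateBackend/checkpointStorage are configured; field and setter names are hypothetical.
@Nullable private JobCheckpointingSettings checkpointingSettings;

/** Hypothetical hook, invoked right after the state backend and checkpoint storage are set. */
public void setJobCheckpointingSettings(JobCheckpointingSettings settings) {
    this.checkpointingSettings = settings;
}

// UserDefinedObjectsHolder would then reuse the already-serialized values instead of keeping
// serializedStateBackend / serializedCheckpointStorage as separate fields on the StreamGraph.
@Nullable
private SerializedValue<StateBackend> resolveSerializedStateBackend() {
    return checkpointingSettings == null ? null : checkpointingSettings.getDefaultStateBackend();
}

@Nullable
private SerializedValue<CheckpointStorage> resolveSerializedCheckpointStorage() {
    return checkpointingSettings == null
            ? null
            : checkpointingSettings.getDefaultCheckpointStorage();
}
```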
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute attribute) {
             getStreamNode(vertexId).setAttribute(attribute);
         }
     }
+
+    public void setJobId(JobID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public JobID getJobID() {
+        return jobId;
+    }
+
+    /**
+     * Sets the classpath required to run the job on a task manager.
+     *
+     * @param paths paths of the directories/JAR files required to run the job on a task manager
+     */
+    public void setClasspath(List<URL> paths) {
+        classpath = paths;
+    }
+
+    public List<URL> getClasspath() {
+        return classpath;
+    }
+
+    /**
+     * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+     *
+     * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the
+     *     jobgraph.
+     * @throws RuntimeException if a jar URL is not valid.
+     */
+    public void addJars(final List<URL> jarFilesToAttach) {
+        for (URL jar : jarFilesToAttach) {
+            try {
+                addJar(new Path(jar.toURI()));
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("URL is invalid. This should not happen.", e);
+            }
+        }
+    }
+
+    /**
+     * Returns a list of BLOB keys referring to the JAR files required to run this job.
+     *
+     * @return list of BLOB keys referring to the JAR files required to run this job
+     */
+    @Override
+    public List<PermanentBlobKey> getUserJarBlobKeys() {
+        return this.userJarBlobKeys;
+    }
+
+    @Override
+    public List<URL> getClasspaths() {
+        return classpath;
+    }
+
+    public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+        if (file == null) {
+            throw new IllegalArgumentException();
+        }
+
+        userArtifacts.putIfAbsent(name, file);
+    }
+
+    @Override
+    public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+        return userArtifacts;
+    }
+
+    @Override
+    public void addUserJarBlobKey(PermanentBlobKey key) {
+        if (key == null) {
+            throw new IllegalArgumentException();
+        }
+
+        if (!userJarBlobKeys.contains(key)) {
+            userJarBlobKeys.add(key);
+        }
+    }
+
+    @Override
+    public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey)
+            throws IOException {
+        byte[] serializedBlobKey;
+        serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+        userArtifacts.computeIfPresent(
+                entryName,
+                (key, originalEntry) ->
+                        new DistributedCache.DistributedCacheEntry(
+                                originalEntry.filePath,
+                                originalEntry.isExecutable,
+                                serializedBlobKey,
+                                originalEntry.isZipped));
+    }
+
+    @Override
+    public void writeUserArtifactEntriesToConfiguration() {
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
+                userArtifacts.entrySet()) {
+            DistributedCache.writeFileInfoToConfig(
+                    userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
+        }
+    }
+
+    @Override
+    public int getMaximumParallelism() {
+        int maxParallelism = -1;
+        for (StreamNode node : streamNodes.values()) {
+            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+        }
+        return maxParallelism;
+    }
+
+    public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) {
+        this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public long getInitialClientHeartbeatTimeout() {
+        return initialClientHeartbeatTimeout;
+    }
+
+    @Override
+    public boolean isPartialResourceConfigured() {
+        return isPartialResourceConfigured;
+    }
+
+    public void serializeUserDefinedInstances() throws IOException {
+        final ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        Math.max(
+                                1,
+                                Math.min(
+                                        Hardware.getNumberCPUCores(),
+                                        getExecutionConfig().getParallelism())),
+                        new ExecutorThreadFactory("flink-operator-serialization-io"));
+        try {
+            this.userDefinedObjectsHolder =
+                    new UserDefinedObjectsHolder(
+                            streamNodes,
+                            virtualSideOutputNodes,
+                            virtualPartitionNodes,
+                            executionConfig,
+                            stateBackend,
+                            checkpointStorage,
+                            serializationExecutor);
+            this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
+            this.isEmpty = streamNodes.isEmpty();

Review Comment:
   It looks to me like the two lines above are unrelated to the serialization work.
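A small sketch of the separation this comment hints at; `finalizeGraphProperties` is a hypothetical name, and where it would actually be invoked is left to the PR author.

```java
// Sketch only: compute the two graph-level flags outside of the serialization pass.
private void finalizeGraphProperties() {
    this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
    this.isEmpty = streamNodes.isEmpty();
}

public void serializeUserDefinedInstances() throws IOException {
    // ... build and assign userDefinedObjectsHolder as before, without touching the two flags
}
```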
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org