zhuzhurk commented on code in PR #25472: URL: https://github.com/apache/flink/pull/25472#discussion_r1799818850
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java: ########## @@ -80,6 +82,16 @@ public SchedulerNG createInstance( final Collection<FailureEnricher> failureEnrichers, final BlocklistOperations blocklistOperations) throws Exception { + JobGraph jobGraph; + + if (executionPlan instanceof JobGraph) { + jobGraph = (JobGraph) executionPlan; + } else { + jobGraph = + ((StreamGraphDescriptor) executionPlan) Review Comment: It's better to add a sanity check that it is instanceOf `StreamGraphDescriptor`. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ########## @@ -185,6 +234,169 @@ StreamOperatorFactory<?> getHeadOperatorForNodeFromCache(StreamNode node) { return nodeToHeadOperatorCache.get(node); } + public CheckpointingMode getCheckpointingMode() { Review Comment: Maybe introduce a static `getCheckpointingMode(CheckpointConfig checkpointConfig)` method to be reused by `getCheckpointingMode()` and `StreamingJobGraphGenerator`. ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphDescriptor.java: ########## @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A wrapper class that encapsulates a serialized StreamGraph along with its necessary dependencies, + * such as JobID, user JAR blob keys, and classpaths. + * + * <p>This class facilitates the transfer of the serialized StreamGraph and its dependencies to a + * cluster. By using this class, we can directly control the serialization process of the + * StreamGraph, rather than relying on the RPC serialization mechanism. + */ +@Internal +public class StreamGraphDescriptor implements ExecutionPlan { + + private final SerializedValue<StreamGraph> serializedStreamGraph; + + private final long initialClientHeartbeatTimeout; + + /** + * This collection stores operator factories serialized separately from the {@link StreamGraph}. + * This separation allows for the parallel serialization of operator factories, improving the + * overall performance of the serialization process. + * + * <p>Each tuple in this collection consists of an integer key that identifies the stream node, + * and a value that wraps the serialized representation of the associated {@link + * StreamOperatorFactory} instance. + */ + private final Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> + streamNodeToSerializedOperatorFactories; + + private final JobID jobId; + + private final String jobName; + + private final List<PermanentBlobKey> userJarBlobKeys; + + private final List<URL> classpath; + + private final boolean isPartialResourceConfigured; + + private final boolean isEmptyGraph; + + private final JobType jobType; + + private final boolean dynamic; + + private final JobCheckpointingSettings checkpointingSettings; + + private final Configuration jobConfiguration; + + private final List<Path> userJars; + + private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts; + + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; + + private SavepointRestoreSettings savepointRestoreSettings; + + private final int maximumParallelism; + + public StreamGraphDescriptor(StreamGraph streamGraph, Executor serializationExecutor) + throws Exception { + this.jobId = checkNotNull(streamGraph.getJobId()); + this.jobName = checkNotNull(streamGraph.getJobName()); + this.userJarBlobKeys = checkNotNull(streamGraph.getUserJarBlobKeys()); + this.classpath = checkNotNull(streamGraph.getClasspath()); + + checkNotNull(streamGraph); + checkNotNull(serializationExecutor); + + // Serialize operator factories in parallel to accelerate serialization. + this.streamNodeToSerializedOperatorFactories = + serializeOperatorFactories(streamGraph.getStreamNodes(), serializationExecutor); + this.serializedStreamGraph = new SerializedValue<>(streamGraph); + + this.isPartialResourceConfigured = isPartialResourceConfigured(streamGraph); + this.initialClientHeartbeatTimeout = streamGraph.getInitialClientHeartbeatTimeout(); + this.isEmptyGraph = streamGraph.getStreamNodes().isEmpty(); + this.jobType = streamGraph.getJobType(); + this.dynamic = streamGraph.isDynamic(); + this.checkpointingSettings = streamGraph.getJobCheckpointingSettings(); + this.jobConfiguration = streamGraph.getJobConfiguration(); + this.userJars = streamGraph.getUserJars(); + this.userArtifacts = streamGraph.getUserArtifacts(); + this.savepointRestoreSettings = streamGraph.getSavepointRestoreSettings(); + this.maximumParallelism = streamGraph.getMaximumParallelism(); + this.serializedExecutionConfig = new SerializedValue<>(streamGraph.getExecutionConfig()); + } + + private boolean isPartialResourceConfigured(StreamGraph streamGraph) { + boolean hasVerticesWithUnknownResource = false; + boolean hasVerticesWithConfiguredResource = false; + + for (StreamNode streamNode : streamGraph.getStreamNodes()) { + if (streamNode.getMinResources() == ResourceSpec.UNKNOWN) { + hasVerticesWithUnknownResource = true; + } else { + hasVerticesWithConfiguredResource = true; + } + + if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) { + return true; + } + } + + return false; + } + + @Override + public JobID getJobID() { + return jobId; + } + + @Override + public List<PermanentBlobKey> getUserJarBlobKeys() { + return userJarBlobKeys; + } + + @Override + public List<URL> getClasspaths() { + return classpath; + } + + private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> + serializeOperatorFactories( + Collection<StreamNode> streamNodes, Executor serializationExecutor) + throws Exception { + List<CompletableFuture<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>>> + futures = + streamNodes.stream() + .filter(node -> node.getOperatorFactory() != null) + .map( + node -> + CompletableFuture.supplyAsync( + () -> { + try { + return Tuple2 + .<Integer, + SerializedValue< + StreamOperatorFactory< + ?>>> + of( + node + .getId(), + new SerializedValue<>( + node + .getOperatorFactory())); + } catch (Throwable throwable) { + throw new RuntimeException( + String.format( + "Could not serialize stream node %s", + node), + throwable); + } + }, + serializationExecutor)) + .collect(Collectors.toList()); + return FutureUtils.combineAll(futures).get(); + } + + public StreamGraph deserializeStreamGraph( + ClassLoader userClassLoader, Executor deserializationExecutor) throws Exception { + Collection<Tuple2<Integer, StreamOperatorFactory<?>>> streamNodeToOperatorFactories = + deserializeOperators(userClassLoader, deserializationExecutor); + StreamGraph streamGraph = serializedStreamGraph.deserializeValue(userClassLoader); + + streamNodeToOperatorFactories.forEach( + tuple2 -> streamGraph.getStreamNode(tuple2.f0).setOperatorFactory(tuple2.f1)); + + streamGraph.setUserArtifacts(userArtifacts); + streamGraph.setUserJarBlobKeys(userJarBlobKeys); + streamGraph.setJobConfiguration(jobConfiguration); + streamGraph.setSavepointRestoreSettings(savepointRestoreSettings); + streamGraph.setExecutionConfig(serializedExecutionConfig.deserializeValue(userClassLoader)); + + return streamGraph; + } + + private Collection<Tuple2<Integer, StreamOperatorFactory<?>>> deserializeOperators( + ClassLoader userClassLoader, Executor serializationExecutor) throws Exception { + List<CompletableFuture<Tuple2<Integer, StreamOperatorFactory<?>>>> futures = + streamNodeToSerializedOperatorFactories.stream() + .map( + tuple2 -> + CompletableFuture.supplyAsync( Review Comment: It's better to introduce a method `deserializeOperatorFactoryAsync(nodeId, serializedStreamOperatorFactory , serializationExecutor)`. ########## flink-end-to-end-tests/test-scripts/common_ha.sh: ########## @@ -65,7 +65,7 @@ function verify_logs() { fi # checks that all apart from the first JM recover the failed jobgraph. - if ! verify_num_occurences_in_logs 'standalonesession' 'Recovered JobGraph' ${JM_FAILURES}; then + if ! verify_num_occurences_in_logs 'standalonesession' 'Recovered StreamGraph' ${JM_FAILURES}; then Review Comment: StreamGraph -> Job ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphDescriptor.java: ########## @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A wrapper class that encapsulates a serialized StreamGraph along with its necessary dependencies, + * such as JobID, user JAR blob keys, and classpaths. + * + * <p>This class facilitates the transfer of the serialized StreamGraph and its dependencies to a + * cluster. By using this class, we can directly control the serialization process of the + * StreamGraph, rather than relying on the RPC serialization mechanism. + */ +@Internal +public class StreamGraphDescriptor implements ExecutionPlan { + + private final SerializedValue<StreamGraph> serializedStreamGraph; + + private final long initialClientHeartbeatTimeout; + + /** + * This collection stores operator factories serialized separately from the {@link StreamGraph}. + * This separation allows for the parallel serialization of operator factories, improving the + * overall performance of the serialization process. + * + * <p>Each tuple in this collection consists of an integer key that identifies the stream node, + * and a value that wraps the serialized representation of the associated {@link + * StreamOperatorFactory} instance. + */ + private final Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> + streamNodeToSerializedOperatorFactories; + + private final JobID jobId; + + private final String jobName; + + private final List<PermanentBlobKey> userJarBlobKeys; + + private final List<URL> classpath; + + private final boolean isPartialResourceConfigured; + + private final boolean isEmptyGraph; + + private final JobType jobType; + + private final boolean dynamic; + + private final JobCheckpointingSettings checkpointingSettings; + + private final Configuration jobConfiguration; + + private final List<Path> userJars; + + private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts; + + private final SerializedValue<ExecutionConfig> serializedExecutionConfig; + + private SavepointRestoreSettings savepointRestoreSettings; + + private final int maximumParallelism; + + public StreamGraphDescriptor(StreamGraph streamGraph, Executor serializationExecutor) + throws Exception { + this.jobId = checkNotNull(streamGraph.getJobId()); + this.jobName = checkNotNull(streamGraph.getJobName()); + this.userJarBlobKeys = checkNotNull(streamGraph.getUserJarBlobKeys()); + this.classpath = checkNotNull(streamGraph.getClasspath()); + + checkNotNull(streamGraph); + checkNotNull(serializationExecutor); + + // Serialize operator factories in parallel to accelerate serialization. + this.streamNodeToSerializedOperatorFactories = + serializeOperatorFactories(streamGraph.getStreamNodes(), serializationExecutor); + this.serializedStreamGraph = new SerializedValue<>(streamGraph); + + this.isPartialResourceConfigured = isPartialResourceConfigured(streamGraph); + this.initialClientHeartbeatTimeout = streamGraph.getInitialClientHeartbeatTimeout(); + this.isEmptyGraph = streamGraph.getStreamNodes().isEmpty(); + this.jobType = streamGraph.getJobType(); + this.dynamic = streamGraph.isDynamic(); + this.checkpointingSettings = streamGraph.getJobCheckpointingSettings(); + this.jobConfiguration = streamGraph.getJobConfiguration(); + this.userJars = streamGraph.getUserJars(); + this.userArtifacts = streamGraph.getUserArtifacts(); + this.savepointRestoreSettings = streamGraph.getSavepointRestoreSettings(); + this.maximumParallelism = streamGraph.getMaximumParallelism(); + this.serializedExecutionConfig = new SerializedValue<>(streamGraph.getExecutionConfig()); + } + + private boolean isPartialResourceConfigured(StreamGraph streamGraph) { + boolean hasVerticesWithUnknownResource = false; + boolean hasVerticesWithConfiguredResource = false; + + for (StreamNode streamNode : streamGraph.getStreamNodes()) { + if (streamNode.getMinResources() == ResourceSpec.UNKNOWN) { + hasVerticesWithUnknownResource = true; + } else { + hasVerticesWithConfiguredResource = true; + } + + if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) { + return true; + } + } + + return false; + } + + @Override + public JobID getJobID() { + return jobId; + } + + @Override + public List<PermanentBlobKey> getUserJarBlobKeys() { + return userJarBlobKeys; + } + + @Override + public List<URL> getClasspaths() { + return classpath; + } + + private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> + serializeOperatorFactories( + Collection<StreamNode> streamNodes, Executor serializationExecutor) + throws Exception { + List<CompletableFuture<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>>> + futures = + streamNodes.stream() + .filter(node -> node.getOperatorFactory() != null) + .map( + node -> + CompletableFuture.supplyAsync( Review Comment: It's better to introduce a method `serializeOperatorFactoryAsync(node, serializationExecutor)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org