[ 
https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633777#comment-16633777
 ] 

ASF GitHub Bot commented on FLINK-10291:
----------------------------------------

asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with 
fixed/configurable JobID in StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/6733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not display it otherwise):

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 94fc109c47b..59ab4065804 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -39,18 +40,21 @@
 public class PackagedProgramUtils {
 
        /**
-        * Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+        * Creates a {@link JobGraph} with a specified {@link JobID}
+        * from the given {@link PackagedProgram}.
         *
         * @param packagedProgram to extract the JobGraph from
         * @param configuration to use for the optimizer and job graph generator
         * @param defaultParallelism for the JobGraph
+        * @param jobID the pre-generated job id
         * @return JobGraph extracted from the PackagedProgram
         * @throws ProgramInvocationException if the JobGraph generation failed
         */
        public static JobGraph createJobGraph(
                        PackagedProgram packagedProgram,
                        Configuration configuration,
-                       int defaultParallelism) throws 
ProgramInvocationException {
+                       int defaultParallelism,
+                       JobID jobID) throws ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
                final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
                final FlinkPlan flinkPlan;
@@ -79,11 +83,11 @@ public static JobGraph createJobGraph(
                final JobGraph jobGraph;
 
                if (flinkPlan instanceof StreamingPlan) {
-                       jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+                       jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph(jobID);
                        
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
                } else {
                        final JobGraphGenerator jobGraphGenerator = new 
JobGraphGenerator(configuration);
-                       jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+                       jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
                }
 
                for (URL url : packagedProgram.getAllLibraries()) {
@@ -99,5 +103,22 @@ public static JobGraph createJobGraph(
                return jobGraph;
        }
 
+       /**
+        * Creates a {@link JobGraph} with a random {@link JobID}
+        * from the given {@link PackagedProgram}.
+        *
+        * @param packagedProgram to extract the JobGraph from
+        * @param configuration to use for the optimizer and job graph generator
+        * @param defaultParallelism for the JobGraph
+        * @return JobGraph extracted from the PackagedProgram
+        * @throws ProgramInvocationException if the JobGraph generation failed
+        */
+       public static JobGraph createJobGraph(
+               PackagedProgram packagedProgram,
+               Configuration configuration,
+               int defaultParallelism) throws ProgramInvocationException {
+               return createJobGraph(packagedProgram, configuration, 
defaultParallelism, null);
+       }
+
        private PackagedProgramUtils() {}
 }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 3e0645d6859..d769b6848e7 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -45,6 +46,8 @@
        @Nonnull
        private final String[] programArguments;
 
+       public static final JobID FIXED_JOB_ID = new JobID(0, 0);
+
        public ClassPathJobGraphRetriever(
                        @Nonnull String jobClassName,
                        @Nonnull SavepointRestoreSettings 
savepointRestoreSettings,
@@ -59,7 +62,11 @@ public JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkExcept
                final PackagedProgram packagedProgram = createPackagedProgram();
                final int defaultParallelism = 
configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
                try {
-                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
defaultParallelism);
+                       final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(
+                               packagedProgram,
+                               configuration,
+                               defaultParallelism,
+                               FIXED_JOB_ID);
                        jobGraph.setAllowQueuedScheduling(true);
                        
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 
diff --git 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 6e460e1f894..83696e37c90 100644
--- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
+++ 
b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -29,6 +29,7 @@
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -53,6 +54,7 @@ public void testJobGraphRetrieval() throws FlinkException {
 
                assertThat(jobGraph.getName(), 
is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
                assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
+               assertEquals(jobGraph.getJobID(), 
ClassPathJobGraphRetriever.FIXED_JOB_ID);
        }
 
        @Test
@@ -68,5 +70,6 @@ public void testSavepointRestoreSettings() throws 
FlinkException {
                final JobGraph jobGraph = 
classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
                assertThat(jobGraph.getSavepointRestoreSettings(), 
is(equalTo(savepointRestoreSettings)));
+               assertEquals(jobGraph.getJobID(), 
ClassPathJobGraphRetriever.FIXED_JOB_ID);
        }
 }
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
index 880f2e3d5db..764134f391e 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -21,15 +21,29 @@
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import javax.annotation.Nullable;
+
 /**
  * Abstract class representing Flink Streaming plans
  * 
  */
 public abstract class StreamingPlan implements FlinkPlan {
 
-       public abstract JobGraph getJobGraph();
+       /**
+        * Gets the assembled {@link JobGraph} with a random {@link JobID}.
+        */
+       @SuppressWarnings("deprecation")
+       public JobGraph getJobGraph() {
+               return getJobGraph(null);
+       }
+
+       /**
+        * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
+        */
+       public abstract JobGraph getJobGraph(@Nullable JobID jobID);
 
        public abstract String getStreamingPlanAsJSON();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 01768ad6df7..46a4ce22112 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -652,19 +653,20 @@ private void removeVertex(StreamNode toRemove) {
        }
 
        /**
-        * Gets the assembled {@link JobGraph}.
+        * Gets the assembled {@link JobGraph} with a given job id.
         */
        @SuppressWarnings("deprecation")
-       public JobGraph getJobGraph() {
+       @Override
+       public JobGraph getJobGraph(@Nullable JobID jobID) {
                // temporarily forbid checkpointing for iterative jobs
                if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
&& !checkpointConfig.isForceCheckpointing()) {
                        throw new UnsupportedOperationException(
-                                       "Checkpointing is currently not 
supported by default for iterative jobs, as we cannot guarantee exactly once 
semantics. "
-                                                       + "State checkpoints 
happen normally, but records in-transit during the snapshot will be lost upon 
failure. "
-                                                       + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
+                               "Checkpointing is currently not supported by 
default for iterative jobs, as we cannot guarantee exactly once semantics. "
+                                       + "State checkpoints happen normally, 
but records in-transit during the snapshot will be lost upon failure. "
+                                       + "\nThe user can force enable state 
checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
-               return StreamingJobGraphGenerator.createJobGraph(this);
+               return StreamingJobGraphGenerator.createJobGraph(this, jobID);
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0ce52250920..69213024975 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -62,6 +63,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,7 +86,11 @@
        // 
------------------------------------------------------------------------
 
        public static JobGraph createJobGraph(StreamGraph streamGraph) {
-               return new 
StreamingJobGraphGenerator(streamGraph).createJobGraph();
+               return createJobGraph(streamGraph, null);
+       }
+
+       public static JobGraph createJobGraph(StreamGraph streamGraph, 
@Nullable JobID jobID) {
+               return new StreamingJobGraphGenerator(streamGraph, 
jobID).createJobGraph();
        }
 
        // 
------------------------------------------------------------------------
@@ -108,6 +115,10 @@ public static JobGraph createJobGraph(StreamGraph 
streamGraph) {
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
        private StreamingJobGraphGenerator(StreamGraph streamGraph) {
+               this(streamGraph, null);
+       }
+
+       private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable 
JobID jobID) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
                this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
@@ -121,7 +132,7 @@ private StreamingJobGraphGenerator(StreamGraph streamGraph) 
{
                this.chainedPreferredResources = new HashMap<>();
                this.physicalEdgesInOrder = new ArrayList<>();
 
-               jobGraph = new JobGraph(streamGraph.getJobName());
+               jobGraph = new JobGraph(jobID, streamGraph.getJobName());
        }
 
        private JobGraph createJobGraph() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Generate JobGraph with fixed/configurable JobID in 
> StandaloneJobClusterEntrypoint
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-10291
>                 URL: https://issues.apache.org/jira/browse/FLINK-10291
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2
>
>
> The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}}
> from the user code when it starts. Because of how the {{JobGraph}} is
> generated, it is assigned a random {{JobID}}. This is problematic in case of
> a failover: the restarted entrypoint generates the {{JobGraph}} again with a
> new random {{JobID}}, so the {{JobMaster}} is not able to find the checkpoints
> of the previous run. To solve this problem, we need to either fix the
> {{JobID}} assignment or make it configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to