[ https://issues.apache.org/jira/browse/FLINK-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654881#comment-16654881 ]
ASF GitHub Bot commented on FLINK-10494: ---------------------------------------- tillrohrmann closed pull request #6794: [FLINK-10494] [Job Manager] Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder URL: https://github.com/apache/flink/pull/6794 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index a1da2131e48..a4775066117 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -37,9 +37,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; -import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRunner; +import org.apache.flink.runtime.jobmaster.JobMasterSharedServices; import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.RescalingBehaviour; @@ -105,13 +105,13 @@ private final HighAvailabilityServices highAvailabilityServices; private final ResourceManagerGateway resourceManagerGateway; - private final JobManagerSharedServices jobManagerSharedServices; + private final JobMasterSharedServices jobMasterSharedServices; private final HeartbeatServices heartbeatServices; private final BlobServer blobServer; private 
final FatalErrorHandler fatalErrorHandler; - private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures; + private final Map<JobID, CompletableFuture<JobMasterRunner>> jobManagerRunnerFutures; private final LeaderElectionService leaderElectionService; @@ -161,7 +161,7 @@ public Dispatcher( this.jobManagerMetricGroup = Preconditions.checkNotNull(jobManagerMetricGroup); this.metricQueryServicePath = metricServiceQueryPath; - this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( + this.jobMasterSharedServices = JobMasterSharedServices.fromConfiguration( configuration, this.blobServer); @@ -197,7 +197,7 @@ public Dispatcher( () -> { Exception exception = null; try { - jobManagerSharedServices.shutdown(); + jobMasterSharedServices.shutdown(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } @@ -281,7 +281,7 @@ public void start() throws Exception { private CompletableFuture<Void> runJob(JobGraph jobGraph) { Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID())); - final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph); + final CompletableFuture<JobMasterRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph); jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture); @@ -296,10 +296,10 @@ public void start() throws Exception { getMainThreadExecutor()); } - private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) { + private CompletableFuture<JobMasterRunner> createJobManagerRunner(JobGraph jobGraph) { final RpcService rpcService = getRpcService(); - final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync( + final CompletableFuture<JobMasterRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync( CheckedSupplier.unchecked(() -> jobManagerRunnerFactory.createJobManagerRunner( ResourceID.generate(), @@ -309,7 +309,7 
@@ public void start() throws Exception { highAvailabilityServices, heartbeatServices, blobServer, - jobManagerSharedServices, + jobMasterSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler)), rpcService.getExecutor()); @@ -317,13 +317,13 @@ public void start() throws Exception { return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner)); } - private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception { - final JobID jobId = jobManagerRunner.getJobGraph().getJobID(); - jobManagerRunner.getResultFuture().whenCompleteAsync( + private JobMasterRunner startJobManagerRunner(JobMasterRunner jobMasterRunner) throws Exception { + final JobID jobId = jobMasterRunner.getJobGraph().getJobID(); + jobMasterRunner.getResultFuture().whenCompleteAsync( (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { - // check if we are still the active JobManagerRunner by checking the identity + // check if we are still the active JobMasterRunner by checking the identity //noinspection ObjectEquality - if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) { + if (jobMasterRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) { if (archivedExecutionGraph != null) { jobReachedGloballyTerminalState(archivedExecutionGraph); } else { @@ -336,13 +336,13 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner } } } else { - log.debug("There is a newer JobManagerRunner for the job {}.", jobId); + log.debug("There is a newer JobMasterRunner for the job {}.", jobId); } }, getMainThreadExecutor()); - jobManagerRunner.start(); + jobMasterRunner.start(); - return jobManagerRunner; + return jobMasterRunner; } @Override @@ -367,7 +367,7 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner return Acknowledge.get(); }, - jobManagerSharedServices.getScheduledExecutorService()); 
+ jobMasterSharedServices.getScheduledExecutorService()); } @Override @@ -498,7 +498,7 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner @Override public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) { - final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId); + final CompletableFuture<JobMasterRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId); if (jobManagerRunnerFuture == null) { final ArchivedExecutionGraph archivedExecutionGraph = archivedExecutionGraphStore.get(jobId); @@ -509,7 +509,7 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner return CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph)); } } else { - return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getResultFuture).thenApply(JobResult::createFrom); + return jobManagerRunnerFuture.thenCompose(JobMasterRunner::getResultFuture).thenApply(JobResult::createFrom); } } @@ -583,11 +583,11 @@ private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF } private CompletableFuture<Void> removeJob(JobID jobId, boolean cleanupHA) { - CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId); + CompletableFuture<JobMasterRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.remove(jobId); final CompletableFuture<Void> jobManagerRunnerTerminationFuture; if (jobManagerRunnerFuture != null) { - jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync); + jobManagerRunnerTerminationFuture = jobManagerRunnerFuture.thenCompose(JobMasterRunner::closeAsync); } else { jobManagerRunnerTerminationFuture = CompletableFuture.completedFuture(null); } @@ -628,7 +628,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) { } /** - * Terminate all currently running {@link JobManagerRunner}. 
+ * Terminate all currently running {@link JobMasterRunner}. */ private void terminateJobManagerRunners() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); @@ -756,12 +756,12 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { } private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) { - final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId); + final CompletableFuture<JobMasterRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId); if (jobManagerRunnerFuture == null) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } else { - final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobManagerRunner::getLeaderGatewayFuture); + final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunnerFuture.thenCompose(JobMasterRunner::getLeaderGatewayFuture); return leaderGatewayFuture.thenApplyAsync( (JobMasterGateway jobMasterGateway) -> { // check whether the retrieved JobMasterGateway belongs still to a running JobMaster @@ -1011,11 +1011,11 @@ public void onRemovedJobGraph(final JobID jobId) { //------------------------------------------------------ /** - * Factory for a {@link JobManagerRunner}. + * Factory for a {@link JobMasterRunner}. 
*/ @FunctionalInterface public interface JobManagerRunnerFactory { - JobManagerRunner createJobManagerRunner( + JobMasterRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, Configuration configuration, @@ -1023,19 +1023,19 @@ JobManagerRunner createJobManagerRunner( HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerSharedServices jobManagerServices, + JobMasterSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception; } /** - * Singleton default factory for {@link JobManagerRunner}. + * Singleton default factory for {@link JobMasterRunner}. */ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { INSTANCE; @Override - public JobManagerRunner createJobManagerRunner( + public JobMasterRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, Configuration configuration, @@ -1043,10 +1043,10 @@ public JobManagerRunner createJobManagerRunner( HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerSharedServices jobManagerServices, + JobMasterSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { - return new JobManagerRunner( + return new JobMasterRunner( resourceId, jobGraph, configuration, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java index 7c35f3dbbbe..eafbeec6bb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java @@ -23,7 +23,7 @@ /** * Exceptions thrown by operations on 
the {@link ExecutionGraph} by the {@link JobMaster}. */ -public class ExecutionGraphException extends JobManagerException { +public class ExecutionGraphException extends JobMasterException { private static final long serialVersionUID = -5439002256464886357L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java deleted file mode 100644 index 1650c83290d..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.util.FlinkException; - -/** - * Base exception thrown by the {@link JobMaster}. 
- */ -public class JobManagerException extends FlinkException { - - private static final long serialVersionUID = -7290962952242188064L; - - public JobManagerException(final String message) { - super(message); - } - - public JobManagerException(final String message, Throwable cause) { - super(message, cause); - } - - public JobManagerException(Throwable cause) { - super(cause); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 21e06af30d6..8e4ce077d81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -228,7 +228,7 @@ public JobMaster( JobGraph jobGraph, HighAvailabilityServices highAvailabilityService, SlotPoolFactory slotPoolFactory, - JobManagerSharedServices jobManagerSharedServices, + JobMasterSharedServices jobMasterSharedServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerJobMetricGroupFactory jobMetricGroupFactory, @@ -246,7 +246,7 @@ public JobMaster( this.rpcTimeout = jobMasterConfiguration.getRpcTimeout(); this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.blobServer = checkNotNull(blobServer); - this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService(); + this.scheduledExecutorService = jobMasterSharedServices.getScheduledExecutorService(); this.jobCompletionActions = checkNotNull(jobCompletionActions); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); @@ -275,7 +275,7 @@ public JobMaster( .getRestartStrategy(); this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration, - jobManagerSharedServices.getRestartStrategyFactory(), + jobMasterSharedServices.getRestartStrategyFactory(), jobGraph.isCheckpointingEnabled()); log.info("Using 
restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid); @@ -288,7 +288,7 @@ public JobMaster( this.registeredTaskManagers = new HashMap<>(4); - this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); + this.backPressureStatsTracker = checkNotNull(jobMasterSharedServices.getBackPressureStatsTracker()); this.lastInternalSavepoint = null; this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java index 78671bc782f..f9a126a7f84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRunner.java @@ -58,9 +58,9 @@ * The runner for the job manager. It deals with job level leader election and make underlying job manager * properly reacted. */ -public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync { +public class JobMasterRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync { - private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); + private static final Logger log = LoggerFactory.getLogger(JobMasterRunner.class); // ------------------------------------------------------------------------ @@ -76,7 +76,7 @@ /** Leader election for this job. 
*/ private final LeaderElectionService leaderElectionService; - private final JobManagerSharedServices jobManagerSharedServices; + private final JobMasterSharedServices jobMasterSharedServices; private final JobMaster jobMaster; @@ -96,13 +96,13 @@ // ------------------------------------------------------------------------ /** - * Exceptions that occur while creating the JobManager or JobManagerRunner are directly + * Exceptions that occur while creating the JobManager or JobMasterRunner are directly * thrown and not reported to the given {@code FatalErrorHandler}. * * @throws Exception Thrown if the runner cannot be set up, because either one of the * required services could not be started, ot the Job could not be initialized. */ - public JobManagerRunner( + public JobMasterRunner( final ResourceID resourceId, final JobGraph jobGraph, final Configuration configuration, @@ -110,7 +110,7 @@ public JobManagerRunner( final HighAvailabilityServices haServices, final HeartbeatServices heartbeatServices, final BlobServer blobServer, - final JobManagerSharedServices jobManagerSharedServices, + final JobMasterSharedServices jobMasterSharedServices, final JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, final FatalErrorHandler fatalErrorHandler) throws Exception { @@ -120,13 +120,13 @@ public JobManagerRunner( // make sure we cleanly shut down out JobManager services if initialization fails try { this.jobGraph = checkNotNull(jobGraph); - this.jobManagerSharedServices = checkNotNull(jobManagerSharedServices); + this.jobMasterSharedServices = checkNotNull(jobMasterSharedServices); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty"); // libraries and class loader first - final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager(); + final LibraryCacheManager libraryCacheManager = jobMasterSharedServices.getLibraryCacheManager(); try { 
libraryCacheManager.registerJob( jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths()); @@ -161,7 +161,7 @@ public JobManagerRunner( jobGraph, haServices, slotPoolFactory, - jobManagerSharedServices, + jobMasterSharedServices, heartbeatServices, blobServer, jobManagerJobMetricGroupFactory, @@ -226,12 +226,12 @@ public void start() throws Exception { throwable = ExceptionUtils.firstOrSuppressed(t, ExceptionUtils.stripCompletionException(throwable)); } - final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager(); + final LibraryCacheManager libraryCacheManager = jobMasterSharedServices.getLibraryCacheManager(); libraryCacheManager.unregisterJob(jobGraph.getJobID()); if (throwable != null) { terminationFuture.completeExceptionally( - new FlinkException("Could not properly shut down the JobManagerRunner", throwable)); + new FlinkException("Could not properly shut down the JobMasterRunner", throwable)); } else { terminationFuture.complete(null); } @@ -307,7 +307,7 @@ private void unregisterJobFromHighAvailability() { public void grantLeadership(final UUID leaderSessionID) { synchronized (lock) { if (shutdown) { - log.info("JobManagerRunner already shutdown."); + log.info("JobMasterRunner already shutdown."); return; } @@ -342,7 +342,7 @@ private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) t confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture); } }, - jobManagerSharedServices.getScheduledExecutorService()); + jobMasterSharedServices.getScheduledExecutorService()); } } @@ -359,7 +359,7 @@ private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, Completab public void revokeLeadership() { synchronized (lock) { if (shutdown) { - log.info("JobManagerRunner already shutdown."); + log.info("JobMasterRunner already shutdown."); return; } @@ -376,7 +376,7 @@ public void revokeLeadership() { handleJobManagerRunnerError(new FlinkException("Could not 
suspend the job manager.", throwable)); } }, - jobManagerSharedServices.getScheduledExecutorService()); + jobMasterSharedServices.getScheduledExecutorService()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterSharedServices.java similarity index 97% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterSharedServices.java index c1e910cbeb8..20715eec3d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterSharedServices.java @@ -48,7 +48,7 @@ * Utility class which holds all auxiliary shared services used by the {@link JobMaster}. * Consequently, the {@link JobMaster} should never shut these services down. 
*/ -public class JobManagerSharedServices { +public class JobMasterSharedServices { private final ScheduledExecutorService scheduledExecutorService; @@ -60,7 +60,7 @@ private final BackPressureStatsTracker backPressureStatsTracker; - public JobManagerSharedServices( + public JobMasterSharedServices( ScheduledExecutorService scheduledExecutorService, LibraryCacheManager libraryCacheManager, RestartStrategyFactory restartStrategyFactory, @@ -121,7 +121,7 @@ public void shutdown() throws Exception { // Creating the components from a configuration // ------------------------------------------------------------------------ - public static JobManagerSharedServices fromConfiguration( + public static JobMasterSharedServices fromConfiguration( Configuration config, BlobServer blobServer) throws Exception { @@ -166,7 +166,7 @@ public static JobManagerSharedServices fromConfiguration( cleanUpInterval, TimeUnit.MILLISECONDS); - return new JobManagerSharedServices( + return new JobMasterSharedServices( futureExecutor, libraryCacheManager, RestartStrategyFactory.createRestartStrategyFactory(config), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java index c825451d946..443c9b6a4c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java @@ -169,7 +169,7 @@ private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices h } /** - * Tests that all JobManagerRunner are terminated if the leadership of the + * Tests that all JobMasterRunner are terminated if the leadership of the * Dispatcher is revoked. 
*/ @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 24426761b2e..6b733eba05c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -40,8 +40,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; -import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; +import org.apache.flink.runtime.jobmaster.JobMasterRunner; +import org.apache.flink.runtime.jobmaster.JobMasterSharedServices; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; @@ -244,7 +244,7 @@ public void tearDown() throws Exception { /** * Tests that we can submit a job to the Dispatcher which then spawns a - * new JobManagerRunner. + * new JobMasterRunner. */ @Test public void testJobSubmission() throws Exception { @@ -585,7 +585,7 @@ public void testJobSubmissionErrorAfterJobRecovery() throws Exception { } /** - * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to blocking FileSystem access, + * Tests that a blocking {@link JobMasterRunner} creation, e.g. due to blocking FileSystem access, * does not block the {@link Dispatcher}. * * <p>See FLINK-10314 @@ -618,7 +618,7 @@ public void testBlockingJobManagerRunner() throws Exception { } /** - * Tests that a failing {@link JobManagerRunner} will be properly cleaned up. + * Tests that a failing {@link JobMasterRunner} will be properly cleaned up. 
*/ @Test public void testFailingJobManagerRunnerCleanup() throws Exception { @@ -649,7 +649,7 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { try { submissionFuture.get(); - fail("Should fail because we could not instantiate the JobManagerRunner."); + fail("Should fail because we could not instantiate the JobMasterRunner."); } catch (Exception e) { assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(testException)).isPresent(), is(true)); } @@ -673,10 +673,10 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { } @Override - public JobManagerRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { + public JobMasterRunner createJobManagerRunner(ResourceID resourceId, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobMasterSharedServices jobMasterSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { jobManagerRunnerCreationLatch.run(); - return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler); + return super.createJobManagerRunner(resourceId, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobMasterSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler); } } @@ -723,7 +723,7 @@ private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId, 
CountDownLatch } @Override - public JobManagerRunner createJobManagerRunner( + public JobMasterRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, Configuration configuration, @@ -731,7 +731,7 @@ public JobManagerRunner createJobManagerRunner( HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerSharedServices jobManagerSharedServices, + JobMasterSharedServices jobMasterSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { assertEquals(expectedJobId, jobGraph.getJobID()); @@ -746,7 +746,7 @@ public JobManagerRunner createJobManagerRunner( highAvailabilityServices, heartbeatServices, blobServer, - jobManagerSharedServices, + jobMasterSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java index 992f08713c6..babeab37917 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerFactory.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; +import org.apache.flink.runtime.jobmaster.JobMasterRunner; +import org.apache.flink.runtime.jobmaster.JobMasterSharedServices; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import 
org.apache.flink.runtime.rpc.RpcService; @@ -53,7 +53,7 @@ } @Override - public JobManagerRunner createJobManagerRunner( + public JobMasterRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, Configuration configuration, @@ -61,12 +61,12 @@ public JobManagerRunner createJobManagerRunner( HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, - JobManagerSharedServices jobManagerSharedServices, + JobMasterSharedServices jobMasterSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception { jobGraphFuture.complete(jobGraph); - final JobManagerRunner mock = mock(JobManagerRunner.class); + final JobMasterRunner mock = mock(JobMasterRunner.class); when(mock.getResultFuture()).thenReturn(resultFuture); when(mock.closeAsync()).thenReturn(terminationFuture); when(mock.getJobGraph()).thenReturn(jobGraph); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterRunnerTest.java similarity index 81% rename from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterRunnerTest.java index 08f6fe55dea..5158197ed6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterRunnerTest.java @@ -61,9 +61,9 @@ import static org.junit.Assert.fail; /** - * Tests for the {@link JobManagerRunner} + * Tests for the {@link JobMasterRunner} */ -public class JobManagerRunnerTest extends TestLogger { +public class JobMasterRunnerTest extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -76,7 +76,7 @@ private static HeartbeatServices 
heartbeatServices = new HeartbeatServices(1000L, 1000L); - private static JobManagerSharedServices jobManagerSharedServices; + private static JobMasterSharedServices jobMasterSharedServices; private static JobGraph jobGraph; @@ -97,7 +97,7 @@ public static void setupClass() throws Exception { configuration, new VoidBlobStore()); - jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(configuration, blobServer); + jobMasterSharedServices = JobMasterSharedServices.fromConfiguration(configuration, blobServer); final JobVertex jobVertex = new JobVertex("Test vertex"); jobVertex.setInvokableClass(NoOpInvokable.class); @@ -126,8 +126,8 @@ public void tearDown() throws Exception { @AfterClass public static void tearDownClass() throws Exception { - if (jobManagerSharedServices != null) { - jobManagerSharedServices.shutdown(); + if (jobMasterSharedServices != null) { + jobMasterSharedServices.shutdown(); } if (blobServer != null) { @@ -141,35 +141,35 @@ public static void tearDownClass() throws Exception { @Test public void testJobCompletion() throws Exception { - final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + final JobMasterRunner jobMasterRunner = createJobManagerRunner(); try { - jobManagerRunner.start(); + jobMasterRunner.start(); - final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobManagerRunner.getResultFuture(); + final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobMasterRunner.getResultFuture(); assertThat(resultFuture.isDone(), is(false)); - jobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph); + jobMasterRunner.jobReachedGloballyTerminalState(archivedExecutionGraph); assertThat(resultFuture.get(), is(archivedExecutionGraph)); } finally { - jobManagerRunner.close(); + jobMasterRunner.close(); } } @Test public void testJobFinishedByOther() throws Exception { - final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + final JobMasterRunner jobMasterRunner = 
createJobManagerRunner(); try { - jobManagerRunner.start(); + jobMasterRunner.start(); - final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobManagerRunner.getResultFuture(); + final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobMasterRunner.getResultFuture(); assertThat(resultFuture.isDone(), is(false)); - jobManagerRunner.jobFinishedByOther(); + jobMasterRunner.jobFinishedByOther(); try { resultFuture.get(); @@ -178,22 +178,22 @@ public void testJobFinishedByOther() throws Exception { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.close(); + jobMasterRunner.close(); } } @Test public void testShutDown() throws Exception { - final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + final JobMasterRunner jobMasterRunner = createJobManagerRunner(); try { - jobManagerRunner.start(); + jobMasterRunner.start(); - final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobManagerRunner.getResultFuture(); + final CompletableFuture<ArchivedExecutionGraph> resultFuture = jobMasterRunner.getResultFuture(); assertThat(resultFuture.isDone(), is(false)); - jobManagerRunner.closeAsync(); + jobMasterRunner.closeAsync(); try { resultFuture.get(); @@ -202,33 +202,33 @@ public void testShutDown() throws Exception { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.close(); + jobMasterRunner.close(); } } @Test public void testLibraryCacheManagerRegistration() throws Exception { - final JobManagerRunner jobManagerRunner = createJobManagerRunner(); + final JobMasterRunner jobMasterRunner = createJobManagerRunner(); try { - jobManagerRunner.start(); + jobMasterRunner.start(); - final LibraryCacheManager libraryCacheManager = jobManagerSharedServices.getLibraryCacheManager(); + final LibraryCacheManager libraryCacheManager = jobMasterSharedServices.getLibraryCacheManager(); 
final JobID jobID = jobGraph.getJobID(); assertThat(libraryCacheManager.hasClassLoader(jobID), is(true)); - jobManagerRunner.close(); + jobMasterRunner.close(); assertThat(libraryCacheManager.hasClassLoader(jobID), is(false)); } finally { - jobManagerRunner.close(); + jobMasterRunner.close(); } } @Nonnull - private JobManagerRunner createJobManagerRunner() throws Exception { - return new JobManagerRunner( + private JobMasterRunner createJobManagerRunner() throws Exception { + return new JobMasterRunner( ResourceID.generate(), jobGraph, configuration, @@ -236,7 +236,7 @@ private JobManagerRunner createJobManagerRunner() throws Exception { haServices, heartbeatServices, blobServer, - jobManagerSharedServices, + jobMasterSharedServices, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, fatalErrorHandler); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 462b1d16da9..2eab3421227 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -243,13 +243,13 @@ public void testHeartbeatTimeoutWithTaskManager() throws Exception { rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); - final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobMasterSharedServices jobMasterSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMaster jobMaster = createJobMaster( configuration, jobGraph, haServices, - jobManagerSharedServices); + jobMasterSharedServices); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -276,7 +276,7 @@ public void testHeartbeatTimeoutWithTaskManager() throws Exception { assertThat(disconnectedJobManager, 
Matchers.equalTo(jobGraph.getJobID())); } finally { - jobManagerSharedServices.shutdown(); + jobMasterSharedServices.shutdown(); RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -310,13 +310,13 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { rpcService.registerGateway(resourceManagerAddress, resourceManagerGateway); - final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobMasterSharedServices jobMasterSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMaster jobMaster = createJobMaster( configuration, jobGraph, haServices, - jobManagerSharedServices); + jobMasterSharedServices); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -344,7 +344,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { // the JobMaster should try to reconnect to the RM registrationAttempts.await(); } finally { - jobManagerSharedServices.shutdown(); + jobMasterSharedServices.shutdown(); RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); } } @@ -711,7 +711,7 @@ public void testRequestNextInputSplit() throws Exception { configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - final JobManagerSharedServices jobManagerSharedServices = + final JobMasterSharedServices jobMasterSharedServices = new TestingJobManagerSharedServicesBuilder() .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration)) .build(); @@ -720,7 +720,7 @@ public void testRequestNextInputSplit() throws Exception { configuration, testJobGraph, haServices, - jobManagerSharedServices); + jobMasterSharedServices); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -966,7 +966,7 @@ public void testTriggerSavepointTimeout() throws 
Exception { */ @Test public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception { - final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobMasterSharedServices jobMasterSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobGraph jobGraph = createSingleVertexJobWithRestartStrategy(); @@ -974,7 +974,7 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception configuration, jobGraph, haServices, - jobManagerSharedServices, + jobMasterSharedServices, heartbeatServices); final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); @@ -1081,12 +1081,12 @@ private JobMaster createJobMaster( Configuration configuration, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, - JobManagerSharedServices jobManagerSharedServices) throws Exception { + JobMasterSharedServices jobMasterSharedServices) throws Exception { return createJobMaster( configuration, jobGraph, highAvailabilityServices, - jobManagerSharedServices, + jobMasterSharedServices, fastHeartbeatServices); } @@ -1095,7 +1095,7 @@ private JobMaster createJobMaster( Configuration configuration, JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, - JobManagerSharedServices jobManagerSharedServices, + JobMasterSharedServices jobMasterSharedServices, HeartbeatServices heartbeatServices) throws Exception { final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); @@ -1107,7 +1107,7 @@ private JobMaster createJobMaster( jobGraph, highAvailabilityServices, DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService), - jobManagerSharedServices, + jobMasterSharedServices, heartbeatServices, blobServer, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java index 030e4e67e7a..9655a2f34b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java @@ -31,7 +31,7 @@ import static org.mockito.Mockito.mock; /** - * Builder for the {@link JobManagerSharedServices}. + * Builder for the {@link JobMasterSharedServices}. */ public class TestingJobManagerSharedServicesBuilder { @@ -80,8 +80,8 @@ public TestingJobManagerSharedServicesBuilder setBackPressureStatsTracker(BackPr } - public JobManagerSharedServices build() { - return new JobManagerSharedServices( + public JobMasterSharedServices build() { + return new JobMasterSharedServices( scheduledExecutorService, libraryCacheManager, restartStrategyFactory, diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java index 8ff85e05c21..b2f3461d203 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java @@ -158,7 +158,7 @@ private void waitForJob() throws Exception { return; } } catch (ExecutionException ignored) { - // JobManagerRunner is not yet registered in Dispatcher + // JobMasterRunner is not yet registered in Dispatcher } Thread.sleep(1000); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Rename 'JobManager' to 'JobMaster' for some classes in JobMaster folder > ----------------------------------------------------------------------- > > Key: FLINK-10494 > URL: https://issues.apache.org/jira/browse/FLINK-10494 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Affects Versions: 1.6.0, 1.7.0 > Reporter: JIN SUN > Assignee: JIN SUN > Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Some names in the > "flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster" folder are > confusing; we should rename them to use JobMaster. > > * JobManagerRunner -> JobMasterRunner > * JobManagerGateway -> JobMasterGateway > * JobManagerSharedServices -> JobMasterSharedServices > * JobManagerException -> JobMasterException -- This message was sent by Atlassian JIRA (v7.6.3#76005)