[ https://issues.apache.org/jira/browse/FLINK-10349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620028#comment-16620028 ]
ASF GitHub Bot commented on FLINK-10349: ---------------------------------------- TisonKun closed pull request #6701: [FLINK-10349] Unify stopActor utils URL: https://github.com/apache/flink/pull/6701 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 2b8abb1f8ff..85952da64ea 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClientActorTest; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -148,7 +148,7 @@ public void testJobManagerRetrievalWithHAServices() throws Exception { assertEquals(leaderId, gateway.leaderSessionID()); } finally { if (actorRef != null) { - TestingUtils.stopActorGracefully(actorRef); + ActorUtils.stopActorGracefully(actorRef); } actorSystem.shutdown(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java index 9a992814235..5f976e701ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java @@ -19,17 +19,19 @@ package org.apache.flink.runtime.akka; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.instance.ActorGateway; import akka.actor.ActorRef; import akka.actor.Kill; -import akka.actor.PoisonPill; import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -87,12 +89,52 @@ return FutureUtils.completeAll(terminationFutures); } - public static void stopActor(AkkaActorGateway akkaActorGateway) { - stopActor(akkaActorGateway.actor()); - } + // ---------- Utils to stop an actor ---------- + + private static final FiniteDuration DEFAULT_TIMEOUT = FiniteDuration.apply(1, TimeUnit.MINUTES); public static void stopActor(ActorRef actorRef) { - actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + if (actorRef != null) { + actorRef.tell(Kill.getInstance(), ActorRef.noSender()); + } + } + + public static void stopActor(ActorGateway actorGateway) { + if (actorGateway != null) { + stopActor(actorGateway.actor()); + } + } + + public static void stopActorGracefully(ActorRef actorRef) { + stopActorsGracefully(actorRef); + } + + public static void stopActorGracefully(ActorGateway actorGateway) { + stopActorGracefully(actorGateway.actor()); + } + + public static void stopActorsGracefully(@Nonnull ActorRef... actorRefs) { + List<CompletableFuture<?>> futures = new ArrayList<>(actorRefs.length); + + for (ActorRef actorRef : actorRefs) { + if (actorRef != null) { + futures.add(FutureUtils.toJava(Patterns.gracefulStop(actorRef, DEFAULT_TIMEOUT))); + } + } + + FutureUtils.waitForAll(futures); + } + + public static void stopActorsGracefully(@Nonnull ActorGateway... actorGateways) { + List<ActorRef> actorRefs = new ArrayList<>(actorGateways.length); + + for (ActorGateway actorGateway : actorGateways) { + if (actorGateway != null) { + actorRefs.add(actorGateway.actor()); + } + } + + stopActorsGracefully(actorRefs.toArray(new ActorRef[0])); } private ActorUtils() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java index 2d39e0b972d..9f4a3b2c0a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java @@ -25,7 +25,6 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Kill; import akka.actor.Props; import akka.actor.RobustActorSystem; import akka.testkit.JavaTestKit; @@ -81,7 +80,7 @@ public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() { assertEquals(3, underlyingActor.getMessageCounter()); } finally { - stopActor(actor); + ActorUtils.stopActor(actor); } } @@ -114,13 +113,7 @@ public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDM } } finally { - stopActor(actor); - } - } - - private static void stopActor(ActorRef actor) { - if (actor != null) { - actor.tell(Kill.getInstance(), ActorRef.noSender()); + ActorUtils.stopActor(actor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java index a998fb0ac8c..eab262caafb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.akka; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -130,9 +129,9 @@ public void testWatcheeQuarantined() throws ExecutionException, InterruptedExcep Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get()); } finally { - TestingUtils.stopActor(watchee); - TestingUtils.stopActor(watcher); - TestingUtils.stopActor(monitor); + ActorUtils.stopActor(watchee); + ActorUtils.stopActor(watcher); + ActorUtils.stopActor(monitor); } } @@ -171,9 +170,9 @@ public void testWatcherQuarantining() throws ExecutionException, InterruptedExce Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get()); } finally { - TestingUtils.stopActor(watchee); - TestingUtils.stopActor(watcher); - TestingUtils.stopActor(monitor); + ActorUtils.stopActor(watchee); + ActorUtils.stopActor(watcher); + ActorUtils.stopActor(monitor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java index 388572c70ef..de395238032 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java @@ -22,6 +22,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; @@ -42,8 +43,6 @@ import scala.Option; -import java.util.Arrays; - /** * Runs tests to ensure that a cluster is shutdown properly. */ @@ -129,8 +128,7 @@ protected void run() { StopClusterSuccessful.getInstance() ); } finally { - TestingUtils.stopActorGatewaysGracefully(Arrays.asList( - jobManager, taskManager, forwardingActor)); + ActorUtils.stopActorsGracefully(jobManager, taskManager, forwardingActor); } }}; @@ -207,8 +205,7 @@ protected void run() { StopClusterSuccessful.getInstance() ); } finally { - TestingUtils.stopActorGatewaysGracefully(Arrays.asList( - jobManager, taskManager, resourceManager, forwardingActor)); + ActorUtils.stopActorsGracefully(jobManager, taskManager, resourceManager, forwardingActor); } }}; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java index d52afe4b4ea..7af84a4888d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java @@ -21,6 +21,7 @@ import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -42,9 +43,6 @@ import scala.Option; - -import java.util.Arrays; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -152,8 +150,7 @@ protected void run() { assertEquals(1, reply.resources.size()); assertTrue(reply.resources.contains(resourceID)); } finally { - TestingUtils.stopActorGatewaysGracefully(Arrays.asList( - jobManager, resourceManager, forwardingActor)); + ActorUtils.stopActorsGracefully(jobManager, resourceManager, forwardingActor); } }}; @@ -215,8 +212,7 @@ protected void run() { assertEquals(1, reply.resources.size()); } finally { - TestingUtils.stopActorGatewaysGracefully(Arrays.asList( - jobManager, resourceManager, taskManager, forwardingActor)); + ActorUtils.stopActorsGracefully(jobManager, resourceManager, taskManager, forwardingActor); } }}; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index d991983e6f2..757110e6ee1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobServer; @@ -415,7 +416,7 @@ public void testFailingJobRecovery() throws Exception { // verify that the JobManager terminated testProbe.expectTerminated(jobManager, timeout); } finally { - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(jobManager); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 3b502db0d7d..cf2167a3cda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; @@ -116,7 +117,7 @@ public void testLeaderElection() throws Exception { Await.ready(leaderFuture, duration); } finally { - TestingUtils.stopActor(jm); + ActorUtils.stopActor(jm); } } @@ -162,7 +163,7 @@ public void testLeaderReelection() throws Exception { Await.ready(leader2Future, duration); } finally { - TestingUtils.stopActor(jm2); + ActorUtils.stopActor(jm2); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java index 0e837a59c17..eebd25d5d47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; @@ -277,8 +278,8 @@ protected void run() { } }; } finally { - TestingUtils.stopActor(jobManger); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManger); + ActorUtils.stopActor(taskManager); highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java index ccf5f60a824..5e43095db77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; @@ -193,8 +194,8 @@ protected void run() { } }; } finally { - TestingUtils.stopActor(jobManger); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManger); + ActorUtils.stopActor(taskManager); highAvailabilityServices.closeAndCleanupAllData(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 7ee0921646a..51529543617 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; @@ -60,15 +61,13 @@ import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor; import static org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager; -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully; -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -189,7 +188,7 @@ public void testSimpleRegistration() throws Exception { e.printStackTrace(); fail(e.getMessage()); } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager)); + ActorUtils.stopActorsGracefully(taskManager1, taskManager2, jobManager, resourceManager); embeddedHaServices.closeAndCleanupAllData(); } @@ -240,7 +239,7 @@ public void testDelayedRegistration() throws Exception { assertTrue(response instanceof TaskManagerMessages.RegisteredAtJobManager); } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager)); + ActorUtils.stopActorsGracefully(taskManager, jobManager); embeddedHaServices.closeAndCleanupAllData(); } @@ -301,7 +300,7 @@ protected void run() { e.printStackTrace(); fail(e.getMessage()); } finally { - stopActorGracefully(taskManager); + ActorUtils.stopActorsGracefully(taskManager); } }}; } @@ -371,7 +370,7 @@ protected void run() { } }; } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jm)); + ActorUtils.stopActorsGracefully(taskManager, jm); } }}; } @@ -469,7 +468,7 @@ protected RegisterTaskManager match(Object msg) throws Exception { + maxExpectedNumberOfRegisterTaskManagerMessages, registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages); } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jm)); + ActorUtils.stopActorsGracefully(taskManager, jm); } }}; } @@ -533,7 +532,7 @@ protected void run() { // kill the first forwarding JobManager watch(fakeJobManager1Gateway.actor()); - stopActor(fakeJobManager1Gateway.actor()); + ActorUtils.stopActorsGracefully(fakeJobManager1Gateway.actor()); final ActorGateway gateway = fakeJobManager1Gateway; @@ -598,7 +597,7 @@ protected void run() { e.printStackTrace(); fail(e.getMessage()); } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway)); + ActorUtils.stopActorsGracefully(taskManagerGateway, fakeJobManager2Gateway); } }}; } @@ -675,7 +674,7 @@ protected void run() { } }; } finally { - stopActorGracefully(taskManagerGateway); + ActorUtils.stopActorsGracefully(taskManagerGateway); } }}; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 2da2ba49265..9d66cca6c82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.PermanentBlobKey; @@ -268,8 +269,8 @@ else if (!(message instanceof TaskManagerMessages.Heartbeat)) { } finally { // shut down the actors - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -409,8 +410,8 @@ protected void run() { fail(e.getMessage()); } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -539,8 +540,8 @@ protected void run() { }; } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -636,8 +637,8 @@ protected void run() { } finally { // shut down the actors - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -775,8 +776,8 @@ protected void run() { } finally { // shut down the actors - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -924,8 +925,8 @@ protected void run() { } finally { // shut down the actors - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -1027,8 +1028,8 @@ protected void run() { fail(e.getMessage()); } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -1148,8 +1149,8 @@ protected void run() { fail(e.getMessage()); } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -1206,8 +1207,8 @@ protected void run() { } }; } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }};} @@ -1519,8 +1520,8 @@ protected void run() { } }; } finally { - TestingUtils.stopActor(taskManagerActorGateway); - TestingUtils.stopActor(jobManagerActorGateway); + ActorUtils.stopActor(taskManagerActorGateway); + ActorUtils.stopActor(jobManagerActorGateway); } }}; } @@ -1618,8 +1619,8 @@ public void testFailingScheduleOrUpdateConsumersMessage() throws Exception { assertEquals(true, cancelFuture.get()); } finally { - TestingUtils.stopActor(taskManager); - TestingUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); } }}; } @@ -1679,8 +1680,8 @@ public void testSubmitTaskFailure() throws Exception { // expected } } finally { - TestingUtils.stopActor(jobManager); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); } } @@ -1748,8 +1749,8 @@ public void testStopTaskFailure() throws Exception { // expected } } finally { - TestingUtils.stopActor(jobManager); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); } } @@ -1795,8 +1796,8 @@ public void testStackTraceSampleFailure() throws Exception { // expected } } finally { - TestingUtils.stopActor(jobManager); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); } } @@ -1866,8 +1867,8 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { // expected } } finally { - TestingUtils.stopActor(jobManager); - TestingUtils.stopActor(taskManager); + ActorUtils.stopActor(jobManager); + ActorUtils.stopActor(taskManager); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index 35fa7b82a02..9948704eafc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.webmonitor.retriever.impl; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClientActorTest; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import akka.actor.ActorRef; @@ -100,7 +100,7 @@ public void testAkkaJobManagerRetrieval() throws Exception { settableLeaderRetrievalService.stop(); if (actorRef != null) { - TestingUtils.stopActorGracefully(actorRef); + ActorUtils.stopActorGracefully(actorRef); } } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala index acd48469c66..f4df1056b02 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka import java.util.UUID -import akka.actor.{Props, Kill, ActorRef, ActorSystem} +import akka.actor.{Props, ActorSystem} import akka.testkit.{TestActorRef, TestKit} import grizzled.slf4j.Logger import org.apache.flink.runtime.akka.FlinkUntypedActorTest.PlainRequiresLeaderSessionID @@ -82,11 +82,6 @@ class FlinkActorTest(_system: ActorSystem) s"leader session ID, even though the message requires a leader session ID.") } } - - def stopActor(actor: ActorRef): Unit = { - actor ! Kill - } - } class PlainFlinkActor(val leaderSessionID: Option[UUID]) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 887c4f5d377..54108153eda 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -24,7 +24,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import akka.actor._ import akka.testkit.{ImplicitSender, TestKit, TestProbe} import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils} import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.highavailability.HighAvailabilityServices @@ -33,7 +33,7 @@ import org.apache.flink.runtime.instance._ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} -import org.apache.flink.runtime.metrics.{MetricRegistryImpl, MetricRegistryConfiguration} +import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl} import org.apache.flink.runtime.taskmanager.TaskManagerLocation import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils} @@ -150,7 +150,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor val response = probe.expectMsgType[LeaderSessionMessage] response match { - case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id + case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id2 = id case _ => fail("Wrong response message: " + response) } } @@ -159,10 +159,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor assertNotNull(id2) assertNotEquals(id1, id2) } finally { - jmOption.foreach(TestingUtils.stopActorGracefully) - rmOption.foreach(TestingUtils.stopActorGracefully) - tm1Option.foreach(TestingUtils.stopActorGracefully) - tm2Option.foreach(TestingUtils.stopActorGracefully) + jmOption.foreach(ActorUtils.stopActorGracefully) + rmOption.foreach(ActorUtils.stopActorGracefully) + tm1Option.foreach(ActorUtils.stopActorGracefully) + tm2Option.foreach(ActorUtils.stopActorGracefully) } } @@ -239,8 +239,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor } } } finally { - jmOption.foreach(TestingUtils.stopActorGracefully) - rmOption.foreach(TestingUtils.stopActorGracefully) + jmOption.foreach(ActorUtils.stopActorGracefully) + rmOption.foreach(ActorUtils.stopActorGracefully) } } } @@ -277,8 +277,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor components._6, highAvailabilityServices.getJobManagerLeaderElectionService( HighAvailabilityServices.DEFAULT_JOB_ID), - highAvailabilityServices.getSubmittedJobGraphStore(), - highAvailabilityServices.getCheckpointRecoveryFactory(), + highAvailabilityServices.getSubmittedJobGraphStore, + highAvailabilityServices.getCheckpointRecoveryFactory, components._9, components._10, None) diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 9d791a90f9d..8eae515ac99 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -22,8 +22,8 @@ import java.util import java.util.concurrent._ import java.util.{Collections, UUID} -import akka.actor.{ActorRef, ActorSystem, Kill, Props} -import akka.pattern.{Patterns, ask} +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.pattern.ask import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.time.Time @@ -295,76 +295,6 @@ object TestingUtils { new AkkaActorGateway(taskManager, leaderId) } - /** Stops the given actor by sending it a Kill message - * - * @param actor - */ - def stopActor(actor: ActorRef): Unit = { - if (actor != null) { - actor ! Kill - } - } - - /** Stops the given actor by sending it a Kill message - * - * @param actorGateway - */ - def stopActor(actorGateway: ActorGateway): Unit = { - if (actorGateway != null) { - stopActor(actorGateway.actor()) - } - } - - def stopActorGracefully(actor: ActorRef): Unit = { - val gracefulStopFuture = Patterns.gracefulStop(actor, TestingUtils.TESTING_TIMEOUT) - - Await.result(gracefulStopFuture, TestingUtils.TESTING_TIMEOUT) - } - - def stopActorGracefully(actorGateway: ActorGateway): Unit = { - stopActorGracefully(actorGateway.actor()) - } - - def stopActorsGracefully(actors: ActorRef*): Unit = { - val gracefulStopFutures = actors.flatMap{ - actor => - Option(actor) match { - case Some(actorRef) => Some(Patterns.gracefulStop(actorRef, TestingUtils.TESTING_TIMEOUT)) - case None => None - } - } - - implicit val executionContext = defaultExecutionContext - - val globalStopFuture = scala.concurrent.Future.sequence(gracefulStopFutures) - - Await.result(globalStopFuture, TestingUtils.TESTING_TIMEOUT) - } - - def stopActorsGracefully(actors: java.util.List[ActorRef]): Unit = { - import scala.collection.JavaConverters._ - - stopActorsGracefully(actors.asScala: _*) - } - - def stopActorGatewaysGracefully(actorGateways: ActorGateway*): Unit = { - val actors = actorGateways.flatMap { - actorGateway => - Option(actorGateway) match { - case Some(actorGateway) => Some(actorGateway.actor()) - case None => None - } - } - - stopActorsGracefully(actors: _*) - } - - def stopActorGatewaysGracefully(actorGateways: java.util.List[ActorGateway]): Unit = { - import scala.collection.JavaConverters._ - - stopActorGatewaysGracefully(actorGateways.asScala: _*) - } - /** Creates a testing JobManager using the default recovery mode (standalone) * * @param actorSystem The ActorSystem to use diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index 5c9b1fb0c23..9c314d042a0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions} -import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} +import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.messages.Acknowledge import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -55,7 +55,7 @@ class JobManagerFailsITCase(_system: ActorSystem) val cluster = startDeathwatchCluster(num_slots, 1) try { - val tm = cluster.getTaskManagers(0) + val tm = cluster.getTaskManagers.head val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) // disable disconnect message to test death watch @@ -66,7 +66,7 @@ class JobManagerFailsITCase(_system: ActorSystem) expectMsg(1) // stop the current leader and make sure that he is gone - TestingUtils.stopActorGracefully(jmGateway) + ActorUtils.stopActorGracefully(jmGateway) cluster.restartLeadingJobManager() @@ -99,14 +99,13 @@ class JobManagerFailsITCase(_system: ActorSystem) try { var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) - val tm = cluster.getTaskManagers(0) within(TestingUtils.TESTING_DURATION) { jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), self) expectMsg(JobSubmitSuccess(jobGraph.getJobID)) // stop the current leader and make sure that he is gone - TestingUtils.stopActorGracefully(jmGateway) + ActorUtils.stopActorGracefully(jmGateway) cluster.restartLeadingJobManager() @@ -119,11 +118,11 @@ class JobManagerFailsITCase(_system: ActorSystem) jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph2.getJobID())) + expectMsg(JobSubmitSuccess(jobGraph2.getJobID)) val result = expectMsgType[JobResultSuccess] - result.result.getJobId() should equal(jobGraph2.getJobID) + result.result.getJobId should equal(jobGraph2.getJobID) } } finally { cluster.stop() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unify stopActor utils > --------------------- > > Key: FLINK-10349 > URL: https://issues.apache.org/jira/browse/FLINK-10349 > Project: Flink > Issue Type: Improvement > Components: Core > Affects Versions: 1.7.0 > Reporter: 陈梓立 > Assignee: 陈梓立 > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)