[ https://issues.apache.org/jira/browse/FLINK-10349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620028#comment-16620028 ]

ASF GitHub Bot commented on FLINK-10349:
----------------------------------------

TisonKun closed pull request #6701: [FLINK-10349] Unify stopActor utils
URL: https://github.com/apache/flink/pull/6701
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once the request is closed, so the full diff is reproduced below:

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 2b8abb1f8ff..85952da64ea 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClientActorTest;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -29,7 +30,6 @@
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -148,7 +148,7 @@ public void testJobManagerRetrievalWithHAServices() throws 
Exception {
                        assertEquals(leaderId, gateway.leaderSessionID());
                } finally {
                        if (actorRef != null) {
-                               TestingUtils.stopActorGracefully(actorRef);
+                               ActorUtils.stopActorGracefully(actorRef);
                        }
 
                        actorSystem.shutdown();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index 9a992814235..5f976e701ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,17 +19,19 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.instance.ActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
-import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
@@ -87,12 +89,52 @@
                return FutureUtils.completeAll(terminationFutures);
        }
 
-       public static void stopActor(AkkaActorGateway akkaActorGateway) {
-               stopActor(akkaActorGateway.actor());
-       }
+       // ---------- Utils to stop an actor ----------
+
+       private static final FiniteDuration DEFAULT_TIMEOUT = 
FiniteDuration.apply(1, TimeUnit.MINUTES);
 
        public static void stopActor(ActorRef actorRef) {
-               actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+               if (actorRef != null) {
+                       actorRef.tell(Kill.getInstance(), ActorRef.noSender());
+               }
+       }
+
+       public static void stopActor(ActorGateway actorGateway) {
+               if (actorGateway != null) {
+                       stopActor(actorGateway.actor());
+               }
+       }
+
+       public static void stopActorGracefully(ActorRef actorRef) {
+               stopActorsGracefully(actorRef);
+       }
+
+       public static void stopActorGracefully(ActorGateway actorGateway) {
+               stopActorGracefully(actorGateway.actor());
+       }
+
+       public static void stopActorsGracefully(@Nonnull ActorRef... actorRefs) 
{
+               List<CompletableFuture<?>> futures = new 
ArrayList<>(actorRefs.length);
+
+               for (ActorRef actorRef : actorRefs) {
+                       if (actorRef != null) {
+                               
futures.add(FutureUtils.toJava(Patterns.gracefulStop(actorRef, 
DEFAULT_TIMEOUT)));
+                       }
+               }
+
+               FutureUtils.waitForAll(futures);
+       }
+
+       public static void stopActorsGracefully(@Nonnull ActorGateway... 
actorGateways) {
+               List<ActorRef> actorRefs = new 
ArrayList<>(actorGateways.length);
+
+               for (ActorGateway actorGateway : actorGateways) {
+                       if (actorGateway != null) {
+                               actorRefs.add(actorGateway.actor());
+                       }
+               }
+
+               stopActorsGracefully(actorRefs.toArray(new ActorRef[0]));
        }
 
        private ActorUtils() {}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 2d39e0b972d..9f4a3b2c0a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -25,7 +25,6 @@
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Kill;
 import akka.actor.Props;
 import akka.actor.RobustActorSystem;
 import akka.testkit.JavaTestKit;
@@ -81,7 +80,7 @@ public void 
testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
                        assertEquals(3, underlyingActor.getMessageCounter());
 
                } finally {
-                       stopActor(actor);
+                       ActorUtils.stopActor(actor);
                }
        }
 
@@ -114,13 +113,7 @@ public void 
testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDM
                        }
 
                } finally {
-                       stopActor(actor);
-               }
-       }
-
-       private static void stopActor(ActorRef actor) {
-               if (actor != null) {
-                       actor.tell(Kill.getInstance(), ActorRef.noSender());
+                       ActorUtils.stopActor(actor);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index a998fb0ac8c..eab262caafb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.akka;
 
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -130,9 +129,9 @@ public void testWatcheeQuarantined() throws 
ExecutionException, InterruptedExcep
 
                        Assert.assertEquals(actorSystem1Address.toString(), 
quarantineFuture.get());
                } finally {
-                       TestingUtils.stopActor(watchee);
-                       TestingUtils.stopActor(watcher);
-                       TestingUtils.stopActor(monitor);
+                       ActorUtils.stopActor(watchee);
+                       ActorUtils.stopActor(watcher);
+                       ActorUtils.stopActor(monitor);
                }
        }
 
@@ -171,9 +170,9 @@ public void testWatcherQuarantining() throws 
ExecutionException, InterruptedExce
 
                        Assert.assertEquals(actorSystem1Address.toString(), 
quarantineFuture.get());
                } finally {
-                       TestingUtils.stopActor(watchee);
-                       TestingUtils.stopActor(watcher);
-                       TestingUtils.stopActor(monitor);
+                       ActorUtils.stopActor(watchee);
+                       ActorUtils.stopActor(watcher);
+                       ActorUtils.stopActor(monitor);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
index 388572c70ef..de395238032 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.java
@@ -22,6 +22,7 @@
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
@@ -42,8 +43,6 @@
 
 import scala.Option;
 
-import java.util.Arrays;
-
 /**
  * Runs tests to ensure that a cluster is shutdown properly.
  */
@@ -129,8 +128,7 @@ protected void run() {
                                        StopClusterSuccessful.getInstance()
                                );
                        } finally {
-                               
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
-                                       jobManager, taskManager, 
forwardingActor));
+                               ActorUtils.stopActorsGracefully(jobManager, 
taskManager, forwardingActor);
                        }
 
                }};
@@ -207,8 +205,7 @@ protected void run() {
                                        StopClusterSuccessful.getInstance()
                                );
                        } finally {
-                               
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
-                                       jobManager, taskManager, 
resourceManager, forwardingActor));
+                               ActorUtils.stopActorsGracefully(jobManager, 
taskManager, resourceManager, forwardingActor);
                        }
 
                }};
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
index d52afe4b4ea..7af84a4888d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerITCase.java
@@ -21,6 +21,7 @@
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -42,9 +43,6 @@
 
 import scala.Option;
 
-
-import java.util.Arrays;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -152,8 +150,7 @@ protected void run() {
                                assertEquals(1, reply.resources.size());
                                
assertTrue(reply.resources.contains(resourceID));
                        } finally {
-                               
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
-                                       jobManager, resourceManager, 
forwardingActor));
+                               ActorUtils.stopActorsGracefully(jobManager, 
resourceManager, forwardingActor);
                        }
 
                }};
@@ -215,8 +212,7 @@ protected void run() {
 
                                assertEquals(1, reply.resources.size());
                        } finally {
-                               
TestingUtils.stopActorGatewaysGracefully(Arrays.asList(
-                                       jobManager, resourceManager, 
taskManager, forwardingActor));
+                               ActorUtils.stopActorsGracefully(jobManager, 
resourceManager, taskManager, forwardingActor);
                        }
 
                }};
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d991983e6f2..757110e6ee1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -415,7 +416,7 @@ public void testFailingJobRecovery() throws Exception {
                        // verify that the JobManager terminated
                        testProbe.expectTerminated(jobManager, timeout);
                } finally {
-                       TestingUtils.stopActor(jobManager);
+                       ActorUtils.stopActor(jobManager);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 3b502db0d7d..cf2167a3cda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -116,7 +117,7 @@ public void testLeaderElection() throws Exception {
 
                        Await.ready(leaderFuture, duration);
                } finally {
-                       TestingUtils.stopActor(jm);
+                       ActorUtils.stopActor(jm);
                }
 
        }
@@ -162,7 +163,7 @@ public void testLeaderReelection() throws Exception {
 
                        Await.ready(leader2Future, duration);
                } finally {
-                       TestingUtils.stopActor(jm2);
+                       ActorUtils.stopActor(jm2);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 0e837a59c17..eebd25d5d47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -277,8 +278,8 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               TestingUtils.stopActor(jobManger);
-                               TestingUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManger);
+                               ActorUtils.stopActor(taskManager);
 
                                
highAvailabilityServices.closeAndCleanupAllData();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index ccf5f60a824..5e43095db77 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -193,8 +194,8 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               TestingUtils.stopActor(jobManger);
-                               TestingUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManger);
+                               ActorUtils.stopActor(taskManager);
 
                                
highAvailabilityServices.closeAndCleanupAllData();
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 7ee0921646a..51529543617 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -60,15 +61,13 @@
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor;
 import static 
org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager;
-import static 
org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully;
-import static 
org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -189,7 +188,7 @@ public void testSimpleRegistration() throws Exception {
                                e.printStackTrace();
                                fail(e.getMessage());
                        } finally {
-                               
stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, 
jobManager, resourceManager));
+                               ActorUtils.stopActorsGracefully(taskManager1, 
taskManager2, jobManager, resourceManager);
 
                                embeddedHaServices.closeAndCleanupAllData();
                        }
@@ -240,7 +239,7 @@ public void testDelayedRegistration() throws Exception {
 
                                assertTrue(response instanceof 
TaskManagerMessages.RegisteredAtJobManager);
                        } finally {
-                               
stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
+                               ActorUtils.stopActorsGracefully(taskManager, 
jobManager);
 
                                embeddedHaServices.closeAndCleanupAllData();
                        }
@@ -301,7 +300,7 @@ protected void run() {
                                e.printStackTrace();
                                fail(e.getMessage());
                        } finally {
-                               stopActorGracefully(taskManager);
+                               ActorUtils.stopActorsGracefully(taskManager);
                        }
                }};
        }
@@ -371,7 +370,7 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               
stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
+                               ActorUtils.stopActorsGracefully(taskManager, 
jm);
                        }
                }};
        }
@@ -469,7 +468,7 @@ protected RegisterTaskManager match(Object msg) throws 
Exception {
                                        + 
maxExpectedNumberOfRegisterTaskManagerMessages,
                                        registerTaskManagerMessages.length <= 
maxExpectedNumberOfRegisterTaskManagerMessages);
                        } finally {
-                               
stopActorGatewaysGracefully(Arrays.asList(taskManager, jm));
+                               ActorUtils.stopActorsGracefully(taskManager, 
jm);
                        }
                }};
        }
@@ -533,7 +532,7 @@ protected void run() {
 
                                // kill the first forwarding JobManager
                                watch(fakeJobManager1Gateway.actor());
-                               stopActor(fakeJobManager1Gateway.actor());
+                               
ActorUtils.stopActorsGracefully(fakeJobManager1Gateway.actor());
 
                                final ActorGateway gateway = 
fakeJobManager1Gateway;
 
@@ -598,7 +597,7 @@ protected void run() {
                                e.printStackTrace();
                                fail(e.getMessage());
                        } finally {
-                               
stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, 
fakeJobManager2Gateway));
+                               
ActorUtils.stopActorsGracefully(taskManagerGateway, fakeJobManager2Gateway);
                        }
                }};
        }
@@ -675,7 +674,7 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               stopActorGracefully(taskManagerGateway);
+                               
ActorUtils.stopActorsGracefully(taskManagerGateway);
                        }
                }};
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2da2ba49265..9d66cca6c82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -268,8 +269,8 @@ else if (!(message instanceof 
TaskManagerMessages.Heartbeat)) {
                        }
                        finally {
                                // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -409,8 +410,8 @@ protected void run() {
                                fail(e.getMessage());
                        }
                        finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -539,8 +540,8 @@ protected void run() {
                                };
                        }
                        finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -636,8 +637,8 @@ protected void run() {
                        }
                        finally {
                                // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -775,8 +776,8 @@ protected void run() {
                        }
                        finally {
                                // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -924,8 +925,8 @@ protected void run() {
                        }
                        finally {
                                // shut down the actors
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -1027,8 +1028,8 @@ protected void run() {
                                fail(e.getMessage());
                        }
                        finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -1148,8 +1149,8 @@ protected void run() {
                                fail(e.getMessage());
                        }
                        finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -1206,8 +1207,8 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};}
 
@@ -1519,8 +1520,8 @@ protected void run() {
                                        }
                                };
                        } finally {
-                               TestingUtils.stopActor(taskManagerActorGateway);
-                               TestingUtils.stopActor(jobManagerActorGateway);
+                               ActorUtils.stopActor(taskManagerActorGateway);
+                               ActorUtils.stopActor(jobManagerActorGateway);
                        }
                }};
        }
@@ -1618,8 +1619,8 @@ public void testFailingScheduleOrUpdateConsumersMessage() 
throws Exception {
 
                                assertEquals(true, cancelFuture.get());
                        } finally {
-                               TestingUtils.stopActor(taskManager);
-                               TestingUtils.stopActor(jobManager);
+                               ActorUtils.stopActor(taskManager);
+                               ActorUtils.stopActor(jobManager);
                        }
                }};
        }
@@ -1679,8 +1680,8 @@ public void testSubmitTaskFailure() throws Exception {
                                // expected
                        }
                } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
+                       ActorUtils.stopActor(jobManager);
+                       ActorUtils.stopActor(taskManager);
                }
        }
 
@@ -1748,8 +1749,8 @@ public void testStopTaskFailure() throws Exception {
                                // expected
                        }
                } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
+                       ActorUtils.stopActor(jobManager);
+                       ActorUtils.stopActor(taskManager);
                }
        }
 
@@ -1795,8 +1796,8 @@ public void testStackTraceSampleFailure() throws 
Exception {
                                // expected
                        }
                } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
+                       ActorUtils.stopActor(jobManager);
+                       ActorUtils.stopActor(taskManager);
                }
        }
 
@@ -1866,8 +1867,8 @@ public void testUpdateTaskInputPartitionsFailure() throws 
Exception {
                                // expected
                        }
                } finally {
-                       TestingUtils.stopActor(jobManager);
-                       TestingUtils.stopActor(taskManager);
+                       ActorUtils.stopActor(jobManager);
+                       ActorUtils.stopActor(taskManager);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 35fa7b82a02..9948704eafc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.webmonitor.retriever.impl;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.ActorUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClientActorTest;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -100,7 +100,7 @@ public void testAkkaJobManagerRetrieval() throws Exception {
                        settableLeaderRetrievalService.stop();
 
                        if (actorRef != null) {
-                               TestingUtils.stopActorGracefully(actorRef);
+                               ActorUtils.stopActorGracefully(actorRef);
                        }
                }
        }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
index acd48469c66..f4df1056b02 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/FlinkActorTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
 
 import java.util.UUID
 
-import akka.actor.{Props, Kill, ActorRef, ActorSystem}
+import akka.actor.{Props, ActorSystem}
 import akka.testkit.{TestActorRef, TestKit}
 import grizzled.slf4j.Logger
 import 
org.apache.flink.runtime.akka.FlinkUntypedActorTest.PlainRequiresLeaderSessionID
@@ -82,11 +82,6 @@ class FlinkActorTest(_system: ActorSystem)
           s"leader session ID, even though the message requires a leader 
session ID.")
     }
   }
-
-  def stopActor(actor: ActorRef): Unit = {
-    actor ! Kill
-  }
-
 }
 
 class PlainFlinkActor(val leaderSessionID: Option[UUID])
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 887c4f5d377..54108153eda 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.instance._
 import 
org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered, RegisterTaskManager}
-import org.apache.flink.runtime.metrics.{MetricRegistryImpl, 
MetricRegistryConfiguration}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenLeader
 import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingUtils}
@@ -150,7 +150,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
 
           val response = probe.expectMsgType[LeaderSessionMessage]
           response match {
-            case LeaderSessionMessage(leaderSessionID, 
AcknowledgeRegistration(id, _)) => id2 = id
+            case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => 
id2 = id
             case _ => fail("Wrong response message: " + response)
           }
         }
@@ -159,10 +159,10 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
         assertNotNull(id2)
         assertNotEquals(id1, id2)
       } finally {
-        jmOption.foreach(TestingUtils.stopActorGracefully)
-        rmOption.foreach(TestingUtils.stopActorGracefully)
-        tm1Option.foreach(TestingUtils.stopActorGracefully)
-        tm2Option.foreach(TestingUtils.stopActorGracefully)
+        jmOption.foreach(ActorUtils.stopActorGracefully)
+        rmOption.foreach(ActorUtils.stopActorGracefully)
+        tm1Option.foreach(ActorUtils.stopActorGracefully)
+        tm2Option.foreach(ActorUtils.stopActorGracefully)
       }
     }
 
@@ -239,8 +239,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
           }
         }
       } finally {
-        jmOption.foreach(TestingUtils.stopActorGracefully)
-        rmOption.foreach(TestingUtils.stopActorGracefully)
+        jmOption.foreach(ActorUtils.stopActorGracefully)
+        rmOption.foreach(ActorUtils.stopActorGracefully)
       }
     }
   }
@@ -277,8 +277,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
       components._6,
       highAvailabilityServices.getJobManagerLeaderElectionService(
         HighAvailabilityServices.DEFAULT_JOB_ID),
-      highAvailabilityServices.getSubmittedJobGraphStore(),
-      highAvailabilityServices.getCheckpointRecoveryFactory(),
+      highAvailabilityServices.getSubmittedJobGraphStore,
+      highAvailabilityServices.getCheckpointRecoveryFactory,
       components._9,
       components._10,
       None)
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 9d791a90f9d..8eae515ac99 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -22,8 +22,8 @@ import java.util
 import java.util.concurrent._
 import java.util.{Collections, UUID}
 
-import akka.actor.{ActorRef, ActorSystem, Kill, Props}
-import akka.pattern.{Patterns, ask}
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.time.Time
@@ -295,76 +295,6 @@ object TestingUtils {
     new AkkaActorGateway(taskManager, leaderId)
   }
 
-  /** Stops the given actor by sending it a Kill message
-    *
-    * @param actor
-    */
-  def stopActor(actor: ActorRef): Unit = {
-    if (actor != null) {
-      actor ! Kill
-    }
-  }
-
-  /** Stops the given actor by sending it a Kill message
-    *
-    * @param actorGateway
-    */
-  def stopActor(actorGateway: ActorGateway): Unit = {
-    if (actorGateway != null) {
-      stopActor(actorGateway.actor())
-    }
-  }
-
-  def stopActorGracefully(actor: ActorRef): Unit = {
-    val gracefulStopFuture = Patterns.gracefulStop(actor, 
TestingUtils.TESTING_TIMEOUT)
-
-    Await.result(gracefulStopFuture, TestingUtils.TESTING_TIMEOUT)
-  }
-
-  def stopActorGracefully(actorGateway: ActorGateway): Unit = {
-    stopActorGracefully(actorGateway.actor())
-  }
-
-  def stopActorsGracefully(actors: ActorRef*): Unit = {
-    val gracefulStopFutures = actors.flatMap{
-      actor =>
-        Option(actor) match {
-          case Some(actorRef) => Some(Patterns.gracefulStop(actorRef, 
TestingUtils.TESTING_TIMEOUT))
-          case None => None
-        }
-    }
-
-    implicit val executionContext = defaultExecutionContext
-
-    val globalStopFuture = 
scala.concurrent.Future.sequence(gracefulStopFutures)
-
-    Await.result(globalStopFuture, TestingUtils.TESTING_TIMEOUT)
-  }
-
-  def stopActorsGracefully(actors: java.util.List[ActorRef]): Unit = {
-    import scala.collection.JavaConverters._
-
-    stopActorsGracefully(actors.asScala: _*)
-  }
-
-  def stopActorGatewaysGracefully(actorGateways: ActorGateway*): Unit = {
-    val actors = actorGateways.flatMap {
-      actorGateway =>
-        Option(actorGateway) match {
-          case Some(actorGateway) => Some(actorGateway.actor())
-          case None => None
-        }
-    }
-
-    stopActorsGracefully(actors: _*)
-  }
-
-  def stopActorGatewaysGracefully(actorGateways: 
java.util.List[ActorGateway]): Unit = {
-    import scala.collection.JavaConverters._
-
-    stopActorGatewaysGracefully(actorGateways.asScala: _*)
-  }
-
   /** Creates a testing JobManager using the default recovery mode (standalone)
     *
     * @param actorSystem The ActorSystem to use
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5c9b1fb0c23..9c314d042a0 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, TaskManagerOptions}
-import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
+import org.apache.flink.runtime.akka.{ActorUtils, AkkaUtils, 
ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.messages.Acknowledge
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -55,7 +55,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
       val cluster = startDeathwatchCluster(num_slots, 1)
 
       try {
-        val tm = cluster.getTaskManagers(0)
+        val tm = cluster.getTaskManagers.head
         val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
         // disable disconnect message to test death watch
@@ -66,7 +66,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
           expectMsg(1)
 
           // stop the current leader and make sure that he is gone
-          TestingUtils.stopActorGracefully(jmGateway)
+          ActorUtils.stopActorGracefully(jmGateway)
 
           cluster.restartLeadingJobManager()
 
@@ -99,14 +99,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
       try {
         var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-        val tm = cluster.getTaskManagers(0)
 
         within(TestingUtils.TESTING_DURATION) {
           jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), 
self)
           expectMsg(JobSubmitSuccess(jobGraph.getJobID))
 
           // stop the current leader and make sure that he is gone
-          TestingUtils.stopActorGracefully(jmGateway)
+          ActorUtils.stopActorGracefully(jmGateway)
 
           cluster.restartLeadingJobManager()
 
@@ -119,11 +118,11 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
           jmGateway.tell(SubmitJob(jobGraph2, 
ListeningBehaviour.EXECUTION_RESULT), self)
 
-          expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
+          expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
-          result.result.getJobId() should equal(jobGraph2.getJobID)
+          result.result.getJobId should equal(jobGraph2.getJobID)
         }
       } finally {
         cluster.stop()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unify stopActor utils
> ---------------------
>
>                 Key: FLINK-10349
>                 URL: https://issues.apache.org/jira/browse/FLINK-10349
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 1.7.0
>            Reporter: 陈梓立
>            Assignee: 陈梓立
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to