[ https://issues.apache.org/jira/browse/FLINK-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15418614#comment-15418614 ]
ASF GitHub Bot commented on FLINK-4384: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2360#discussion_r74568505 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorSystem; +import akka.util.Timeout; + +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcService; + +import org.junit.AfterClass; +import org.junit.Test; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class AsyncCallsTest { + + // ------------------------------------------------------------------------ + // shared test members + // ------------------------------------------------------------------------ + + private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + + private static AkkaRpcService akkaRpcService = + new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + + @AfterClass + public static void shutdown() { + akkaRpcService.stopService(); + actorSystem.shutdown(); + } + + + // ------------------------------------------------------------------------ + // tests + // ------------------------------------------------------------------------ + + @Test + public void testScheduleWithNoDelay() throws Exception { + + // to collect all the thread references + final BlockingQueue<Thread> queue = new LinkedBlockingQueue<>(); + + TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, queue); + TestGateway gateway = testEndpoint.getSelf(); + + // a bunch of gateway calls + gateway.someCall(); + gateway.anotherCall(); + gateway.someCall(); + + // run something asynchronously + testEndpoint.runAsync(new Runnable() { + @Override + public void run() { + 
queue.add(Thread.currentThread()); + } + }); + + Future<String> result = testEndpoint.callAsync(new Callable<String>() { + @Override + public String call() throws Exception { + return "test"; --- End diff -- Ah, true, this should queue its current thread. Will fix that. > Add a "scheduleRunAsync()" feature to the RpcEndpoint > ----------------------------------------------------- > > Key: FLINK-4384 > URL: https://issues.apache.org/jira/browse/FLINK-4384 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Environment: FLIP-6 feature branch > Reporter: Stephan Ewen > Fix For: 1.2.0 > > > It is a common pattern to schedule a call to be executed in the future. > Examples are > - delays in retries > - heartbeats > - checking for heartbeat timeouts > I suggest adding a {{scheduleRunAsync()}} method to the {{RpcEndpoint}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)