[ https://issues.apache.org/jira/browse/FLINK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449277#comment-15449277 ]
ASF GitHub Bot commented on FLINK-4516: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2427#discussion_r76815420 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.ExecutionContexts; +import akka.dispatch.Futures; +import akka.util.Timeout; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.util.DirectExecutorService; +import org.apache.flink.util.Preconditions; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.BitSet; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + + +/** + * An RPC Service implementation for testing. 
This RPC service directly executes all asynchronous calls one by one in the main thread. + */ +public class TestingSerialRpcService extends TestingRpcService { + + private final DirectExecutorService executorService; + private final ConcurrentHashMap<String, RpcGateway> registeredConnections; + + public TestingSerialRpcService() { + executorService = new DirectExecutorService(); + this.registeredConnections = new ConcurrentHashMap<>(); + } + + @Override + public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { + try { + unit.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public ExecutionContext getExecutionContext() { + return ExecutionContexts.fromExecutorService(executorService); + } + + @Override + public void stopService() { + executorService.shutdown(); + registeredConnections.clear(); + } + + @Override + public void stopServer(RpcGateway selfGateway) { + + } + + @Override + public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) { + final String address = UUID.randomUUID().toString(); + + InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint); + ClassLoader classLoader = getClass().getClassLoader(); + + @SuppressWarnings("unchecked") + C self = (C) Proxy.newProxyInstance( + classLoader, + new Class<?>[]{ + rpcEndpoint.getSelfGatewayType(), + MainThreadExecutor.class, + StartStoppable.class, + RpcGateway.class}, + akkaInvocationHandler); + + return self; + } + + private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + + private final T rpcEndpoint; + + /** default timeout for asks */ + private final Timeout timeout; + + private final String address; + + private TestingSerialInvocationHandler(String address, T rpcEndpoint) { + this(address, rpcEndpoint, new 
Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + } + + private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) { + this.rpcEndpoint = rpcEndpoint; + this.timeout = timeout; + this.address = address; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Class<?> declaringClass = method.getDeclaringClass(); + if (declaringClass.equals(MainThreadExecutor.class) || + declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || + declaringClass.equals(RpcGateway.class)) { + return method.invoke(this, args); + } else { + final String methodName = method.getName(); + Class<?>[] parameterTypes = method.getParameterTypes(); + Annotation[][] parameterAnnotations = method.getParameterAnnotations(); + Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + + final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments( + parameterTypes, + parameterAnnotations, + args); + + Class<?> returnType = method.getReturnType(); + + if (returnType.equals(Future.class)) { + try { + Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + return Futures.successful(result); + } catch (Throwable e) { + return Futures.failed(e); + } + } else { + return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout); + } + } + } + + /** + * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this + * method with the provided method arguments. If the method has a return value, it is returned + * to the sender of the call. 
+ */ + private Object handleRpcInvocationSync(final String methodName, + final Class<?>[] parameterTypes, + final Object[] args, + final Timeout futureTimeout) throws Exception { + final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); + Object result = rpcMethod.invoke(rpcEndpoint, args); + + if (result != null && result instanceof Future) { + Future<?> future = (Future<?>) result; + return Await.result(future, futureTimeout.duration()); + } else { + return result; + } + } + + @Override + public void runAsync(Runnable runnable) { + runnable.run(); + } + + @Override + public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) { + try { + TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis()); + return Futures.successful(callable.call()); + } catch (Throwable e) { + return Futures.failed(e); + } + } + + @Override + public void scheduleRunAsync(final Runnable runnable, final long delay) { + try { + TimeUnit.MILLISECONDS.sleep(delay); + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public void start() { --- End diff -- No need to implement `StartStoppable` if you don't use the methods. > ResourceManager leadership election > ----------------------------------- > > Key: FLINK-4516 > URL: https://issues.apache.org/jira/browse/FLINK-4516 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: zhangjing > Assignee: zhangjing > > 1. When a resourceManager is started, it starts the leadership election > service first and takes part in contending for leadership > 2. Every resourceManager contains a ResourceManagerLeaderContender; when it > is granted leadership, it will start SlotManager and other main components. > When its leadership is revoked, it will stop all its components and clear > everything. -- This message was sent by Atlassian JIRA (v6.3.4#6332)