[ https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834788#comment-15834788 ]
ASF GitHub Bot commented on FLINK-5298: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2974#discussion_r97347534 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.handlers; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.router.Routed; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import scala.Option; +import scala.collection.JavaConverters; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future$; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +public class TaskManagerLogHandlerTest { + @Test + public void testLogFetchingFailure() throws Exception { + // ========= setup TaskManager 
================================================================================= + InstanceID tmID = new InstanceID(); + ResourceID tmRID = new ResourceID(tmID.toString()); + TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class); + when(taskManagerGateway.getAddress()).thenReturn("/tm/address"); + + Instance taskManager = mock(Instance.class); + when(taskManager.getId()).thenReturn(tmID); + when(taskManager.getTaskManagerID()).thenReturn(tmRID); + when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway); + CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>(); + future.completeExceptionally(new IOException("failure")); + when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future); + + // ========= setup JobManager ================================================================================== + + ActorGateway jobManagerGateway = mock(ActorGateway.class); + Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers( + JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala()); + + when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class))) + .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer)); + when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class))) + .thenReturn(Future$.MODULE$.successful((Object) 5)); + when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class))) + .thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager)))); + when(jobManagerGateway.path()).thenReturn("/jm/address"); + + JobManagerRetriever retriever = mock(JobManagerRetriever.class); + when(retriever.getJobManagerGatewayAndWebPort()) + .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, 
Integer>(jobManagerGateway, 0))); + + + TaskManagerLogHandler handler = new TaskManagerLogHandler( + retriever, + ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor()), + Future$.MODULE$.successful("/jm/address"), + AkkaUtils.getDefaultClientTimeout(), + TaskManagerLogHandler.FileMode.LOG, + new Configuration(), + false); + + final AtomicReference<String> exception = new AtomicReference<>(); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class); + exception.set(new String(data.array())); + return null; + } + }); + + Map<String, String> pathParams = new HashMap<>(); + pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString()); + Routed routed = mock(Routed.class); + when(routed.pathParams()).thenReturn(pathParams); + when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log")); + + handler.respondAsLeader(ctx, routed, jobManagerGateway); + + Assert.assertEquals("Fetching TaskManager log failed.", exception.get()); + } + + public class CurrentThreadExecutor implements Executor { + public void execute(Runnable r) { + r.run(); + } + } --- End diff -- `Executors.directExecutor` could help. 
> TaskManager crashes when TM log not existent > -------------------------------------------- > > Key: FLINK-5298 > URL: https://issues.apache.org/jira/browse/FLINK-5298 > Project: Flink > Issue Type: Bug > Components: Mesos, TaskManager, Webfrontend > Affects Versions: 1.1.0, 1.2.0 > Reporter: Mischa Krüger > Assignee: Chesnay Schepler > Priority: Trivial > Fix For: 1.2.0 > > > {code} > java.io.FileNotFoundException: flink-taskmanager.out (No such file or > directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.<init>(FileInputStream.java:138) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > 
at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2016-12-08 16:45:14,995 INFO > org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager - Stopping > TaskManager akka://flink/user/taskmanager#1361882659. > 2016-12-08 16:45:14,995 INFO > org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager - > Disassociating from JobManager > 2016-12-08 16:45:14,997 INFO org.apache.flink.runtime.blob.BlobCache > - Shutting down BlobCache > 2016-12-08 16:45:15,006 INFO > org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager > removed spill file directory > /tmp/flink-io-e61f717b-630c-4a2a-b3e3-62ccb40aa2f9 > 2016-12-08 16:45:15,006 INFO > org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down > the network environment and its components. > 2016-12-08 16:45:15,008 INFO > org.apache.flink.runtime.io.network.netty.NettyClient - Successful > shutdown (took 1 ms). > 2016-12-08 16:45:15,009 INFO > org.apache.flink.runtime.io.network.netty.NettyServer - Successful > shutdown (took 0 ms). > 2016-12-08 16:45:15,020 INFO > org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager - Task > manager akka://flink/user/taskmanager is completely shut down. 
> 2016-12-08 16:45:15,023 ERROR > org.apache.flink.runtime.taskmanager.TaskManager - Actor > akka://flink/user/taskmanager#1361882659 terminated, stopping process... > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)