[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834788#comment-15834788
 ] 

ASF GitHub Bot commented on FLINK-5298:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2974#discussion_r97347534
  
    --- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
 ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.webmonitor.handlers;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import io.netty.handler.codec.http.HttpMethod;
    +import io.netty.handler.codec.http.HttpVersion;
    +import io.netty.handler.codec.http.router.Routed;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.Instance;
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +import scala.Option;
    +import scala.collection.JavaConverters;
    +import scala.concurrent.ExecutionContext$;
    +import scala.concurrent.Future$;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.isA;
    +import static org.powermock.api.mockito.PowerMockito.mock;
    +import static org.powermock.api.mockito.PowerMockito.when;
    +
    +public class TaskManagerLogHandlerTest {
    +   @Test
    +   public void testLogFetchingFailure() throws Exception {
    +           // ========= setup TaskManager 
=================================================================================
    +           InstanceID tmID = new InstanceID();
    +           ResourceID tmRID = new ResourceID(tmID.toString());
    +           TaskManagerGateway taskManagerGateway = 
mock(TaskManagerGateway.class);
    +           when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
    +
    +           Instance taskManager = mock(Instance.class);
    +           when(taskManager.getId()).thenReturn(tmID);
    +           when(taskManager.getTaskManagerID()).thenReturn(tmRID);
    +           
when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
    +           CompletableFuture<BlobKey> future = new 
FlinkCompletableFuture<>();
    +           future.completeExceptionally(new IOException("failure"));
    +           
when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
    +
    +           // ========= setup JobManager 
==================================================================================
    +
    +           ActorGateway jobManagerGateway = mock(ActorGateway.class);
    +           Object registeredTaskManagersAnswer = new 
JobManagerMessages.RegisteredTaskManagers(
    +                   
JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
    +
    +           
when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class),
 any(FiniteDuration.class)))
    +                   
.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
    +           
when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()),
 any(FiniteDuration.class)))
    +                   .thenReturn(Future$.MODULE$.successful((Object) 5));
    +           
when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class),
 any(FiniteDuration.class)))
    +                   .thenReturn(Future$.MODULE$.successful((Object) new 
JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
    +           when(jobManagerGateway.path()).thenReturn("/jm/address");
    +
    +           JobManagerRetriever retriever = mock(JobManagerRetriever.class);
    +           when(retriever.getJobManagerGatewayAndWebPort())
    +                   .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, 
Integer>(jobManagerGateway, 0)));
    +
    +
    +           TaskManagerLogHandler handler = new TaskManagerLogHandler(
    +                   retriever,
    +                   ExecutionContext$.MODULE$.fromExecutor(new 
CurrentThreadExecutor()),
    +                   Future$.MODULE$.successful("/jm/address"),
    +                   AkkaUtils.getDefaultClientTimeout(),
    +                   TaskManagerLogHandler.FileMode.LOG,
    +                   new Configuration(),
    +                   false);
    +
    +           final AtomicReference<String> exception = new 
AtomicReference<>();
    +           
    +           ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    +           when(ctx.write(isA(ByteBuf.class))).thenAnswer(new 
Answer<Object>() {
    +                   @Override
    +                   public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
    +                           ByteBuf data = 
invocationOnMock.getArgumentAt(0, ByteBuf.class);
    +                           exception.set(new String(data.array()));
    +                           return null;
    +                   }
    +           });
    +
    +           Map<String, String> pathParams = new HashMap<>();
    +           pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, 
tmID.toString());
    +           Routed routed = mock(Routed.class);
    +           when(routed.pathParams()).thenReturn(pathParams);
    +           when(routed.request()).thenReturn(new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + 
tmID + "/log"));
    +
    +           handler.respondAsLeader(ctx, routed, jobManagerGateway);
    +
    +           Assert.assertEquals("Fetching TaskManager log failed.", 
exception.get());
    +   }
    +
    +   public class CurrentThreadExecutor implements Executor {
    +           public void execute(Runnable r) {
    +                   r.run();
    +           }
    +   }
    --- End diff --
    
    `Executors.directExecutor()` could help here: it could replace the custom `CurrentThreadExecutor` class.


> TaskManager crashes when TM log not existent
> --------------------------------------------
>
>                 Key: FLINK-5298
>                 URL: https://issues.apache.org/jira/browse/FLINK-5298
>             Project: Flink
>          Issue Type: Bug
>          Components: Mesos, TaskManager, Webfrontend
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Mischa Krüger
>            Assignee: Chesnay Schepler
>            Priority: Trivial
>             Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or 
> directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
>     at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping 
> TaskManager akka://flink/user/taskmanager#1361882659.
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - 
> Disassociating from JobManager
> 2016-12-08 16:45:14,997 INFO  org.apache.flink.runtime.blob.BlobCache         
>               - Shutting down BlobCache
> 2016-12-08 16:45:15,006 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
> removed spill file directory 
> /tmp/flink-io-e61f717b-630c-4a2a-b3e3-62ccb40aa2f9
> 2016-12-08 16:45:15,006 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting down 
> the network environment and its components.
> 2016-12-08 16:45:15,008 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient         - Successful 
> shutdown (took 1 ms).
> 2016-12-08 16:45:15,009 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
> shutdown (took 0 ms).
> 2016-12-08 16:45:15,020 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Task 
> manager akka://flink/user/taskmanager is completely shut down.
> 2016-12-08 16:45:15,023 ERROR 
> org.apache.flink.runtime.taskmanager.TaskManager              - Actor 
> akka://flink/user/taskmanager#1361882659 terminated, stopping process...
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to