You have to restart the yarn cluster to let your changes take effect. You
can do that via HADOOP_HOME/sbin/stop-yarn.sh;
HADOOP_HOME/sbin/start-yarn.sh.

The commands yarn-session.sh ... and bin/flink run -m yarn cluster start a
new yarn application within the yarn cluster.

Cheers,
Till
​

On Mon, Jan 11, 2016 at 1:39 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:

> Hi Till,
>
> Thanks for your help. I have checked both in Yarn’s web interface and
> through command line and it seems that there are not occupied containers.
>
> Additionally, I have checked the configuration values in the web interface
> and even though I have changed the log.aggregation property in the
> yarn-site.xml file to true, it appears as false and with the following
> source label:
> <property>
> <name>yarn.log-aggregation-enable</name>
> <value>false</value>
> <source>java.io.BufferedInputStream@3c407114</source>
> </property>
>
> I am not sure if that is relevant. I had assumed that the "./bin/flink run
> -m yarn-cluster" command is starting a yarn session and thus reloading the
> yarn-site file. Is that right? If I am wrong here, then, how can I restart
> it so that the modifications in the yarn-site.xml file are considered? (I
> have also tried with ./bin/yarn-session.sh and then ./bin/flink run without
> success…).
>
> I am not sure if this is related to flink anymore, should I move my
> problem to the yarn community instead?
>
> Thanks,
> Ana
>
> On 11 Jan 2016, at 10:37, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Ana,
>
> good to hear that you found the logging statements. You can check in
> Yarn’s web interface whether there are still occupied containers.
> Alternatively you can go to the different machines and run jps which
> lists you the running Java processes. If you see an ApplicationMaster or
> YarnTaskManagerRunner process, then there is still a container running
> with Flink on this machine. I hope this helps you.
>
> Cheers,
> Till
> ​
>
> On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>
>> Hi Till,
>>
>> Thanks for that! I can see the "Logger in LineSplitter.flatMap” output if
>> I retrieve the task manager logs manually
>> (under /var/log/hadoop-yarn/containers/application_X/…). However that
>> solution is not ideal when for instance I am using 32 machines for my
>> mapReduce operations.
>>
>> I would like to know why Yarn’s log aggregation is not working. Can you
>> tell me how to check if there are some Yarn containers running after the
>> Flink job has finished? I have tried:
>> hadoop job -list
>> but I cannot see any jobs there, although I am not sure that it means
>> that there are not containers running...
>>
>> Thanks,
>> Ana
>>
>> On 08 Jan 2016, at 16:24, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> You’re right that the log statements of the LineSplitter are in the logs
>> of the cluster nodes, because that’s where the LineSplitter code is
>> executed. In contrast, you create a TestClass on the client when you
>> submit the program. Therefore, you see the logging statement “Logger in
>> TestClass” on the command line or in the cli log file.
>>
>> So I would assume that the problem is Yarn’s log aggregation. Either your
>> configuration is not correct or there are still some Yarn containers
>> running after the Flink job has finished. Yarn will only show you the logs
>> after all containers are terminated. Maybe you could check that.
>> Alternatively, you can try to retrieve the taskmanager logs manually by
>> going to the machine where your yarn container was executed. Then under
>> hadoop/logs/userlogs you should find somewhere the logs.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>>
>>> Thanks for the tip Robert! It was a good idea to rule out other possible
>>> causes, but I am afraid that is not the problem. If we stick to the
>>> WordCountExample (for simplicity), the Exception is thrown if placed into
>>> the flatMap function.
>>>
>>> I am going to try to re-write my problem and all the settings below:
>>>
>>> When I try to aggregate all logs:
>>>  $yarn logs -applicationId application_1452250761414_0005
>>>
>>> the following message is retrieved:
>>> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at
>>> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032
>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does
>>> not exist.
>>> Log aggregation has not completed or is not enabled.
>>>
>>> (Tried the same command a few minutes later and got the same message, so
>>> might it be that log aggregation is not properly enabled??)
>>>
>>> I am going to carefully enumerate all the steps I have followed (and
>>> settings) to see if someone can identify why the Logger messages from CORE
>>> nodes (in an Amazon cluster) are not shown.
>>>
>>> 1) Enable yarn.log-aggregation-enable property to true
>>> in /etc/alternatives/hadoop-conf/yarn-site.xml.
>>>
>>> 2) Include log messages in my WordCountExample as follows:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>>
>>> public class WordCountExample {
>>>     static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final ExecutionEnvironment env = 
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         logger.info("Entering application.");
>>>
>>>         DataSet<String> text = env.fromElements(
>>>                 "Who's there?",
>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>
>>>         List<Integer> elements = new ArrayList<Integer>();
>>>         elements.add(0);
>>>
>>>
>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>
>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>                 .flatMap(new LineSplitter())
>>>                 .withBroadcastSet(set, "set")
>>>                 .groupBy(0)
>>>                 .sum(1);
>>>
>>>         wordCounts.writeAsText(*“*output.txt", 
>>> FileSystem.WriteMode.OVERWRITE);
>>>
>>>
>>>     }
>>>
>>>     public static class LineSplitter implements FlatMapFunction<String, 
>>> Tuple2<String, Integer>> {
>>>
>>>         static Logger loggerLineSplitter = 
>>> LoggerFactory.getLogger(LineSplitter.class);
>>>
>>>         @Override
>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> 
>>> out) {
>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>
>>>             for (String word : line.split(" ")) {
>>>                             out.collect(new Tuple2<String, Integer>(word, 
>>> 1));
>>>                             //throw new RuntimeException("LineSplitter 
>>> class called");
>>>             }
>>>
>>>         }
>>>     }
>>>
>>>     public static class TestClass implements Serializable {
>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>
>>>         static Logger loggerTestClass = 
>>> LoggerFactory.getLogger("TestClass.class");
>>>
>>>         List<Integer> integerList;
>>>         public TestClass(List<Integer> integerList){
>>>             this.integerList=integerList;
>>>             loggerTestClass.info("Logger in TestClass");
>>>         }
>>>
>>>
>>>     }
>>> }
>>>
>>> 3) Start a yarn-cluster and execute my program with the following
>>> command:
>>>
>>> $./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>> eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar
>>>
>>>
>>> 4) The output in the log folder is as follows:
>>>
>>> 13:31:04,945 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> --------------------------------------------------------------------------------
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4,
>>> Date:10.11.2015 @ 13:50:14 UTC)
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Current user: hadoop
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation -
>>> 1.8/25.65-b01
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  Maximum heap size: 3344 MiBytes
>>> 13:31:04,947 INFO  org.apache.flink.client.CliFrontend
>>>         -  JAVA_HOME: /etc/alternatives/jre
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -  Hadoop version: 2.6.0
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -  JVM Options:
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties
>>> 13:31:04,950 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -  Program Arguments:
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     run
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -m
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     yarn-cluster
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -yn
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -ys
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     4
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -yjm
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1024
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     -ytm
>>> 13:31:04,951 INFO  org.apache.flink.client.CliFrontend
>>>         -     1024
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     -c
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     eu.amidst.flinklink.examples.WordCountExample
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -     ../flinklink.jar
>>> 13:31:04,952 INFO  org.apache.flink.client.CliFrontend
>>>         -
>>> --------------------------------------------------------------------------------
>>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>>         - Using configuration directory /home/hadoop/flink-0.10.0/conf
>>> 13:31:04,954 INFO  org.apache.flink.client.CliFrontend
>>>         - Trying to load configuration file
>>> 13:31:05,193 INFO  org.apache.flink.client.CliFrontend
>>>         - Running 'run' command.
>>> 13:31:05,201 INFO  org.apache.flink.client.CliFrontend
>>>         - Building program from JAR file
>>> 13:31:05,326 INFO  org.apache.flink.client.CliFrontend
>>>         - YARN cluster mode detected. Switching Log4j output to console
>>> 13:31:05,385 INFO  org.apache.hadoop.yarn.client.RMProxy
>>>         - Connecting to ResourceManager at ip-172-31-33-221.us
>>> -west-2.compute.internal/172.31.33.221:8032
>>> 13:31:05,534 INFO  org.apache.flink.client.FlinkYarnSessionCli
>>>         - No path for the flink jar passed. Using the location of class
>>> org.apache.flink.yarn.FlinkYarnClient to locate the jar
>>> 13:31:05,545 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Using values:
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - TaskManager count = 1
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - JobManager memory = 1024
>>> 13:31:05,547 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - TaskManager memory = 1024
>>> 13:31:06,112 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from
>>> file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar
>>> 13:31:06,843 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml
>>> 13:31:06,857 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml
>>> 13:31:06,869 INFO  org.apache.flink.yarn.Utils
>>>         - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties
>>> to
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties
>>> 13:31:06,892 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Submitting application master application_1452250761414_0005
>>> 13:31:06,917 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
>>>         - Submitted application application_1452250761414_0005
>>> 13:31:06,917 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Waiting for the cluster to be allocated
>>> 13:31:06,919 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:07,920 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:08,922 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:09,924 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - Deploying cluster, current state ACCEPTED
>>> 13:31:10,925 INFO  org.apache.flink.yarn.FlinkYarnClient
>>>         - YARN application has been deployed successfully.
>>> 13:31:10,929 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Start actor system.
>>> 13:31:11,412 INFO  akka.event.slf4j.Slf4jLogger
>>>         - Slf4jLogger started
>>> 13:31:11,472 INFO  Remoting
>>>         - Starting remoting
>>> 13:31:11,698 INFO  Remoting
>>>         - Remoting started; listening on addresses :[
>>> akka.tcp://flink@172.31.33.221:39464]
>>> 13:31:11,733 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Start application client.
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - YARN cluster started
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - JobManager web interface address
>>> http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/
>>> 13:31:11,737 INFO  org.apache.flink.client.CliFrontend
>>>         - Waiting until all TaskManagers have connected
>>> 13:31:11,748 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Notification about new leader address
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID
>>> null.
>>> 13:31:11,752 INFO  org.apache.flink.client.CliFrontend
>>>         - No status updates from the YARN cluster received so far. Waiting
>>> ...
>>> 13:31:11,752 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Received address of new leader
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID
>>> null.
>>> 13:31:11,753 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Disconnect from JobManager null.
>>> 13:31:11,757 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Trying to register at JobManager
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager.
>>> 13:31:12,040 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Successfully registered at the JobManager Actor[
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]
>>> 13:31:12,253 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:12,753 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:13,254 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:13,755 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:14,255 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:14,756 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:15,257 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:15,758 INFO  org.apache.flink.client.CliFrontend
>>>         - TaskManager status (0/1)
>>> 13:31:16,258 INFO  org.apache.flink.client.CliFrontend
>>>         - All TaskManagers are connected
>>> 13:31:16,264 INFO  org.apache.flink.client.program.Client
>>>         - Starting client actor system
>>> 13:31:16,265 INFO  org.apache.flink.runtime.client.JobClient
>>>         - Starting JobClient actor system
>>> 13:31:16,283 INFO  akka.event.slf4j.Slf4jLogger
>>>         - Slf4jLogger started
>>> 13:31:16,288 INFO  Remoting
>>>         - Starting remoting
>>> 13:31:16,301 INFO  Remoting
>>>         - Remoting started; listening on addresses :[
>>> akka.tcp://flink@127.0.0.1:45919]
>>> 13:31:16,302 INFO  org.apache.flink.runtime.client.JobClient
>>>         - Started JobClient actor system at 127.0.0.1:45919
>>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>>         - Using the parallelism provided by the remote cluster (4). To use
>>> another parallelism, set it at the ./bin/flink client.
>>> 13:31:16,302 INFO  org.apache.flink.client.CliFrontend
>>>         - Starting execution of program
>>> 13:31:16,303 INFO  org.apache.flink.client.program.Client
>>>         - Starting program in interactive mode
>>> 13:31:16,313 INFO  eu.amidst.flinklink.examples.WordCountExample
>>>         - Entering application.
>>> 13:31:16,342 INFO  TestClass.class
>>>         - Logger in TestClass
>>> 13:31:16,346 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>>         - class eu.amidst.flinklink.examples.WordCountExample$TestClass is
>>> not a valid POJO type
>>> 13:31:16,376 INFO  org.apache.flink.client.CliFrontend
>>>         - Program execution finished
>>> 13:31:16,384 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Shutting down remote daemon.
>>> 13:31:16,386 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remote daemon shut down; proceeding with flushing remote
>>> transports.
>>> 13:31:16,408 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remoting shut down.
>>> 13:31:16,431 INFO  org.apache.flink.client.CliFrontend
>>>         - Shutting down YARN cluster
>>> 13:31:16,431 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Sending shutdown request to the Application Master
>>> 13:31:16,432 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Sending StopYarnSession request to ApplicationMaster.
>>> 13:31:16,568 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Remote JobManager has been stopped successfully. Stopping local
>>> application client
>>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Stopped Application client.
>>> 13:31:16,570 INFO  org.apache.flink.yarn.ApplicationClient
>>>         - Disconnect from JobManager Actor[
>>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782].
>>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Shutting down remote daemon.
>>> 13:31:16,573 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remote daemon shut down; proceeding with flushing remote
>>> transports.
>>> 13:31:16,584 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator
>>>         - Remoting shut down.
>>> 13:31:16,595 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Deleting files in
>>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005
>>> 13:31:16,596 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - Application application_1452250761414_0005 finished with state
>>> FINISHED and final state SUCCEEDED at 1452259876445
>>> 13:31:16,747 INFO  org.apache.flink.yarn.FlinkYarnCluster
>>>         - YARN Client is shutting down
>>>
>>>
>>> You can see the log messages from the WordCountExample and TestClass
>>> classes. But I have problems to show the logger message (INFO) in the
>>> LineSplitter class. Presumably, because it is executed in the CORE nodes
>>> and node in the MASTER node (it all runs well in my local computer).
>>>
>>> Any tips?
>>> Ana
>>>
>>>
>>> On 06 Jan 2016, at 15:58, Ana M. Martinez <a...@cs.aau.dk> wrote:
>>>
>>> Hi Till,
>>>
>>> I am afraid it does not work in any case.
>>>
>>> I am following the steps you indicate on your websites (for yarn
>>> configuration and loggers with slf4j):
>>>
>>> 1) Enable log aggregation in yarn-site:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files
>>>
>>> 2) Include Loggers as indicated here (see WordCountExample below):
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html
>>>
>>> But I cannot get the log messages that run in the map functions. Am I
>>> missing something?
>>>
>>> Thanks,
>>> Ana
>>>
>>> On 04 Jan 2016, at 14:00, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> I think the YARN application has to be finished in order for the logs to
>>> be accessible.
>>>
>>> Judging from you commands, you’re starting a long running YARN
>>> application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4.
>>> This cluster won’t be used though, because you’re executing your job with 
>>> ./bin/flink
>>> run -m yarn-cluster which will start another YARN application which is
>>> only alive as long as the Flink job is executed. If you want to run your
>>> job on the long running YARN application, then you simply have to omit -m
>>> yarn-cluster.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Sorry for the delay (Xmas break). I have activated log aggregation
>>>> on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find
>>>> a yarn-site.xml).
>>>> But the command yarn logs -applicationId application_1451903796996_0008
>>>> gives me the following output:
>>>>
>>>> INFO client.RMProxy: Connecting to ResourceManager at xxx
>>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does
>>>> not exist.
>>>> Log aggregation has not completed or is not enabled
>>>>
>>>>
>>>> I’ve tried to restart the Flink JobManager and TaskManagers as follows:
>>>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4
>>>> and then with a detached screen, run my application with ./bin/flink
>>>> run -m yarn-cluster ...
>>>>
>>>> I am not sure if my problem is that I am not setting the
>>>> log-aggregation-enable property well or I am not restarting the Flink
>>>> JobManager and TaskManagers as I should… Any idea?
>>>>
>>>> Thanks,
>>>> Ana
>>>>
>>>> On 18 Dec 2015, at 16:29, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>> In which log file are you exactly looking for the logging statements?
>>>> And on what machine? You have to look on the machines on which the yarn
>>>> container were started. Alternatively if you have log aggregation
>>>> activated, then you can simply retrieve the log files via yarn logs.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <a...@cs.aau.dk> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Many thanks for your quick response.
>>>>>
>>>>> I have modified the WordCountExample to re-reproduce my problem in a
>>>>> simple example.
>>>>>
>>>>> I run the code below with the following command:
>>>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c
>>>>> mypackage.WordCountExample ../flinklink.jar
>>>>>
>>>>> And if I check the log file I see all logger messages except the one
>>>>> in the flatMap function of the inner LineSplitter class, which is actually
>>>>> the one I am most interested in.
>>>>>
>>>>> Is that an expected behaviour?
>>>>>
>>>>> Thanks,
>>>>> Ana
>>>>>
>>>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.util.Collector;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> import java.io.Serializable;
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>>
>>>>> public class WordCountExample {
>>>>>     static Logger logger = 
>>>>> LoggerFactory.getLogger(WordCountExample.class);
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         final ExecutionEnvironment env = 
>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         logger.info("Entering application.");
>>>>>
>>>>>     DataSet<String> text = env.fromElements(
>>>>>                 "Who's there?",
>>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>>
>>>>>         List<Integer> elements = new ArrayList<Integer>();
>>>>>         elements.add(0);
>>>>>
>>>>>
>>>>>         DataSet<TestClass> set = env.fromElements(new 
>>>>> TestClass(elements));
>>>>>
>>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>>                 .flatMap(new LineSplitter())
>>>>>                 .withBroadcastSet(set, "set")
>>>>>                 .groupBy(0)
>>>>>                 .sum(1);
>>>>>
>>>>>         wordCounts.print();
>>>>>
>>>>>
>>>>>     }
>>>>>
>>>>>     public static class LineSplitter implements FlatMapFunction<String, 
>>>>> Tuple2<String, Integer>> {
>>>>>
>>>>>         static Logger loggerLineSplitter = 
>>>>> LoggerFactory.getLogger(LineSplitter.class);
>>>>>
>>>>>         @Override
>>>>>         public void flatMap(String line, Collector<Tuple2<String, 
>>>>> Integer>> out) {
>>>>>             loggerLineSplitter.info("Logger in LineSplitter.flatMap");
>>>>>             for (String word : line.split(" ")) {
>>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static class TestClass implements Serializable {
>>>>>         private static final long serialVersionUID = 
>>>>> -2932037991574118651L;
>>>>>
>>>>>         static Logger loggerTestClass = 
>>>>> LoggerFactory.getLogger("WordCountExample.TestClass");
>>>>>
>>>>>         List<Integer> integerList;
>>>>>         public TestClass(List<Integer> integerList){
>>>>>             this.integerList=integerList;
>>>>>             loggerTestClass.info("Logger in TestClass");
>>>>>         }
>>>>>
>>>>>
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 17 Dec 2015, at 16:08, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>> Hi Ana,
>>>>>
>>>>> you can simply modify the `log4j.properties` file in the `conf`
>>>>> directory. It should be automatically included in the Yarn application.
>>>>>
>>>>> Concerning your logging problem, it might be that you have set the
>>>>> logging level too high. Could you share the code with us?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. Martinez <a...@cs.aau.dk>
>>>>> wrote:
>>>>>
>>>>>> Hi flink community,
>>>>>>
>>>>>> I am trying to show log messages using log4j.
>>>>>> It works fine overall except for the messages I want to show in an
>>>>>> inner class that implements
>>>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion.
>>>>>> I am very new to this, but it seems that I’m having problems to show
>>>>>> the messages included in the isConverged function, as it runs in the task
>>>>>> managers?
>>>>>> E.g. the log messages in the outer class (before map-reduce
>>>>>> operations) are properly shown.
>>>>>>
>>>>>> I am also interested in providing my own log4j.properties file. I am
>>>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters.
>>>>>>
>>>>>> Thanks,
>>>>>> Ana
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>

Reply via email to