You have to restart the YARN cluster for your changes to take effect. You can do that via $HADOOP_HOME/sbin/stop-yarn.sh; $HADOOP_HOME/sbin/start-yarn.sh.
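On a plain Hadoop setup the restart could look like the following sketch (the $HADOOP_HOME path is an assumption about your install; on managed setups such as Amazon EMR the ResourceManager and NodeManagers are usually restarted as services instead):

```sh
# Restart all YARN daemons so that yarn-site.xml is re-read
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/start-yarn.sh

# The effective configuration can then be double-checked in the
# ResourceManager web UI, e.g. http://<resourcemanager-host>:8088/conf
```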
The commands yarn-session.sh ... and bin/flink run -m yarn-cluster start a new YARN application within the YARN cluster. Cheers, Till On Mon, Jan 11, 2016 at 1:39 PM, Ana M. Martinez <a...@cs.aau.dk> wrote: > Hi Till, > > Thanks for your help. I have checked both in Yarn’s web interface and > through command line and it seems that there are not occupied containers. > > Additionally, I have checked the configuration values in the web interface > and even though I have changed the log.aggregation property in the > yarn-site.xml file to true, it appears as false and with the following > source label: > <property> > <name>yarn.log-aggregation-enable</name> > <value>false</value> > <source>java.io.BufferedInputStream@3c407114</source> > </property> > > I am not sure if that is relevant. I had assumed that the "./bin/flink run > -m yarn-cluster" command is starting a yarn session and thus reloading the > yarn-site file. Is that right? If I am wrong here, then, how can I restart > it so that the modifications in the yarn-site.xml file are considered? (I > have also tried with ./bin/yarn-session.sh and then ./bin/flink run without > success…). > > I am not sure if this is related to flink anymore, should I move my > problem to the yarn community instead? > > Thanks, > Ana > > On 11 Jan 2016, at 10:37, Till Rohrmann <trohrm...@apache.org> wrote: > > Hi Ana, > > good to hear that you found the logging statements. You can check in > Yarn’s web interface whether there are still occupied containers. > Alternatively you can go to the different machines and run jps which > lists you the running Java processes. If you see an ApplicationMaster or > YarnTaskManagerRunner process, then there is still a container running > with Flink on this machine. I hope this helps you. > > Cheers, > Till > > > On Mon, Jan 11, 2016 at 9:37 AM, Ana M. Martinez <a...@cs.aau.dk> wrote: > >> Hi Till, >> >> Thanks for that! 
I can see the "Logger in LineSplitter.flatMap” output if >> I retrieve the task manager logs manually >> (under /var/log/hadoop-yarn/containers/application_X/…). However that >> solution is not ideal when for instance I am using 32 machines for my >> mapReduce operations. >> >> I would like to know why Yarn’s log aggregation is not working. Can you >> tell me how to check if there are some Yarn containers running after the >> Flink job has finished? I have tried: >> hadoop job -list >> but I cannot see any jobs there, although I am not sure that it means >> that there are not containers running... >> >> Thanks, >> Ana >> >> On 08 Jan 2016, at 16:24, Till Rohrmann <trohrm...@apache.org> wrote: >> >> You’re right that the log statements of the LineSplitter are in the logs >> of the cluster nodes, because that’s where the LineSplitter code is >> executed. In contrast, you create a TestClass on the client when you >> submit the program. Therefore, you see the logging statement “Logger in >> TestClass” on the command line or in the cli log file. >> >> So I would assume that the problem is Yarn’s log aggregation. Either your >> configuration is not correct or there are still some Yarn containers >> running after the Flink job has finished. Yarn will only show you the logs >> after all containers are terminated. Maybe you could check that. >> Alternatively, you can try to retrieve the taskmanager logs manually by >> going to the machine where your yarn container was executed. Then under >> hadoop/logs/userlogs you should find somewhere the logs. >> >> Cheers, >> Till >> >> >> On Fri, Jan 8, 2016 at 3:50 PM, Ana M. Martinez <a...@cs.aau.dk> wrote: >> >>> Thanks for the tip Robert! It was a good idea to rule out other possible >>> causes, but I am afraid that is not the problem. If we stick to the >>> WordCountExample (for simplicity), the Exception is thrown if placed into >>> the flatMap function. 
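One way to check for leftover containers from the command line is sketched below, using the standard Hadoop 2.x CLI (note that `hadoop job -list` only shows MapReduce jobs, so an empty list there says nothing about YARN containers):

```sh
# List YARN applications that are still running (Flink sessions included)
yarn application -list -appStates RUNNING

# On each worker node: look for Flink's YARN processes, as Till suggested
jps | grep -E 'ApplicationMaster|YarnTaskManager'
```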
>>> >>> I am going to try to re-write my problem and all the settings below: >>> >>> When I try to aggregate all logs: >>> $yarn logs -applicationId application_1452250761414_0005 >>> >>> the following message is retrieved: >>> 16/01/08 13:32:37 INFO client.RMProxy: Connecting to ResourceManager at >>> ip-172-31-33-221.us-west-2.compute.internal/172.31.33.221:8032 >>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1452250761414_0005does >>> not exist. >>> Log aggregation has not completed or is not enabled. >>> >>> (Tried the same command a few minutes later and got the same message, so >>> might it be that log aggregation is not properly enabled??) >>> >>> I am going to carefully enumerate all the steps I have followed (and >>> settings) to see if someone can identify why the Logger messages from CORE >>> nodes (in an Amazon cluster) are not shown. >>> >>> 1) Enable yarn.log-aggregation-enable property to true >>> in /etc/alternatives/hadoop-conf/yarn-site.xml. >>> >>> 2) Include log messages in my WordCountExample as follows: >>> >>> import org.apache.flink.api.common.functions.FlatMapFunction; >>> import org.apache.flink.api.java.DataSet; >>> import org.apache.flink.api.java.ExecutionEnvironment; >>> import org.apache.flink.api.java.tuple.Tuple2; >>> import org.apache.flink.core.fs.FileSystem; >>> import org.apache.flink.util.Collector; >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> import java.io.Serializable; >>> import java.util.ArrayList; >>> import java.util.List; >>> >>> >>> public class WordCountExample { >>> static Logger logger = LoggerFactory.getLogger(WordCountExample.class); >>> >>> public static void main(String[] args) throws Exception { >>> final ExecutionEnvironment env = >>> ExecutionEnvironment.getExecutionEnvironment(); >>> >>> logger.info("Entering application."); >>> >>> DataSet<String> text = env.fromElements( >>> "Who's there?", >>> "I think I hear them. Stand, ho! 
Who's there?"); >>> >>> List<Integer> elements = new ArrayList<Integer>(); >>> elements.add(0); >>> >>> >>> DataSet<TestClass> set = env.fromElements(new TestClass(elements)); >>> >>> DataSet<Tuple2<String, Integer>> wordCounts = text >>> .flatMap(new LineSplitter()) >>> .withBroadcastSet(set, "set") >>> .groupBy(0) >>> .sum(1); >>> >>> wordCounts.writeAsText("output.txt", >>> FileSystem.WriteMode.OVERWRITE); >>> >>> >>> } >>> >>> public static class LineSplitter implements FlatMapFunction<String, >>> Tuple2<String, Integer>> { >>> >>> static Logger loggerLineSplitter = >>> LoggerFactory.getLogger(LineSplitter.class); >>> >>> @Override >>> public void flatMap(String line, Collector<Tuple2<String, Integer>> >>> out) { >>> loggerLineSplitter.info("Logger in LineSplitter.flatMap"); >>> >>> for (String word : line.split(" ")) { >>> out.collect(new Tuple2<String, Integer>(word, >>> 1)); >>> //throw new RuntimeException("LineSplitter >>> class called"); >>> } >>> >>> } >>> } >>> >>> public static class TestClass implements Serializable { >>> private static final long serialVersionUID = -2932037991574118651L; >>> >>> static Logger loggerTestClass = >>> LoggerFactory.getLogger("TestClass.class"); >>> >>> List<Integer> integerList; >>> public TestClass(List<Integer> integerList){ >>> this.integerList=integerList; >>> loggerTestClass.info("Logger in TestClass"); >>> } >>> >>> >>> } >>> } >>> >>> 3) Start a yarn-cluster and execute my program with the following >>> command: >>> >>> $./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c >>> eu.amidst.flinklink.examples.WordCountExample ../flinklink.jar >>> >>> >>> 4) The output in the log folder is as follows: >>> >>> 13:31:04,945 INFO org.apache.flink.client.CliFrontend >>> - >>> -------------------------------------------------------------------------------- >>> 13:31:04,947 INFO org.apache.flink.client.CliFrontend >>> - Starting Command Line Client (Version: 0.10.0, Rev:ab2cca4, >>> Date:10.11.2015 @ 13:50:14 
UTC) >>> 13:31:04,947 INFO org.apache.flink.client.CliFrontend >>> - Current user: hadoop >>> 13:31:04,947 INFO org.apache.flink.client.CliFrontend >>> - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - >>> 1.8/25.65-b01 >>> 13:31:04,947 INFO org.apache.flink.client.CliFrontend >>> - Maximum heap size: 3344 MiBytes >>> 13:31:04,947 INFO org.apache.flink.client.CliFrontend >>> - JAVA_HOME: /etc/alternatives/jre >>> 13:31:04,950 INFO org.apache.flink.client.CliFrontend >>> - Hadoop version: 2.6.0 >>> 13:31:04,950 INFO org.apache.flink.client.CliFrontend >>> - JVM Options: >>> 13:31:04,950 INFO org.apache.flink.client.CliFrontend >>> - >>> -Dlog.file=/home/hadoop/flink-0.10.0/log/flink-hadoop-client-ip-172-31-33-221.log >>> 13:31:04,950 INFO org.apache.flink.client.CliFrontend >>> - >>> -Dlog4j.configuration=file:/home/hadoop/flink-0.10.0/conf/log4j-cli.properties >>> 13:31:04,950 INFO org.apache.flink.client.CliFrontend >>> - >>> -Dlogback.configurationFile=file:/home/hadoop/flink-0.10.0/conf/logback.xml >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - Program Arguments: >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - run >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - -m >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - yarn-cluster >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - -yn >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - 1 >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - -ys >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - 4 >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - -yjm >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - 1024 >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - -ytm >>> 13:31:04,951 INFO org.apache.flink.client.CliFrontend >>> - 1024 >>> 13:31:04,952 INFO org.apache.flink.client.CliFrontend >>> - -c >>> 13:31:04,952 INFO org.apache.flink.client.CliFrontend >>> - 
eu.amidst.flinklink.examples.WordCountExample >>> 13:31:04,952 INFO org.apache.flink.client.CliFrontend >>> - ../flinklink.jar >>> 13:31:04,952 INFO org.apache.flink.client.CliFrontend >>> - >>> -------------------------------------------------------------------------------- >>> 13:31:04,954 INFO org.apache.flink.client.CliFrontend >>> - Using configuration directory /home/hadoop/flink-0.10.0/conf >>> 13:31:04,954 INFO org.apache.flink.client.CliFrontend >>> - Trying to load configuration file >>> 13:31:05,193 INFO org.apache.flink.client.CliFrontend >>> - Running 'run' command. >>> 13:31:05,201 INFO org.apache.flink.client.CliFrontend >>> - Building program from JAR file >>> 13:31:05,326 INFO org.apache.flink.client.CliFrontend >>> - YARN cluster mode detected. Switching Log4j output to console >>> 13:31:05,385 INFO org.apache.hadoop.yarn.client.RMProxy >>> - Connecting to ResourceManager at ip-172-31-33-221.us >>> -west-2.compute.internal/172.31.33.221:8032 >>> 13:31:05,534 INFO org.apache.flink.client.FlinkYarnSessionCli >>> - No path for the flink jar passed. 
Using the location of class >>> org.apache.flink.yarn.FlinkYarnClient to locate the jar >>> 13:31:05,545 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Using values: >>> 13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient >>> - TaskManager count = 1 >>> 13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient >>> - JobManager memory = 1024 >>> 13:31:05,547 INFO org.apache.flink.yarn.FlinkYarnClient >>> - TaskManager memory = 1024 >>> 13:31:06,112 INFO org.apache.flink.yarn.Utils >>> - Copying from >>> file:/home/hadoop/flink-0.10.0/lib/flink-dist-0.10.0.jar to >>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-dist-0.10.0.jar >>> 13:31:06,843 INFO org.apache.flink.yarn.Utils >>> - Copying from /home/hadoop/flink-0.10.0/conf/flink-conf.yaml to >>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/flink-conf.yaml >>> 13:31:06,857 INFO org.apache.flink.yarn.Utils >>> - Copying from file:/home/hadoop/flink-0.10.0/conf/logback.xml to >>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/logback.xml >>> 13:31:06,869 INFO org.apache.flink.yarn.Utils >>> - Copying from file:/home/hadoop/flink-0.10.0/conf/log4j.properties >>> to >>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005/log4j.properties >>> 13:31:06,892 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Submitting application master application_1452250761414_0005 >>> 13:31:06,917 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl >>> - Submitted application application_1452250761414_0005 >>> 13:31:06,917 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Waiting for the cluster to be allocated >>> 13:31:06,919 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Deploying cluster, current state ACCEPTED >>> 13:31:07,920 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Deploying 
cluster, current state ACCEPTED >>> 13:31:08,922 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Deploying cluster, current state ACCEPTED >>> 13:31:09,924 INFO org.apache.flink.yarn.FlinkYarnClient >>> - Deploying cluster, current state ACCEPTED >>> 13:31:10,925 INFO org.apache.flink.yarn.FlinkYarnClient >>> - YARN application has been deployed successfully. >>> 13:31:10,929 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - Start actor system. >>> 13:31:11,412 INFO akka.event.slf4j.Slf4jLogger >>> - Slf4jLogger started >>> 13:31:11,472 INFO Remoting >>> - Starting remoting >>> 13:31:11,698 INFO Remoting >>> - Remoting started; listening on addresses :[ >>> akka.tcp://flink@172.31.33.221:39464] >>> 13:31:11,733 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - Start application client. >>> 13:31:11,737 INFO org.apache.flink.client.CliFrontend >>> - YARN cluster started >>> 13:31:11,737 INFO org.apache.flink.client.CliFrontend >>> - JobManager web interface address >>> http://ip-172-31-33-221.us-west-2.compute.internal:20888/proxy/application_1452250761414_0005/ >>> 13:31:11,737 INFO org.apache.flink.client.CliFrontend >>> - Waiting until all TaskManagers have connected >>> 13:31:11,748 INFO org.apache.flink.yarn.ApplicationClient >>> - Notification about new leader address >>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID >>> null. >>> 13:31:11,752 INFO org.apache.flink.client.CliFrontend >>> - No status updates from the YARN cluster received so far. Waiting >>> ... >>> 13:31:11,752 INFO org.apache.flink.yarn.ApplicationClient >>> - Received address of new leader >>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager with session ID >>> null. >>> 13:31:11,753 INFO org.apache.flink.yarn.ApplicationClient >>> - Disconnect from JobManager null. >>> 13:31:11,757 INFO org.apache.flink.yarn.ApplicationClient >>> - Trying to register at JobManager >>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager. 
>>> 13:31:12,040 INFO org.apache.flink.yarn.ApplicationClient >>> - Successfully registered at the JobManager Actor[ >>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782] >>> 13:31:12,253 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:12,753 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:13,254 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:13,755 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:14,255 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:14,756 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:15,257 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:15,758 INFO org.apache.flink.client.CliFrontend >>> - TaskManager status (0/1) >>> 13:31:16,258 INFO org.apache.flink.client.CliFrontend >>> - All TaskManagers are connected >>> 13:31:16,264 INFO org.apache.flink.client.program.Client >>> - Starting client actor system >>> 13:31:16,265 INFO org.apache.flink.runtime.client.JobClient >>> - Starting JobClient actor system >>> 13:31:16,283 INFO akka.event.slf4j.Slf4jLogger >>> - Slf4jLogger started >>> 13:31:16,288 INFO Remoting >>> - Starting remoting >>> 13:31:16,301 INFO Remoting >>> - Remoting started; listening on addresses :[ >>> akka.tcp://flink@127.0.0.1:45919] >>> 13:31:16,302 INFO org.apache.flink.runtime.client.JobClient >>> - Started JobClient actor system at 127.0.0.1:45919 >>> 13:31:16,302 INFO org.apache.flink.client.CliFrontend >>> - Using the parallelism provided by the remote cluster (4). To use >>> another parallelism, set it at the ./bin/flink client. 
>>> 13:31:16,302 INFO org.apache.flink.client.CliFrontend >>> - Starting execution of program >>> 13:31:16,303 INFO org.apache.flink.client.program.Client >>> - Starting program in interactive mode >>> 13:31:16,313 INFO eu.amidst.flinklink.examples.WordCountExample >>> - Entering application. >>> 13:31:16,342 INFO TestClass.class >>> - Logger in TestClass >>> 13:31:16,346 INFO org.apache.flink.api.java.typeutils.TypeExtractor >>> - class eu.amidst.flinklink.examples.WordCountExample$TestClass is >>> not a valid POJO type >>> 13:31:16,376 INFO org.apache.flink.client.CliFrontend >>> - Program execution finished >>> 13:31:16,384 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Shutting down remote daemon. >>> 13:31:16,386 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Remote daemon shut down; proceeding with flushing remote >>> transports. >>> 13:31:16,408 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Remoting shut down. >>> 13:31:16,431 INFO org.apache.flink.client.CliFrontend >>> - Shutting down YARN cluster >>> 13:31:16,431 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - Sending shutdown request to the Application Master >>> 13:31:16,432 INFO org.apache.flink.yarn.ApplicationClient >>> - Sending StopYarnSession request to ApplicationMaster. >>> 13:31:16,568 INFO org.apache.flink.yarn.ApplicationClient >>> - Remote JobManager has been stopped successfully. Stopping local >>> application client >>> 13:31:16,570 INFO org.apache.flink.yarn.ApplicationClient >>> - Stopped Application client. >>> 13:31:16,570 INFO org.apache.flink.yarn.ApplicationClient >>> - Disconnect from JobManager Actor[ >>> akka.tcp://flink@172.31.45.98:46965/user/jobmanager#1549383782]. >>> 13:31:16,573 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Shutting down remote daemon. 
>>> 13:31:16,573 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Remote daemon shut down; proceeding with flushing remote >>> transports. >>> 13:31:16,584 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator >>> - Remoting shut down. >>> 13:31:16,595 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - Deleting files in >>> hdfs://ip-172-31-33-221.us-west-2.compute.internal:8020/user/hadoop/.flink/application_1452250761414_0005 >>> 13:31:16,596 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - Application application_1452250761414_0005 finished with state >>> FINISHED and final state SUCCEEDED at 1452259876445 >>> 13:31:16,747 INFO org.apache.flink.yarn.FlinkYarnCluster >>> - YARN Client is shutting down >>> >>> >>> You can see the log messages from the WordCountExample and TestClass >>> classes. But I have problems showing the logger message (INFO) in the >>> LineSplitter class. Presumably, because it is executed in the CORE nodes >>> and not in the MASTER node (it all runs well in my local computer). >>> >>> Any tips? >>> Ana >>> >>> >>> On 06 Jan 2016, at 15:58, Ana M. Martinez <a...@cs.aau.dk> wrote: >>> >>> Hi Till, >>> >>> I am afraid it does not work in any case. >>> >>> I am following the steps you indicate on your websites (for yarn >>> configuration and loggers with slf4j): >>> >>> 1) Enable log aggregation in yarn-site: >>> >>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/yarn_setup.html#log-files >>> >>> 2) Include Loggers as indicated here (see WordCountExample below): >>> >>> https://ci.apache.org/projects/flink/flink-docs-release-0.7/internal_logging.html >>> >>> But I cannot get the log messages that run in the map functions. Am I >>> missing something? >>> >>> Thanks, >>> Ana >>> >>> On 04 Jan 2016, at 14:00, Till Rohrmann <trohrm...@apache.org> wrote: >>> >>> I think the YARN application has to be finished in order for the logs to >>> be accessible. 
>>> >>> Judging from your commands, you’re starting a long running YARN >>> application running Flink with ./bin/yarn-session.sh -n 1 -tm 2048 -s 4. >>> This cluster won’t be used though, because you’re executing your job with >>> ./bin/flink >>> run -m yarn-cluster which will start another YARN application which is >>> only alive as long as the Flink job is executed. If you want to run your >>> job on the long running YARN application, then you simply have to omit -m >>> yarn-cluster. >>> >>> Cheers, >>> Till >>> >>> >>> On Mon, Jan 4, 2016 at 12:36 PM, Ana M. Martinez <a...@cs.aau.dk> wrote: >>> >>>> Hi Till, >>>> >>>> Sorry for the delay (Xmas break). I have activated log aggregation >>>> on flink-conf.yaml with yarn.log-aggregation-enable: true (as I can’t find >>>> a yarn-site.xml). >>>> But the command yarn logs -applicationId application_1451903796996_0008 >>>> gives me the following output: >>>> >>>> INFO client.RMProxy: Connecting to ResourceManager at xxx >>>> /var/log/hadoop-yarn/apps/hadoop/logs/application_1451903796996_0008does >>>> not exist. >>>> Log aggregation has not completed or is not enabled >>>> >>>> >>>> I’ve tried to restart the Flink JobManager and TaskManagers as follows: >>>> ./bin/yarn-session.sh -n 1 -tm 2048 -s 4 >>>> and then with a detached screen, run my application with ./bin/flink >>>> run -m yarn-cluster ... >>>> >>>> I am not sure if my problem is that I am not setting the >>>> log-aggregation-enable property well or I am not restarting the Flink >>>> JobManager and TaskManagers as I should… Any idea? >>>> >>>> Thanks, >>>> Ana >>>> >>>> On 18 Dec 2015, at 16:29, Till Rohrmann <trohrm...@apache.org> wrote: >>>> >>>> In which log file are you exactly looking for the logging statements? >>>> And on what machine? You have to look on the machines on which the yarn >>>> containers were started. Alternatively if you have log aggregation >>>> activated, then you can simply retrieve the log files via yarn logs. 
>>>> >>>> Cheers, >>>> Till >>>> >>>> On Fri, Dec 18, 2015 at 3:49 PM, Ana M. Martinez <a...@cs.aau.dk> wrote: >>>> >>>>> Hi Till, >>>>> >>>>> Many thanks for your quick response. >>>>> >>>>> I have modified the WordCountExample to re-reproduce my problem in a >>>>> simple example. >>>>> >>>>> I run the code below with the following command: >>>>> ./bin/flink run -m yarn-cluster -yn 1 -ys 4 -yjm 1024 -ytm 1024 -c >>>>> mypackage.WordCountExample ../flinklink.jar >>>>> >>>>> And if I check the log file I see all logger messages except the one >>>>> in the flatMap function of the inner LineSplitter class, which is actually >>>>> the one I am most interested in. >>>>> >>>>> Is that an expected behaviour? >>>>> >>>>> Thanks, >>>>> Ana >>>>> >>>>> import org.apache.flink.api.common.functions.FlatMapFunction; >>>>> import org.apache.flink.api.java.DataSet; >>>>> import org.apache.flink.api.java.ExecutionEnvironment; >>>>> import org.apache.flink.api.java.tuple.Tuple2; >>>>> import org.apache.flink.util.Collector; >>>>> import org.slf4j.Logger; >>>>> import org.slf4j.LoggerFactory; >>>>> >>>>> import java.io.Serializable; >>>>> import java.util.ArrayList; >>>>> import java.util.List; >>>>> >>>>> public class WordCountExample { >>>>> static Logger logger = >>>>> LoggerFactory.getLogger(WordCountExample.class); >>>>> >>>>> public static void main(String[] args) throws Exception { >>>>> final ExecutionEnvironment env = >>>>> ExecutionEnvironment.getExecutionEnvironment(); >>>>> >>>>> logger.info("Entering application."); >>>>> >>>>> DataSet<String> text = env.fromElements( >>>>> "Who's there?", >>>>> "I think I hear them. Stand, ho! 
Who's there?"); >>>>> >>>>> List<Integer> elements = new ArrayList<Integer>(); >>>>> elements.add(0); >>>>> >>>>> >>>>> DataSet<TestClass> set = env.fromElements(new >>>>> TestClass(elements)); >>>>> >>>>> DataSet<Tuple2<String, Integer>> wordCounts = text >>>>> .flatMap(new LineSplitter()) >>>>> .withBroadcastSet(set, "set") >>>>> .groupBy(0) >>>>> .sum(1); >>>>> >>>>> wordCounts.print(); >>>>> >>>>> >>>>> } >>>>> >>>>> public static class LineSplitter implements FlatMapFunction<String, >>>>> Tuple2<String, Integer>> { >>>>> >>>>> static Logger loggerLineSplitter = >>>>> LoggerFactory.getLogger(LineSplitter.class); >>>>> >>>>> @Override >>>>> public void flatMap(String line, Collector<Tuple2<String, >>>>> Integer>> out) { >>>>> loggerLineSplitter.info("Logger in LineSplitter.flatMap"); >>>>> for (String word : line.split(" ")) { >>>>> out.collect(new Tuple2<String, Integer>(word, 1)); >>>>> } >>>>> } >>>>> } >>>>> >>>>> public static class TestClass implements Serializable { >>>>> private static final long serialVersionUID = >>>>> -2932037991574118651L; >>>>> >>>>> static Logger loggerTestClass = >>>>> LoggerFactory.getLogger("WordCountExample.TestClass"); >>>>> >>>>> List<Integer> integerList; >>>>> public TestClass(List<Integer> integerList){ >>>>> this.integerList=integerList; >>>>> loggerTestClass.info("Logger in TestClass"); >>>>> } >>>>> >>>>> >>>>> } >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> On 17 Dec 2015, at 16:08, Till Rohrmann <trohrm...@apache.org> wrote: >>>>> >>>>> Hi Ana, >>>>> >>>>> you can simply modify the `log4j.properties` file in the `conf` >>>>> directory. It should be automatically included in the Yarn application. >>>>> >>>>> Concerning your logging problem, it might be that you have set the >>>>> logging level too high. Could you share the code with us? >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Thu, Dec 17, 2015 at 1:56 PM, Ana M. 
Martinez <a...@cs.aau.dk> >>>>> wrote: >>>>> >>>>>> Hi flink community, >>>>>> >>>>>> I am trying to show log messages using log4j. >>>>>> It works fine overall except for the messages I want to show in an >>>>>> inner class that implements >>>>>> org.apache.flink.api.common.aggregators.ConvergenceCriterion. >>>>>> I am very new to this, but it seems that I’m having problems to show >>>>>> the messages included in the isConverged function, as it runs in the task >>>>>> managers? >>>>>> E.g. the log messages in the outer class (before map-reduce >>>>>> operations) are properly shown. >>>>>> >>>>>> I am also interested in providing my own log4j.properties file. I am >>>>>> using the ./bin/flink run -m yarn-cluster on Amazon clusters. >>>>>> >>>>>> Thanks, >>>>>> Ana
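To summarize the configuration side of this thread: log aggregation is a YARN property, so it belongs in yarn-site.xml on every node (not in flink-conf.yaml), and the YARN daemons must be restarted before it takes effect. The fragment below is a sketch using the value discussed above; the file path is the one mentioned in the thread:

```xml
<!-- yarn-site.xml, e.g. /etc/alternatives/hadoop-conf/yarn-site.xml -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

After restarting YARN and re-running the job, yarn logs -applicationId <appId> should return the aggregated task manager logs once all containers of that application have finished.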