Hi Max,

Changing yarn.heap-cutoff-ratio seems to suffice for the time being. Thanks for your help.
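For reference, the change amounts to something like this (the key and the 0.4 value are the ones you suggested; the -yD submission form below is a guess I have not verified for this Flink version):

    # flink-conf.yaml on the submitting host: reserve 40% of the container
    # memory as headroom outside the JVM heap
    yarn.heap-cutoff-ratio: 0.4

    # or, untested, as a dynamic property at submission time:
    flink run -m yarn-cluster -yn 1 -ytm 2048 -yD yarn.heap-cutoff-ratio=0.4 \
        -c PricerEvent /home/user/flink-example/build/libs/flink-example-1.0-all.jar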
Regards,
Jack

On Tue, Aug 2, 2016 at 11:11 AM, Jack Huang <jackhu...@mz.com> wrote:
> Hi Max,
>
> Is there a way to limit the JVM memory usage (something like the -Xmx
> flag) for the task manager so that it won't go over the YARN limit but will
> just run GC until there is memory to use? Trying to allocate "enough"
> memory for this stream task is not ideal because I could have indefinitely
> many messages backed up in the source waiting to be processed.
>
> Thanks,
> Jack
>
>
> On Tue, Aug 2, 2016 at 5:21 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> Your job creates a lot of String objects which need to be garbage
>> collected. It could be that the JVM is not fast enough and Yarn kills
>> the JVM for consuming too much memory.
>>
>> You can try two things:
>>
>> 1) Give the task manager more memory
>> 2) Increase the Yarn heap cutoff ratio (e.g. yarn.heap-cutoff-ratio: 0.4)
>>
>> If the error still occurs then we need to investigate further.
>>
>> Thanks,
>> Max
>>
>> > On Fri, Jul 29, 2016 at 11:19 AM, Jack Huang <jackhu...@mz.com> wrote:
>> >>
>> >> Hi Max,
>> >>
>> >> Each event is only a few hundred bytes. I am reading from a Kafka topic
>> >> from some offset in the past, so the events should be flowing in as fast as
>> >> Flink can process them.
>> >>
>> >> The entire YARN task log, which contains both the JobManager and TaskManager
>> >> outputs, is attached.
>> >>
>> >> Thanks a lot,
>> >> Jack
>> >>
>> >>
>> >> On Fri, Jul 29, 2016 at 2:04 AM, Maximilian Michels <m...@apache.org> wrote:
>> >>>
>> >>> Hi Jack,
>> >>>
>> >>> Considering the type of job you're running, you shouldn't run out of
>> >>> memory. Could it be that the events are quite large strings? It could
>> >>> be that the TextOutputFormat doesn't write to disk fast enough and
>> >>> accumulates memory. Actually, it doesn't perform regular flushing,
>> >>> which could be an issue.
>> >>>
>> >>> I'm just guessing; we need to investigate further. Could you please
>> >>> supply the entire JobManager log file output?
>> >>>
>> >>> Thanks,
>> >>> Max
>> >>>
>> >>> On Fri, Jul 29, 2016 at 12:59 AM, Jack Huang <jackhu...@mz.com> wrote:
>> >>> > Hi all,
>> >>> >
>> >>> > I am running a test Flink streaming task under YARN. It reads messages
>> >>> > from a Kafka topic and writes them to the local file system.
>> >>> >
>> >>> > object PricerEvent {
>> >>> >   def main(args: Array[String]) {
>> >>> >     val kafkaProp = new Properties()
>> >>> >     kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
>> >>> >     kafkaProp.setProperty("auto.offset.reset", "earliest")
>> >>> >
>> >>> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
>> >>> >     env.setStateBackend(new MemoryStateBackend)
>> >>> >
>> >>> >     val wins = env.addSource(new FlinkKafkaConsumer09[String]("events",
>> >>> >       new SimpleStringSchema, kafkaProp))
>> >>> >     wins.writeAsText("/home/user/flink_out/" + new
>> >>> >       SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date))
>> >>> >
>> >>> >     env.execute
>> >>> >   }
>> >>> > }
>> >>> >
>> >>> > With the command
>> >>> >
>> >>> > flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
>> >>> > /home/user/flink-example/build/libs/flink-example-1.0-all.jar
>> >>> >
>> >>> > The task runs fine for a moment and then terminates.
>> >>> > I looked into the error log and found the following out-of-memory error
>> >>> > message:
>> >>> >
>> >>> > 2016-07-28 22:34:40,397 INFO org.apache.flink.yarn.YarnJobManager
>> >>> > - Container container_e05_1467433388200_0136_01_000002 is completed with
>> >>> > diagnostics: Container
>> >>> > [pid=5832,containerID=container_e05_1467433388200_0136_01_000002] is running
>> >>> > beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical memory
>> >>> > used; 6.1 GB of 4.2 GB virtual memory used. Killing container.
>> >>> > Dump of the process-tree for container_e05_1467433388200_0136_01_000002 :
>> >>> > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> >>> > SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> >>> > |- 5838 5832 5832 5832 (java) 2817 481 6553391104 592779
>> >>> > /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
>> >>> > -XX:MaxDirectMemorySize=1448m
>> >>> > -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log
>> >>> > -Dlogback.configurationFile=file:logback.xml
>> >>> > -Dlog4j.configuration=file:log4j.properties
>> >>> > org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
>> >>> > |- 5832 5830 5832 5832 (bash) 0 0 12759040 357 /bin/bash -c
>> >>> > /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
>> >>> > -XX:MaxDirectMemorySize=1448m
>> >>> > -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log
>> >>> > -Dlogback.configurationFile=file:logback.xml
>> >>> > -Dlog4j.configuration=file:log4j.properties
>> >>> > org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
>> >>> > /mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.out
>> >>> > 2>
>> >>> > /mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.err
>> >>> >
>> >>> > Container killed on request. Exit code is 143
>> >>> > Container exited with a non-zero exit code 143
>> >>> >
>> >>> > I don't understand why it fails with an out-of-memory error. I would
>> >>> > expect the task to slow down if there are too many messages to process,
>> >>> > but not fail altogether. I am not storing any state either.
>> >>> >
>> >>> > Does anyone know the reason and how to fix/avoid this issue?
>> >>> >
>> >>> > Thanks,
>> >>> > Jack
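
P.S. On the flushing behaviour of writeAsText/TextOutputFormat that you mentioned earlier: in case it becomes relevant again, below is a rough, untested sketch of a sink that flushes after every record. The class name and the append-mode file handling are my own for illustration, not something Flink provides:

    import java.io.{BufferedWriter, FileWriter}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    // Sketch: a text sink that flushes after every record so output does not
    // accumulate in the writer's buffer. No checkpointing/fault-tolerance
    // handling, and all subtasks write to the same path, so keep parallelism 1.
    class FlushingTextSink(path: String) extends RichSinkFunction[String] {
      @transient private var writer: BufferedWriter = _

      override def open(parameters: Configuration): Unit = {
        writer = new BufferedWriter(new FileWriter(path, true)) // append mode
      }

      override def invoke(value: String): Unit = {
        writer.write(value)
        writer.newLine()
        writer.flush() // flush each record to keep the buffer small
      }

      override def close(): Unit = {
        if (writer != null) writer.close()
      }
    }

    // usage instead of writeAsText:
    // wins.addSink(new FlushingTextSink("/home/user/flink_out/events.txt"))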