Hi all, I am running a test Flink streaming job under YARN. It reads messages from a Kafka topic and writes them to the local file system.
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object PricerEvent {
  def main(args: Array[String]) {
    // Kafka consumer configuration
    val kafkaProp = new Properties()
    kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
    kafkaProp.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new MemoryStateBackend)

    // Read the "events" topic and write every message to a timestamped text file
    val wins = env.addSource(new FlinkKafkaConsumer09[String]("events", new SimpleStringSchema, kafkaProp))
    wins.writeAsText("/home/user/flink_out/" + new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date))

    env.execute()
  }
}

I submit the job with the command:

flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent /home/user/flink-example/build/libs/flink-example-1.0-all.jar

The job runs fine for a moment and then terminates. I looked into the error log and found the following out-of-memory error message:

2016-07-28 22:34:40,397 INFO org.apache.flink.yarn.YarnJobManager - Container container_e05_1467433388200_0136_01_000002 is completed with diagnostics: Container [pid=5832,containerID=container_e05_1467433388200_0136_01_000002] is running beyond physical memory limits. Current usage: 2.3 GB of 2 GB physical memory used; 6.1 GB of 4.2 GB virtual memory used. Killing container.
Dump of the process-tree for container_e05_1467433388200_0136_01_000002 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 5838 5832 5832 5832 (java) 2817 481 6553391104 592779 /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m -XX:MaxDirectMemorySize=1448m -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
  |- 5832 5830 5832 5832 (bash) 0 0 12759040 357 /bin/bash -c /usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m -XX:MaxDirectMemorySize=1448m -Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.out 2> /mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.err
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

I don't understand why the job fails with an out-of-memory error. I would expect it to slow down if there are too many messages to process, not fail altogether. I am not storing any state either.

Does anyone know the reason, and how to fix or avoid this issue?

Thanks,
Jack
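P.S. One thing I am considering is leaving a larger share of the YARN container for non-heap memory (JVM overhead and direct buffers), since the container is killed while the heap itself is capped at 1448m. This is only a sketch of the flink-conf.yaml change I have in mind; I am assuming the yarn.heap-cutoff-ratio / yarn.heap-cutoff-min keys apply to my Flink version, and the values below are guesses:

# flink-conf.yaml (sketch; assumes these keys exist in my Flink version)
# Reserve a larger fraction of each YARN container for non-heap memory,
# so the JVM heap stays further below the container's physical limit.
yarn.heap-cutoff-ratio: 0.4
# Minimum amount (in MB) cut off from the container for non-heap memory.
yarn.heap-cutoff-min: 600

Alternatively, I could simply request a bigger container (e.g. -ytm 4096 instead of -ytm 2048) if the container is just too small, but I'd like to understand why 2 GB is not enough for such a simple job.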