Hi all,

I am running a test Flink streaming job under YARN. It reads messages from
a Kafka topic and writes them to the local file system.

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object PricerEvent {
    def main(args: Array[String]): Unit = {
        // Kafka consumer configuration
        val kafkaProp = new Properties()
        kafkaProp.setProperty("bootstrap.servers", "localhost:6667")
        kafkaProp.setProperty("auto.offset.reset", "earliest")

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStateBackend(new MemoryStateBackend)

        // Read the "events" topic as plain strings and write every message
        // to a timestamped file on the local file system
        val wins = env.addSource(
            new FlinkKafkaConsumer09[String]("events", new SimpleStringSchema, kafkaProp))
        wins.writeAsText("/home/user/flink_out/" +
            new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date))

        env.execute()
    }
}

I submit the job with the following command:

flink run -m yarn-cluster -yn 1 -ytm 2048 -c PricerEvent
/home/user/flink-example/build/libs/flink-example-1.0-all.jar


The task runs fine for a moment and then terminates. I looked into the
error log and found the following out-of-memory error message:

2016-07-28 22:34:40,397 INFO  org.apache.flink.yarn.YarnJobManager
                     - Container
container_e05_1467433388200_0136_01_000002 is completed with
diagnostics: Container
[pid=5832,containerID=container_e05_1467433388200_0136_01_000002] is
running beyond physical memory limits. Current usage: 2.3 GB of 2 GB
physical memory used; 6.1 GB of 4.2 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e05_1467433388200_0136_01_000002 :
    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
    |- 5838 5832 5832 5832 (java) 2817 481 6553391104 592779
/usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
-XX:MaxDirectMemorySize=1448m
-Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
    |- 5832 5830 5832 5832 (bash) 0 0 12759040 357 /bin/bash -c
/usr/jdk64/jdk1.8.0_60/bin/java -Xms1448m -Xmx1448m
-XX:MaxDirectMemorySize=1448m
-Dlog.file=/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.out
2> 
/mnt/a/hadoop/yarn/log/application_1467433388200_0136/container_e05_1467433388200_0136_01_000002/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143


I don't understand why it fails with an out-of-memory error. I would
expect the task to slow down if there are too many messages to process, but
not to fail altogether. I am not storing any state either.
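To make my confusion concrete, here is my own back-of-the-envelope reading of
the numbers in the dump above (the object name, the 4 KiB page size, and the
interpretation of the JVM limits are my assumptions, not anything from the docs):

// Rough arithmetic over the figures quoted in the container dump above.
object MemoryCheck extends App {
  val containerMb = 2048    // requested with -ytm 2048
  val heapMb      = 1448    // -Xms1448m / -Xmx1448m in the dump
  val maxDirectMb = 1448    // -XX:MaxDirectMemorySize=1448m in the dump
  val rssPages    = 592779L // RSSMEM_USAGE(PAGES) in the dump

  // Assuming 4 KiB pages, the resident set size comes to about 2.26 GB,
  // which matches the "2.3 GB of 2 GB physical memory used" reported above.
  val rssGb = rssPages * 4096.0 / math.pow(1024, 3)
  println(f"resident set size ≈ $rssGb%.2f GB")

  // As far as I understand the JVM flags, heap plus direct memory alone
  // could grow to 1448 + 1448 = 2896 MB, more than the 2048 MB container.
  println(s"heap + max direct memory = ${heapMb + maxDirectMb} MB of a $containerMb MB container")
}

If I am misreading how these limits are accounted for, I would be happy to be
corrected.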

Does anyone know the reason, and how to fix or avoid this issue?


Thanks,
Jack
