It happens irrespective of whether there is traffic on the Kafka topic or not. Also, there is no clue I could see in the heap space: the heap looks healthy and stable. It is something off-heap which is constantly growing. I also checked the JNI reference counts from the dumps, which appear stable (they are constantly getting GCed), and tried to limit the size of metaspace and direct memory using the following:

--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M" \

but with no success.
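In case it is useful, this is roughly what I am planning to try next to see which native pool is growing: the same caps on the executors (via spark.executor.extraJavaOptions) plus JVM Native Memory Tracking, and then diffing the NMT output while the job runs. This is only a sketch using standard JVM tooling, not something I have verified against this job yet, and <container-jvm-pid> is just a placeholder:

# Add the caps and NMT to both the driver and executor JVMs
# (NMT itself adds a small fixed overhead, so the container limits may need a small bump)
--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M -XX:NativeMemoryTracking=summary" \
--conf spark.executor.extraJavaOptions="-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M -XX:NativeMemoryTracking=summary" \

# On the node running the container, take a baseline and diff it after a few batches
jcmd <container-jvm-pid> VM.native_memory baseline
# ... wait a while ...
jcmd <container-jvm-pid> VM.native_memory summary.diff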
Thanks for offering to help.

Regards,
Apoorva

> On 14-Jul-2015, at 12:43 am, Cody Koeninger <c...@koeninger.org> wrote:
>
> Does the issue only happen when you have no traffic on the topic?
>
> Have you profiled to see what's using heap space?
>
> On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen <apoorva.sar...@gmail.com> wrote:
> Hi,
>
> I am running Spark Streaming 1.4.0 on YARN (Apache distribution 2.6.0) with Java 1.8.0_45 and the Kafka direct stream. I am also using Spark with Scala 2.11 support.
>
> The issue I am seeing is that both driver and executor containers gradually increase their physical memory usage until YARN kills the container. I have configured up to 192M of heap and 384M of off-heap space in my driver, but it eventually runs out.
>
> The heap memory appears to be fine, with regular GC cycles. No OutOfMemoryError is ever encountered in any of these runs.
>
> In fact, I am not generating any traffic on the Kafka topics and this still happens. Here is the code I am using:
>
> object SimpleSparkStreaming extends App {
>
>   val conf = new SparkConf()
>   val ssc = new StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
>   ssc.checkpoint("checkpoint")
>   val topics = Set(conf.get("spark.kafka.topic.name"))
>   val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
>   val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>   kafkaStream.foreachRDD(rdd => {
>     rdd.foreach(x => {
>       println(x._2)
>     })
>   })
>   kafkaStream.print()
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> I am running this on CentOS 7. The command used for spark-submit is the following:
>
> ./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
>   --conf spark.yarn.executor.memoryOverhead=256 \
>   --conf spark.yarn.driver.memoryOverhead=384 \
>   --conf spark.kafka.topic.name=test \
>   --conf spark.kafka.broker.list=172.31.45.218:9092 \
>   --conf spark.batch.window.size=1 \
>   --conf spark.app.name="Simple Spark Kafka application" \
>   --master yarn-cluster \
>   --num-executors 1 \
>   --driver-memory 192m \
>   --executor-memory 128m \
>   --executor-cores 1 \
>   /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar
>
> Any help is greatly appreciated.
>
> Regards,
>
> Apoorva
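P.S. To confirm it really is the process RSS (and not the heap) that grows until YARN kills the container, I am watching the container JVM from the NodeManager host roughly like this; again only a sketch, and <container-jvm-pid> is a placeholder:

# Resident set size of the container JVM over time
ps -o pid,rss,cmd -p <container-jvm-pid>
# Full native mapping, with the total on the last line
pmap -x <container-jvm-pid> | tail -n 1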