It happens irrespective of whether there is traffic or no traffic on the kafka 
topic. Also, there is no clue i could see in the heap space. The heap looks 
healthy and stable. Its something off heap which is constantly growing. I also 
checked the JNI reference count from the dumps which appear stable (its 
constantly getting GCed) and tried to limit the size of meatspace and direct 
memory using following

--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=128M 
-XX:MaxDirectMemorySize=128M" \

but with no success.  Thanks for offering help

Regards,
Apoorva

> On 14-Jul-2015, at 12:43 am, Cody Koeninger <c...@koeninger.org> wrote:
> 
> Does the issue only happen when you have no traffic on the topic?
> 
> Have you profiled to see what's using heap space?
> 
> 
> On Mon, Jul 13, 2015 at 1:05 PM, Apoorva Sareen <apoorva.sar...@gmail.com 
> <mailto:apoorva.sar...@gmail.com>> wrote:
> Hi,
> 
> I am running spark streaming 1.4.0 on Yarn (Apache distribution 2.6.0) with 
> java 1.8.0_45 and also Kafka direct stream. I am also using spark with scala 
> 2.11 support.
> 
> The issue I am seeing is that both driver and executor containers are 
> gradually increasing the physical memory usage till a point where yarn 
> container kill it. I have configured upto 192M Heap and 384 off heap space in 
> my driver but it eventually runs out of it
> 
> The Heap memory appears to be fine with regular GC cycles. There is no 
> OutOffMemory encountered ever in any such runs
> 
> Infact I am not generating any traffic on the kafka queues still this 
> happens. Here is the code I am using
> 
> object SimpleSparkStreaming extends App {
> 
> val conf = new SparkConf()
> val ssc = new 
> StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
> ssc.checkpoint("checkpoint")
> val topics = Set(conf.get("spark.kafka.topic.name 
> <http://spark.kafka.topic.name/>")); 
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> 
> conf.get("spark.kafka.broker.list"))
>             val kafkaStream = 
> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
> kafkaParams, topics)
>             kafkaStream.foreachRDD(rdd => {
>                 rdd.foreach(x => {
>                     println(x._2)
>                 })
> 
>             })
>     kafkaStream.print()
>             ssc.start() 
> 
>             ssc.awaitTermination()
> 
> }
> I am running this on CentOS 7. The command used for spark submit is following
> 
> ./bin/spark-submit --class 
> com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
> --conf spark.yarn.executor.memoryOverhead=256 \
> --conf spark.yarn.driver.memoryOverhead=384 \
> --conf spark.kafka.topic.name <http://spark.kafka.topic.name/>=test \
> --conf spark.kafka.broker.list=172.31.45.218:9092 
> <http://172.31.45.218:9092/> \
> --conf spark.batch.window.size=1 \
> --conf spark.app.name <http://spark.app.name/>="Simple Spark Kafka 
> application" \
> --master yarn-cluster \
> --num-executors 1 \
> --driver-memory 192m \
> --executor-memory 128m \
> --executor-cores 1 \
> /home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar
>  
> Any help is greatly appreciated
> 
> Regards,
> 
> Apoorva
> 
> 

Reply via email to