Hi Conor,

What do you mean when you say the leak is not in "Heap or non-Heap"? If it is not heap related, then it has to be native memory that is leaking. I can't say for sure, but you do have threads working there, and those could be using native memory. We didn't get any pics of JConsole.
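If it helps, one rough way to see whether threads or off-heap NIO buffers are behind the native growth is to poll the thread and buffer-pool MXBeans from the driver, and to start the JVM with -XX:NativeMemoryTracking=summary so you can diff "jcmd <pid> VM.native_memory summary" over time. A minimal sketch, purely illustrative (the object name and the logging are mine, not part of your job):

    import java.lang.management.{BufferPoolMXBean, ManagementFactory}
    import scala.collection.JavaConverters._

    // Hypothetical helper, not part of the streaming job below: dumps JVM-level
    // counters that live outside the heap so you can correlate them with RSS.
    object NativeMemoryProbe {
      def logOnce(): Unit = {
        val threads = ManagementFactory.getThreadMXBean
        println(s"live threads=${threads.getThreadCount}, peak=${threads.getPeakThreadCount}")

        // Direct and mapped NIO buffers are allocated in native memory, so they
        // show up in RSS but not in the heap/non-heap views in JConsole.
        ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala.foreach { pool =>
          println(s"buffer pool ${pool.getName}: count=${pool.getCount}, used=${pool.getMemoryUsed} bytes")
        }
      }
    }

You could call something like that from a scheduled thread or at the end of each batch; if the thread count or the "direct" buffer pool keeps climbing in step with RSS, that points at the native side rather than the heap.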
Thanks.

-----Original Message-----
From: Conor Fennell [mailto:conorapa...@gmail.com]
Sent: Monday, December 14, 2015 4:15 PM
To: user@spark.apache.org
Subject: Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

Just bumping the issue I am having, if anyone can provide direction?
I have been stuck on this for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
> Hi,
>
> I have a memory leak in the Spark driver which is not in the heap or
> the non-heap.
> Even though neither of these is increasing, the Java process RSS
> memory is, and it eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove Cassandra from the code below, the memory leak does not happen.
> Can someone please explain why this is happening, or what I can do to
> investigate it further?
> I have also included pics from JConsole covering a couple of hours, and
> Datadog showing the RSS memory increase over the same timeframe.
>
> Thanks,
> Conor
>
>     val ssc = new StreamingContext(sparkConf, Seconds(SparkStreamingBatchInterval))
>
>     ssc.checkpoint(HdfsNameNodeUriPath)
>
>     val kafkaParams = Map[String, String](METADATA_BROKER_LIST -> MetadataBrokerList)
>
>     val kafkaMessages = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaParams, KafkaTopics.split(DELIMITER).toSet)
>
>     val eventBuckets = kafkaMessages.map { keyMessage =>
>       implicit val formats = DefaultFormats.lossless
>       val eventBucket = parse(keyMessage._2)
>       val minute = new Date((eventBucket \ MINUTE).extract[Long])
>       val business = (eventBucket \ BUSINESS).extract[String]
>       val account = (eventBucket \ ACCOUNT).extract[String]
>       (minute, business, account)
>     }
>
>     val eventsToBeProcessed = eventBuckets.transform(rdd =>
>       rdd.joinWithCassandraTable("analytics_events" + '_' + settings.JobEnvironment,
>         "events",
>         SomeColumns(MINUTE, BUSINESS, ACCOUNT, EVENT, JSON),
>         SomeColumns(MINUTE, BUSINESS, ACCOUNT))
>         .filter(entry => {
>           // remove any entries without a result
>           entry._2.length > 0
>         }))
>
>     eventsToBeProcessed.foreachRDD(rdd => {
>       println(rdd.take(1))
>     })
>
>     sys.ShutdownHookThread {
>       System.err.println(s"Gracefully stopping $JobName Spark Streaming Application")
>       ssc.stop(stopSparkContext = true, stopGracefully = true)
>       System.err.println(s"$JobName streaming job stopped")
>     }
>
>     ssc.start()
>     ssc.awaitTermination()

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org