Ok, I get it now. You might be right; it is possible the Cassandra driver is 
leaking some memory, for example through open sockets or something similar.

However, if it is a native memory leak I would suggest trying an alternative 
driver, or first proving that this is indeed the problem, unless of course you 
want to spend some long hours hunting for it yourself. In general a native leak 
is very difficult to track down in the JVM. I wrote a blog post about it some 
time ago, though I am not much into writing:
http://apachegeek.blogspot.in/2015/04/fix-c-memory-leak.html?spref=bl
It might give you some starting points if you are new to this; you may well 
already know it all otherwise.
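
If you are on Java 8 you could also try Native Memory Tracking as a rough first 
pass; it only covers allocations the JVM itself knows about, so treat it as a 
starting point rather than proof. Something along these lines, with the pid and 
the rest of your usual submit arguments filled in:

    spark-submit --driver-java-options "-XX:NativeMemoryTracking=summary" ...
    jcmd <driver-pid> VM.native_memory baseline
    jcmd <driver-pid> VM.native_memory summary.diff

Take the baseline early, then run the diff once the RSS has grown and see which 
category moved.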

Thanks,
Abhijeet

-----Original Message-----
From: Conor Fennell [mailto:conorapa...@gmail.com] 
Sent: Monday, December 14, 2015 8:29 PM
To: Singh, Abhijeet
Cc: user@spark.apache.org
Subject: Re: Spark streaming driver java process RSS memory constantly 
increasing using cassandra driver

Hi Abhijeet,

Thanks for pointing out the pics are not showing.
I have put all the images in this public google document:
https://docs.google.com/document/d/1xEJ0eTtXBlSso6SshLCWZHcRw4aEQMJflzKBsr2FHaw/edit?usp=sharing

All the code is in the first email; nothing else is starting up threads apart 
from the 80 or so threads Spark itself starts.
By the heap I mean CMS Old Gen, Par Eden Space and Par Survivor Space.
By the non-heap I mean Code Cache and CMS Perm Gen.
Together those make up the JVM-managed memory spaces.
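
For anyone wanting to double-check that grouping, the pool names above come 
straight from the standard java.lang.management API; a quick sketch:

    import java.lang.management.{ManagementFactory, MemoryType}
    import scala.collection.JavaConverters._

    // print every memory pool and whether the JVM classifies it as heap or non-heap
    ManagementFactory.getMemoryPoolMXBeans.asScala.foreach { pool =>
      val kind = if (pool.getType == MemoryType.HEAP) "heap" else "non-heap"
      println(s"$kind: ${pool.getName} used=${pool.getUsage.getUsed}")
    }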

In the document you will see I am quite aggressive with the JVM options.

It does indicate that native memory is leaking, but I am at a loss as to how to 
investigate it properly, and it looks like the leak is either in the cassandra 
driver itself or in some interaction between how spark is running it and the 
driver.
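
One thing I still want to rule out (a guess on my part, since as far as I know 
the cassandra driver sits on Netty and uses off-heap NIO buffers) is whether the 
direct buffer pools are the part that keeps growing; the platform MXBeans expose 
them:

    import java.lang.management.{ManagementFactory, BufferPoolMXBean}
    import scala.collection.JavaConverters._

    // direct and mapped NIO buffer pools live outside both heap and non-heap,
    // so steady growth here would explain RSS rising while the JVM pools stay flat
    ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala.foreach { pool =>
      println(s"${pool.getName}: count=${pool.getCount} used=${pool.getMemoryUsed} capacity=${pool.getTotalCapacity}")
    }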

It is also happening when running this within a foreachRDD:

      // a new Cluster and Session are built and torn down for every batch
      val cluster = Cluster.builder()
        .addContactPoints(CassandraHostname.split(DELIMITER): _*)
        .build()
      val session = cluster.connect()

      ranges.foreach(range => {
        session.execute(
          s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets " +
          "(topic, job_name, batch_time, partition, from_offset, until_offset) VALUES " +
          s"('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")
      })

      session.close()
      cluster.close()
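
To narrow it down further I am thinking of hoisting the Cluster out of the 
foreachRDD into a lazily initialised singleton, so there is exactly one Cluster 
and Session for the lifetime of the driver. Just a sketch, CassandraConnection 
is a hypothetical helper and the config names are the same as above:

      // hypothetical helper: one Cluster/Session for the whole driver rather than one per batch
      object CassandraConnection {
        lazy val cluster = Cluster.builder()
          .addContactPoints(CassandraHostname.split(DELIMITER): _*)
          .build()
        lazy val session = cluster.connect()
      }

If the RSS stops climbing with that in place, it at least points at the 
per-batch Cluster lifecycle rather than at the inserts themselves.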


Thanks,
Conor



On Mon, Dec 14, 2015 at 2:12 PM, Singh, Abhijeet <absi...@informatica.com> 
wrote:
> Hi Conor,
>
> What do you mean when you say the leak is not in "Heap or non-Heap"? If it is 
> not heap related then it has to be the native memory that is leaking. I can't 
> say for sure, but you do have threads working there and they could be using the 
> native memory. We didn't get any pics of JConsole.
>
> Thanks.
>
> -----Original Message-----
> From: Conor Fennell [mailto:conorapa...@gmail.com]
> Sent: Monday, December 14, 2015 4:15 PM
> To: user@spark.apache.org
> Subject: Re: Spark streaming driver java process RSS memory constantly 
> increasing using cassandra driver
>
> Just bumping the issue I am having, if anyone can provide direction? I have 
> been stuck on this for a while now.
>
> Thanks,
> Conor
>
> On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell <conorapa...@gmail.com> wrote:
>> Hi,
>>
>> I have a memory leak in the spark driver which is not in the heap or 
>> the non-heap.
>> Even though neither of these are increasing, the java process RSS 
>> memory is and eventually takes up all the memory on the machine.
>> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>>
>> I have reduced the leak to the code below.
>> If I remove cassandra from the code below the memory leak does not happen.
>> Can someone please explain why this is happening or what I can do to 
>> further investigate it.
>> I have also included pics from jconsole covering a couple of hours, and 
>> datadog showing the rss memory increase over the same timeframe.
>>
>> Thanks,
>> Conor
>>
>>     val ssc = new StreamingContext(sparkConf,
>> Seconds(SparkStreamingBatchInterval))
>>
>>     ssc.checkpoint(HdfsNameNodeUriPath)
>>
>>     val kafkaParams = Map[String, String](METADATA_BROKER_LIST ->
>> MetadataBrokerList)
>>
>>     var kafkaMessages = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaParams,
>> KafkaTopics.split(DELIMITER).toSet)
>>
>>     var eventBuckets = kafkaMessages.map(keyMessage => {
>>         implicit val formats = DefaultFormats.lossless
>>         val eventBucket = parse(keyMessage._2)
>>         val minute = new Date((eventBucket \ MINUTE).extract[Long])
>>         val business = (eventBucket \ BUSINESS).extract[String]
>>         val account = (eventBucket \ ACCOUNT).extract[String]
>>         (minute, business, account)})
>>
>>       var eventsToBeProcessed = eventBuckets.transform(rdd =>
>>         rdd.joinWithCassandraTable("analytics_events" + '_' + 
>> settings.JobEnvironment, "events", SomeColumns(MINUTE, BUSINESS, 
>> ACCOUNT, EVENT, JSON), SomeColumns(MINUTE, BUSINESS, 
>> ACCOUNT)).filter(entry => {
>>         //remove any entries without a result
>>         entry._2.length > 0
>>       }))
>>
>>       eventsToBeProcessed.foreachRDD(rdd => {
>>         println(rdd.take(1))
>>       })
>>
>>     sys.ShutdownHookThread {
>>       System.err.println(s"Gracefully stopping $JobName Spark 
>> Streaming Application")
>>       ssc.stop(stopSparkContext = true, stopGracefully = true)
>>       System.err.println(s"$JobName streaming job stopped")
>>     }
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
