Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the
off-heap allocation of netty?
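
A minimal sketch of one way to set it, assuming the job builds its own
SparkConf before creating the streaming context (the app name below is
hypothetical and not taken from the job in this thread):

```scala
import org.apache.spark.SparkConf

// Hypothetical app name, used only for illustration.
val conf = new SparkConf()
  .setAppName("kafka-offheap-test")
  // Ask Netty to prefer on-heap buffers over direct (off-heap) ones.
  .set("spark.shuffle.io.preferDirectBufs", "false")
```

The same key can also be passed on the command line through the --conf
option of spark-submit, which avoids rebuilding the job.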

Best Regards,
Shixiong Zhu

2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:

> Hi,
>
> Thanks for your information. I'll give spark1.4 a try when it's released.
>
> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Could you try it out with Spark 1.4 RC3?
>>
>> Also pinging the Cloudera folks; they may be aware of something.
>>
>> BTW, the way I have debugged memory leaks in the past is as follows.
>>
>> Run with a small driver memory, say 1 GB. Periodically (maybe with a
>> script), take histogram snapshots and also do memory dumps, say every
>> hour. Then compare two histograms/dumps taken a few hours apart (the
>> further apart, the better). Diffing histograms is easy. Diffing two dumps
>> can be done in JVisualVM; it will show the objects that were added in the
>> later dump. That makes it easy to see what is not getting cleaned up.
>>
>> TD
>>
>>
>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your reply. Here are the top 30 entries of the
>>> jmap -histo:live result:
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:         40802      145083848  [B
>>>    2:         99264       12716112  <methodKlass>
>>>    3:         99264       12291480  <constMethodKlass>
>>>    4:          8472        9144816  <constantPoolKlass>
>>>    5:          8472        7625192  <instanceKlassKlass>
>>>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>    9:          9427        3542512  <methodDataKlass>
>>>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>   11:        135491        3251784  java.lang.Long
>>>   12:         26192        2765496  [C
>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>   14:          8997        1061936  java.lang.Class
>>>   15:         16022         851384  [[I
>>>   16:         16447         789456  java.util.zip.Inflater
>>>   17:         13855         723376  [S
>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>   19:         25725         617400  java.lang.String
>>>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>   26:           565         370080  [I
>>>   27:           508         272288  <objArrayKlassKlass>
>>>   28:         16233         259728  java.lang.Object
>>>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>   30:          2524         192312  [Ljava.lang.Object;
>>>
>>> But as I mentioned above, the heap memory seems OK; the extra memory is
>>> consumed by some off-heap data. I can't find a way to figure out what is in
>>> there.
>>>
>>> Besides, I did some extra experiments, i.e. ran the same program in
>>> different environments to test whether it has the off-heap memory issue:
>>>
>>> spark1.0 + standalone = no
>>> spark1.0 + yarn = no
>>> spark1.3 + standalone = no
>>> spark1.3 + yarn = yes
>>>
>>> I'm using CDH5.1, so the spark1.0 is provided by cdh, and
>>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>
>>> I could use spark1.0 + yarn, but I can't find a way to control the log
>>> level and log rolling, so the logs would fill up the hard drive.
>>>
>>> Currently I'll stick to spark1.0 + standalone, until our ops team
>>> decides to upgrade cdh.
>>>
>>>
>>>
>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> While the job is running, is it possible for you to log in to the YARN
>>>> node and get histograms of live objects using "jmap -histo:live"? That
>>>> may reveal something.
>>>>
>>>>
>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Unfortunately, they're still growing, both driver and executors.
>>>>>
>>>>> I ran the same job in local mode, and everything is fine.
>>>>>
>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>>> wrote:
>>>>>
>>>>>> Can you replace your counting part with this?
>>>>>>
>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count().toString))
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I wrote a simple test job that only does very basic operations. For
>>>>>>> example:
>>>>>>>
>>>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>>       Map(topic -> 1)).map(_._2)
>>>>>>>     val logs = lines.flatMap { line =>
>>>>>>>       try {
>>>>>>>         Some(parse(line).extract[Impression])
>>>>>>>       } catch {
>>>>>>>         case _: Exception => None
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>       rdd.foreachPartition { iter =>
>>>>>>>         iter.foreach(count => logger.info(count.toString))
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>> It receives messages from Kafka, parses the JSON, filters and counts
>>>>>>> the records, and then prints the count to the logs.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Hi Zhang,
>>>>>>>>
>>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>>> inside the code to fill up memory.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is by
>>>>>>>>> default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also
>>>>>>>>> growing. I don't think Kafka messages will be cached in the driver.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Are you using the createStream or createDirectStream API? If it's
>>>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK
>>>>>>>>>> (it might slow things down though). Another way would be to try the
>>>>>>>>>> latter one.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>>>>>>>>> the Processing Time is around 400ms, and there's no Scheduling
>>>>>>>>>>> Delay, so I suppose it's not the Kafka messages that eat up the
>>>>>>>>>>> off-heap memory. Or maybe it is, but how to tell?
>>>>>>>>>>>
>>>>>>>>>>> I googled how to check off-heap memory usage; there's a tool
>>>>>>>>>>> called pmap, but I don't know how to interpret the results.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep
>>>>>>>>>>>> spark-submit then you can see all the JVM params. Are you using the
>>>>>>>>>>>> high-level consumer (receiver-based) for receiving data from Kafka?
>>>>>>>>>>>> In that case, if your throughput is high and the processing delay
>>>>>>>>>>>> exceeds the batch interval, then you will hit these memory issues,
>>>>>>>>>>>> as data keeps being received and is dumped to memory. You can set
>>>>>>>>>>>> the StorageLevel to MEMORY_AND_DISK (but it slows things down).
>>>>>>>>>>>> Another alternative is to use the low-level Kafka consumer
>>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or to use
>>>>>>>>>>>> the non-receiver-based directStream
>>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>>> that comes with Spark.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster
>>>>>>>>>>>>> mode. I found that YARN is killing the driver and executor
>>>>>>>>>>>>> processes because of excessive memory use. Here's what I tried:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s),
>>>>>>>>>>>>> so the extra memory is not used by the heap.
>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is
>>>>>>>>>>>>> 384), but the memory just keeps growing and then hits the limit.
>>>>>>>>>>>>> 3. This problem does not show up in low-throughput jobs, nor
>>>>>>>>>>>>> in standalone mode.
>>>>>>>>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>>>>>>>>> interval of 1, does some filtering and aggregation, and then
>>>>>>>>>>>>> prints to the executor logs. So it's not some third-party library
>>>>>>>>>>>>> that causes the 'leak'.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I built Spark 1.3 myself, with the correct Hadoop versions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jerry
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Jerry
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jerry
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jerry
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jerry
>>>>>
>>>>
>>>
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>
