Could you set "spark.shuffle.io.preferDirectBufs" to false to turn off the off-heap allocation of netty?

Best Regards,
Shixiong Zhu
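For reference, a minimal sketch of one way to apply that setting, assuming a streaming application similar to the test job quoted later in this thread (the app name and 1-second batch interval are placeholders; the same flag can also be passed as --conf spark.shuffle.io.preferDirectBufs=false to spark-submit):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Ask the Netty-based shuffle transport not to prefer direct (off-heap) buffers.
    val conf = new SparkConf()
      .setAppName("streaming-memory-test")               // placeholder app name
      .set("spark.shuffle.io.preferDirectBufs", "false")

    val ssc = new StreamingContext(conf, Seconds(1))     // placeholder batch interval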
2015-06-03 11:58 GMT+08:00 Ji ZHANG <zhangj...@gmail.com>:

> Hi,
>
> Thanks for your information. I'll give Spark 1.4 a try when it's released.
>
> On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:
>
>> Could you try it out with Spark 1.4 RC3?
>>
>> Also pinging the Cloudera folks, they may be aware of something.
>>
>> BTW, the way I have debugged memory leaks in the past is as follows.
>>
>> Run with a small driver memory, say 1 GB. Periodically (maybe with a script),
>> take snapshots of the histogram and also do memory dumps, say every hour.
>> Then compare two histos/dumps that are a few hours apart (the more the
>> better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
>> it will show the diff of the objects that got added in the later dump. That
>> makes it easy to debug what is not getting cleaned up.
>>
>> TD
>>
>> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
>>> result:
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:         40802      145083848  [B
>>>    2:         99264       12716112  <methodKlass>
>>>    3:         99264       12291480  <constMethodKlass>
>>>    4:          8472        9144816  <constantPoolKlass>
>>>    5:          8472        7625192  <instanceKlassKlass>
>>>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>>    7:          7045        4804832  <constantPoolCacheKlass>
>>>    8:        139168        4453376  java.util.HashMap$Entry
>>>    9:          9427        3542512  <methodDataKlass>
>>>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>   11:        135491        3251784  java.lang.Long
>>>   12:         26192        2765496  [C
>>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>>   14:          8997        1061936  java.lang.Class
>>>   15:         16022         851384  [[I
>>>   16:         16447         789456  java.util.zip.Inflater
>>>   17:         13855         723376  [S
>>>   18:         17282         691280  java.lang.ref.Finalizer
>>>   19:         25725         617400  java.lang.String
>>>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>>>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>>   25:         16447         394728  java.util.zip.ZStreamRef
>>>   26:           565         370080  [I
>>>   27:           508         272288  <objArrayKlassKlass>
>>>   28:         16233         259728  java.lang.Object
>>>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>>   30:          2524         192312  [Ljava.lang.Object;
>>>
>>> But as I mentioned above, the heap memory seems OK; the extra memory is
>>> consumed by some off-heap data, and I can't find a way to figure out what
>>> is in there.
>>>
>>> Besides, I did some extra experiments, i.e. ran the same program in
>>> different environments to test whether it has the off-heap memory issue:
>>>
>>> spark1.0 + standalone = no
>>> spark1.0 + yarn       = no
>>> spark1.3 + standalone = no
>>> spark1.3 + yarn       = yes
>>>
>>> I'm using CDH 5.1, so the spark1.0 is provided by CDH, and
>>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>>
>>> I could use spark1.0 + yarn, but I can't find a way to handle the logs
>>> (level and rolling), so it would fill up the hard drive.
>>>
>>> Currently I'll stick to spark1.0 + standalone, until our ops team decides
>>> to upgrade CDH.
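For reference, a rough sketch of the periodic-snapshot procedure TD describes above, written as a small standalone Scala program; it assumes jmap is on the PATH and that the target PID (driver or executor, found e.g. via ps aux | grep spark-submit) is passed as the first argument:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    import scala.sys.process._

    object HistoSnapshots {
      def main(args: Array[String]): Unit = {
        val pid = args(0)
        while (true) {
          // Capture a live-object histogram of the target JVM.
          val histo = s"jmap -histo:live $pid".!!
          val file  = Paths.get(s"histo-$pid-${System.currentTimeMillis()}.txt")
          Files.write(file, histo.getBytes(StandardCharsets.UTF_8))
          // One snapshot per hour; diff two files taken a few hours apart.
          Thread.sleep(60L * 60 * 1000)
        }
      }
    }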
>>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> While you are running, is it possible for you to log into the YARN node
>>>> and get histograms of live objects using "jmap -histo:live"? That may
>>>> reveal something.
>>>>
>>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Unfortunately, they're still growing, both driver and executors.
>>>>>
>>>>> I ran the same job in local mode and everything is fine.
>>>>>
>>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Can you replace your counting part with this?
>>>>>>
>>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I wrote a simple test job that only does very basic operations, for example:
>>>>>>>
>>>>>>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>>   Map(topic -> 1)).map(_._2)
>>>>>>> val logs = lines.flatMap { line =>
>>>>>>>   try {
>>>>>>>     Some(parse(line).extract[Impression])
>>>>>>>   } catch {
>>>>>>>     case _: Exception => None
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>>   rdd.foreachPartition { iter =>
>>>>>>>     iter.foreach(count => logger.info(count.toString))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> It receives messages from Kafka, parses the JSON, filters and counts
>>>>>>> the records, and then prints the count to the logs.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Hi Zhang,
>>>>>>>>
>>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>>> inside the code to fill up memory.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, I'm using createStream, but the storageLevel param is by default
>>>>>>>>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing.
>>>>>>>>> I don't think Kafka messages will be cached in the driver.
>>>>>>>>>
>>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Are you using the createStream or createDirectStream API? If it's
>>>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK
>>>>>>>>>> (it might slow things down though). Another way would be to try the
>>>>>>>>>> latter one.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Akhil,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>>>>>>>>> the Processing Time is around 400 ms and there's no Scheduling Delay,
>>>>>>>>>>> so I suppose it's not the Kafka messages that eat up the off-heap
>>>>>>>>>>> memory. Or maybe it is, but how can I tell?
>>>>>>>>>>>
>>>>>>>>>>> I googled how to check off-heap memory usage; there's a tool called
>>>>>>>>>>> pmap, but I don't know how to interpret the results.
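For reference, a sketch of the two alternatives discussed a few messages above, assuming the same ssc, zkQuorum, group, and topic values as in the test job; the broker address for the direct stream is a placeholder:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Receiver-based stream with an explicit storage level instead of the
    // default MEMORY_AND_DISK_SER_2 (may slow things down, as noted above).
    val lines = KafkaUtils.createStream(
      ssc, zkQuorum, group, Map(topic -> 1),
      StorageLevel.MEMORY_AND_DISK).map(_._2)

    // Receiver-less direct stream (available since Spark 1.3); the broker
    // list "broker1:9092" is a placeholder.
    val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "broker1:9092"),
      Set(topic)).map(_._2)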
>>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>>>>> you can see all the JVM params. Are you using the high-level
>>>>>>>>>>>> consumer (receiver based) for receiving data from Kafka? In that
>>>>>>>>>>>> case, if your throughput is high and the processing delay exceeds
>>>>>>>>>>>> the batch interval, then you will hit this memory issue as data
>>>>>>>>>>>> keeps being received and dumped to memory. You can set the
>>>>>>>>>>>> StorageLevel to MEMORY_AND_DISK (but it slows things down). Another
>>>>>>>>>>>> alternative is to use the low-level Kafka consumer
>>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>>>>>>>>> non-receiver-based directStream
>>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>>> that comes with Spark.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I
>>>>>>>>>>>>> found that YARN is killing the driver and executor processes
>>>>>>>>>>>>> because of excessive use of memory. Here's what I have tried:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC per 10s),
>>>>>>>>>>>>> so the extra memory is not used by the heap.
>>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384),
>>>>>>>>>>>>> but the memory just keeps growing and then hits the limit.
>>>>>>>>>>>>> 3. This problem does not show up in low-throughput jobs, nor in
>>>>>>>>>>>>> standalone mode.
>>>>>>>>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>>>>>>>>> interval of 1, does some filtering and aggregation, and then prints
>>>>>>>>>>>>> to the executor logs. So it's not some 3rd-party library that causes
>>>>>>>>>>>>> the 'leak'.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Spark 1.3 is built by myself, with the correct Hadoop version.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Jerry
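For reference, a sketch of the overhead settings mentioned in point 2 of the original message, assuming Spark 1.3 on YARN; the values simply mirror those in the message, and the same keys can be passed as --conf options to spark-submit instead:

    import org.apache.spark.SparkConf

    // Raise the off-heap overhead allowance YARN grants each container from
    // the 384 MB default to 1024 MB, for both the driver and the executors.
    // Equivalent spark-submit flags:
    //   --conf spark.yarn.driver.memoryOverhead=1024
    //   --conf spark.yarn.executor.memoryOverhead=1024
    val conf = new SparkConf()
      .set("spark.yarn.driver.memoryOverhead", "1024")
      .set("spark.yarn.executor.memoryOverhead", "1024")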