Hi Pavel!

Sorry for late. I just do some investigation in these days with my colleague. 
Here is my thought: from spark 1.2, we use Netty with off-heap memory to reduce 
GC during shuffle and cache block transfer. In my case, if I try to increase 
the memory overhead enough. I will get the Max direct buffer exception. When 
Netty do block transferring, there will be five threads by default to grab the 
data chunk to target executor. In my situation, one single chunk is too big to 
fit into the buffer. So gc won’t help here. My final solution is to do another 
repartition before the repartition(1). Just to make 10x times more partitions 
than original’s. In this way, I can reduce the size of each chunk Netty 
transfer. 

Also I want to say that it’s not a good choice to repartition a big dataset 
into single file. This extremely unbalanced scenario is kind of waste your 
compute resources. 

I don’t know whether my explanation is right. Plz correct me if you find any 
issue.THX

Best,
Yang
>  On 2017年1月23日, at 18:03, Pavel Plotnikov <pavel.plotni...@team.wrike.com> 
> wrote:
> 
> Hi Yang!
> 
> I don't know exactly why this happen, but i think GC can't work to fast 
> enough or size of data with additional objects created while computations to 
> big for executor. 
> And i found that this problem only if you make some data manipulations. You 
> can cache you data first, after that, write in one partiton.
> For example  
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.cache()
> or
> dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
> val dropDF = spark.read.parquet(temppath)
> and then
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> Best,
> 
> On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybea...@gmail.com 
> <mailto:cybea...@gmail.com>> wrote:
> Also, do you know why this happen? 
> 
>> On 2017年1月20日, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com 
>> <mailto:pavel.plotni...@team.wrike.com>> wrote:
>> 
> 
>> Hi Yang,
>> i have faced with the same problem on Mesos and to circumvent this issue i 
>> am usually increase partition number. On last step in your code you reduce 
>> number of partitions to 1, try to set bigger value, may be it solve this 
>> problem.
>> 
>> Cheers,
>> Pavel
>> 
>> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com 
>> <mailto:cybea...@gmail.com>> wrote:
>> Hi all,
>> 
>> I am running a spark application on YARN-client mode with 6 executors (each 
>> 4 cores and executor memory = 6G and Overhead = 4G, spark version: 1.6.3 / 
>> 2.1.0). I find that my executor memory keeps increasing until get killed by 
>> node manager; and give out the info that tells me to boost 
>> spark.yarn.excutor.memoryOverhead. I know that this param mainly control the 
>> size of memory allocated off-heap. But I don’t know when and how the spark 
>> engine will use this part of memory. Also increase that part of memory not 
>> always solve my problem. sometimes works sometimes not. It trends to be 
>> useless when the input data is large.
>> 
>> FYI, my app’s logic is quite simple. It means to combine the small files 
>> generated in one single day (one directory one day) into a single one and 
>> write back to hdfs. Here is the core code:
>> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
>> ${ts.day}").coalesce(400)
>> val dropDF = 
>> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>> The source file may have hundreds to thousands level’s partition. And the 
>> total parquet file is around 1to 5 gigs. Also I find that in the step that 
>> shuffle reading data from different machines, The size of shuffle read is 
>> about 4 times larger than the input size, Which is wired or some principle I 
>> don’t know. 
>> 
>> Anyway, I have done some search myself for this problem. Some article said 
>> that it’s on the direct buffer memory (I don’t set myself). Some article 
>> said that people solve it with more frequent full GC. Also I find one people 
>> on SO with very similar situation: 
>> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
>>  
>> <http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn>
>> This guy claimed that it’s a bug with parquet but comment questioned him. 
>> People in this mail list may also receive an email hours ago from blondowski 
>> who described this problem while writing json: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>>  
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none>
>> 
>> So it looks like to be common question for different output format. I hope 
>> someone with experience about this problem could make an explanation about 
>> this issue. Why this happen and what is a reliable way to solve this 
>> problem. 
>> 
>> Best,
>> 
>> 

Reply via email to