Hi,

I have a dataset of 2 billion records with schema <eventId: String, time: Double, 
value: Double>. It is stored in Parquet format on HDFS and takes 23 GB. Setup: Spark 
1.3, Hadoop 1.2.1, 8 nodes, each with a Xeon CPU, 16 GB RAM, and 1 TB of disk; each 
node runs 3 workers with 3 GB of memory each.

I keep failing to sort this dataset in Spark. I do the following:
val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
pf.registerTempTable("data")
val sorted = sqlContext.sql("select * from data order by time")
sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
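One variant I am considering (assuming setConf on the SQLContext is the right way to raise the number of post-shuffle partitions in 1.3; the value 2000 is just a guess, the default is 200):

```scala
// Raise spark.sql.shuffle.partitions so each sort partition is smaller
// and less likely to hit the GC overhead limit on a 3 GB executor.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
pf.registerTempTable("data")
val sorted = sqlContext.sql("select * from data order by time")
sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
```

I have not confirmed yet whether this actually reduces the per-task memory pressure in my case.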

Spark starts executing tasks, and then "Executor Lost" errors pop up in the 
web UI (for the mapPartitions task at Exchange.scala and the runJob task at 
newParquet.scala), with different types of exceptions given as the cause. My guess 
is that the main problem is the "GC overhead limit exceeded" exception, but I also 
observe exceptions related to connection timeouts and shuffle writes 
(java.lang.IllegalStateException: Shutdown in progress; 
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException).

What I tried:
1) "order by eventId" and "order by eventId, time", with the same result.
2) Looked at the shuffle parameters, but the defaults seem sensible.
3) Repartitioned the data loaded from Parquet, val pf3000 = 
pf.repartition(3000), to pass smaller chunks of data to the executors 
(originally there are 300 partitions). It did not help either. Surprisingly, 
the repartitioned dataset takes 50 GB on HDFS versus the original 23 GB.
4) Ran 8 workers instead of 24 and gave each 10 GB of memory. It did not 
help.
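As a fallback I may also try sorting at the RDD level instead of through SQL, so I can control the number of output partitions of the sort directly (a sketch only; I have not verified it avoids the same shuffle pressure):

```scala
// Sort by the `time` column (index 1 in the <eventId, time, value> schema)
// via the RDD API, spreading the sorted output over 3000 partitions.
val sortedRdd = pf.rdd.sortBy(row => row.getDouble(1), ascending = true, numPartitions = 3000)
```

Writing the result back as Parquet would then need the schema reattached, so I am not sure this is simpler in the end.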

Could you suggest what the problem might be and a possible workaround? Just in 
case: I cannot get more RAM or more machines :)

Best regards, Alexander
