What was the answer? Was it only setting spark.sql.shuffle.partitions?
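If so, I assume it was just something along these lines before the DataFrame sort (an untested sketch; 2000 matches the partition count used in your test below):

sqlContext.setConf("spark.sql.shuffle.partitions", "2000")   // default is 200
// or equivalently via SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=2000")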
On Thu, Apr 30, 2015 at 12:14 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

> After a day of debugging (actually, more), I can answer my own question:
>
> The problem is that the default value 200 of "spark.sql.shuffle.partitions" is too small for sorting 2B rows. It was hard to realize because the Spark executors just crash with various exceptions, one by one. The other takeaway is that DataFrame "order by" and RDD.sortBy are implemented in different ways. BTW, why?
>
> Small synthetic test (copied from my blog):
>
> Create 2B rows of MyRecord in 2000 partitions, so each partition will have 1M rows.
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> case class MyRecord(time: Double, id: String)
> val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x => Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))
>
> Let's sort this RDD by time:
>
> val sorted = rdd.sortBy(x => x.time)
> sorted.count
>
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. You can also check the completed tasks in the Spark web UI. The number of reducers was equal to the number of partitions, i.e. 2000.
>
> Let's convert the original RDD to a DataFrame and sort again:
>
> val df = sqlContext.createDataFrame(rdd)
> df.registerTempTable("data")
> val result = sqlContext.sql("select * from data order by time")
> result.count
>
> It will run for a while and then crash. If you check the tasks in the Spark web UI, you will see that some of them were cancelled due to lost executors (ExecutorLost) with various strange exceptions. It is really hard to trace back which executor was the first to be lost; the others follow it like a house of cards. What's the problem? The number of reducers. For the first sort it is equal to the number of partitions, i.e. 2000, but for the second it switched to 200.
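(By the way, a quick way to confirm the reducer counts from the shell rather than the web UI would presumably be to compare the output partition counts, e.g.:

sorted.partitions.length       // sortBy keeps the parent's partition count by default, so 2000
result.rdd.partitions.length   // the Exchange uses spark.sql.shuffle.partitions, so 200

Just a sketch, I have not run it.)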
> From: Ulanov, Alexander
> Sent: Wednesday, April 29, 2015 1:08 PM
> To: user@spark.apache.org
> Subject: Sort (order by) of the big dataset
>
> Hi,
>
> I have a 2-billion-record dataset with schema <eventId: String, time: Double, value: Double>. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes (Xeon CPUs, 16GB RAM, 1TB disk each); each node has 3 workers with 3GB memory.
>
> I keep failing to sort the mentioned dataset in Spark. I do the following:
>
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
> pf.registerTempTable("data")
> val sorted = sqlContext.sql("select * from data order by time")
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>
> Spark starts to execute tasks and then errors like "Executor Lost" pop up in the web UI (tasks mapPartitions at Exchange.scala and runJob at newParquet.scala), giving different types of exceptions in explanation. My thinking is that the main problem is the "GC overhead limit exceeded" exception; however, I also observe exceptions related to connection timeouts and shuffle writes (java.lang.IllegalStateException: Shutdown in progress; org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException).
>
> What I tried:
>
> 1) Tried "order by eventId" and "order by eventId, time" with the same result.
>
> 2) Looked at the shuffle parameters, but the defaults do make sense.
>
> 3) Tried to repartition the data I am loading from Parquet, val pf3000 = pf.repartition(3000), in order to get smaller chunks of data passed to executors (originally there are 300 partitions). It did not help either. Surprisingly, this dataset takes 50GB on HDFS versus the 23GB of the original.
>
> 4) Tried to have 8 workers instead of 24 and gave them 10GB of memory each. It did not help.
>
> Could you suggest what might be the problem and what is the workaround? Just in case, I cannot have more RAM or more machines :)
>
> Best regards, Alexander
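If the answer really was just that setting, then I guess the Parquet job above only needed the same config change before the order by (combining it with your original code; a sketch only, 2000 chosen to keep roughly 1M rows per reducer, not tested):

sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
pf.registerTempTable("data")
val sorted = sqlContext.sql("select * from data order by time")   // now shuffles into 2000 partitions instead of 200
sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")

Or was the repartition / worker memory tuning also needed?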