After a day of debugging (actually, more), I can answer my own question: the problem is that the default value of "spark.sql.shuffle.partitions", 200, is too small for sorting 2B rows. It was hard to pin down because the Spark executors just crash one by one with various exceptions. The other takeaway is that Dataframe "order by" and RDD.sortBy are implemented in different ways. By the way, why is that?
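A minimal sketch of the fix, assuming Spark 1.3's SQLContext (setConf is available there; the property can also be set with a SQL "SET" statement or in spark-defaults.conf). The value 2000 is just chosen to match the number of input partitions in the test below; with it set before running the query, the SQL sort should use 2000 reducers like the RDD.sortBy version:

  // Raise the number of reducers used by SQL/Dataframe shuffles
  // (sort, join, aggregation) from the default 200 to 2000.
  sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

  // Equivalent form:
  // sqlContext.sql("SET spark.sql.shuffle.partitions=2000")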
Small synthetic test (copied from my blog): create 2B rows of MyRecord in 2000 partitions, so each partition holds 1M rows.

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  case class MyRecord(time: Double, id: String)
  val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
    Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))

Let's sort this RDD by time:

  val sorted = rdd.sortBy(x => x.time)
  sorted.count

It finished in about 8 minutes on my cluster of 8 nodes, and everything was fine. You can also check the completed tasks in the Spark web UI: the number of reducers was equal to the number of partitions, i.e. 2000.

Now let's convert the original RDD to a Dataframe and sort again:

  val df = sqlContext.createDataFrame(rdd)
  df.registerTempTable("data")
  val result = sqlContext.sql("select * from data order by time")
  result.count

It runs for a while and then crashes. If you check the tasks in the Spark web UI, you will see that some of them were cancelled due to lost executors (ExecutorLost) with various strange exceptions. It is really hard to trace which executor was lost first; the others follow it like a house of cards. What's the problem? The number of reducers. For the first job it is equal to the number of partitions, i.e. 2000, but for the second it drops to 200.

From: Ulanov, Alexander
Sent: Wednesday, April 29, 2015 1:08 PM
To: user@spark.apache.org
Subject: Sort (order by) of the big dataset

Hi,

I have a 2 billion record dataset with schema <eventId: String, time: Double, value: Double>. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon CPUs and 16GB RAM and 1TB disk space each; each node runs 3 workers with 3GB memory.

I keep failing to sort this dataset in Spark. I do the following:

  val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
  pf.registerTempTable("data")
  val sorted = sqlContext.sql("select * from data order by time")
  sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")

Spark starts to execute tasks and then errors like "Executor Lost" pop up in the web UI (tasks mapPartitions at Exchange.scala and runJob at newParquet.scala), with different types of exceptions as the explanation. My thinking is that the main problem is the "GC overhead limit" exception, but I also observe exceptions related to connection timeouts and shuffle writes (java.lang.IllegalStateException: Shutdown in progress; org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException).

What I tried:
1) Tried "order by eventId" and "order by eventId, time", with the same result.
2) Looked at the shuffle parameters, but the defaults make sense.
3) Tried to repartition the data loaded from Parquet: val pf3000 = pf.repartition(3000), in order to pass smaller chunks of data to the executors (originally there are 300 partitions). It did not help either. Surprisingly, this dataset takes 50GB on HDFS versus the 23GB of the original.
4) Tried to have 8 workers instead of 24 and gave each of them 10G of memory. It did not help.

Could you suggest what might be the problem and what the workaround is? Just in case, I cannot have more RAM or more machines :)

Best regards, Alexander
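For the original Parquet job quoted above, raising spark.sql.shuffle.partitions as shown earlier should be enough. An alternative is to sort through the underlying RDD, where the number of output partitions can be passed explicitly, as RDD.sortBy handled the synthetic test fine. A rough sketch, assuming Spark 1.3 APIs (DataFrame.rdd, Row.getDouble, createDataFrame with an explicit schema) and that "time" is the second column of the schema:

  import org.apache.spark.sql.Row

  val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")

  // Sort the underlying RDD[Row] by the "time" column (index 1 in the
  // <eventId, time, value> schema) with an explicit number of partitions.
  val sortedRows = pf.rdd.sortBy((r: Row) => r.getDouble(1),
    ascending = true, numPartitions = 2000)

  // Re-attach the original schema and write the result back to Parquet.
  val sortedDf = sqlContext.createDataFrame(sortedRows, pf.schema)
  sortedDf.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")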