What was the answer? Was it only setting spark.sql.shuffle.partitions?

On Thu, Apr 30, 2015 at 12:14 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

>  After a day of debugging (actually, more), I can answer my own question:
>
> The problem is that the default value of 200 for
> “spark.sql.shuffle.partitions” is too small for sorting 2B rows. It was
> hard to track down because the Spark executors just crash one by one with
> various exceptions. The other takeaway is that DataFrame “order by” and
> RDD.sortBy are implemented in different ways. BTW, why?
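>
> For reference, a minimal sketch of how the setting can be raised before
> running the query (assuming the same sqlContext as in the test below; 2000
> is just an illustration, sized to match the number of input partitions):
>
> sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
>
> // or, equivalently, from SQL:
> sqlContext.sql("SET spark.sql.shuffle.partitions=2000")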
>
>
>
> Small synthetic test (copied from my blog):
>
> Create 2B rows of MyRecord across 2000 partitions, so each partition will
> hold 1M rows.
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext.implicits._
>
> case class MyRecord(time: Double, id: String)
>
> val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
> Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))
>
>
>
> Let's sort this RDD by time:
>
> val sorted = rdd.sortBy(x => x.time)
>
> sorted.count
>
>
>
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's
> fine. You can also check the completed tasks in the Spark web UI. The
> number of reducers was equal to the number of partitions, i.e. 2000.
>
>
>
> Let's convert the original RDD to a DataFrame and sort it again:
>
> val df = sqlContext.createDataFrame(rdd)
>
> df.registerTempTable("data")
>
> val result = sqlContext.sql("select * from data order by time")
>
> result.count
>
>
>
> It will run for a while and then crash. If you check the tasks in the Spark
> web UI, you will see that some of them were cancelled due to lost executors
> (ExecutorLost), with various strange exceptions. It is really hard to trace
> back which executor was lost first; the others follow it like a house of
> cards. What's the problem? The number of reducers. For the first sort it is
> equal to the number of partitions, i.e. 2000, but for the second it drops
> to 200.
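>
> A quick way to see the difference from the shell (a sketch, assuming the
> sorted RDD and the result DataFrame defined above; DataFrame.rdd is used
> here only to look at the resulting partitioning):
>
> sorted.partitions.length     // 2000, same as the input partitions
> result.rdd.partitions.length // 200, the spark.sql.shuffle.partitions default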
>
>
>
> *From:* Ulanov, Alexander
> *Sent:* Wednesday, April 29, 2015 1:08 PM
> *To:* user@spark.apache.org
> *Subject:* Sort (order by) of the big dataset
>
>
>
> Hi,
>
>
>
> I have a 2-billion-record dataset with the schema <eventId: String, time:
> Double, value: Double>. It is stored in Parquet format on HDFS, 23GB in
> size. Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon CPUs, 16GB RAM and
> 1TB disk space each; each node runs 3 workers with 3GB of memory.
>
>
>
> I keep failing to sort this dataset in Spark. I do the following:
>
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
>
> pf.registerTempTable("data")
>
> val sorted = sqlContext.sql("select * from data order by time")
>
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>
>
>
> Spark starts to execute tasks and then errors like “Executor Lost” pop up
> in the web UI (tasks mapPartitions at Exchange.scala and runJob at
> newParquet.scala), each with a different exception as the reason. My
> thinking is that the main problem is the “GC overhead limit” exception,
> however I also observe exceptions related to connection timeouts and
> shuffle writes (java.lang.IllegalStateException: Shutdown in progress;
> org.apache.spark.shuffle.FetchFailedException:
> java.io.FileNotFoundException).
>
>
>
> What I tried:
>
> 1) Tried “order by eventId” and “order by eventId, time”, with the same
> result.
>
> 2) Looked at the shuffle parameters, but the defaults do seem to make sense
> (see the sketch after this list).
>
> 3) Tried to repartition the data I am loading from Parquet: val pf3000 =
> pf.repartition(3000), in order to get smaller chunks of data passed to the
> executors (originally there are 300 partitions). It did not help either.
> Surprisingly, this dataset takes 50GB on HDFS versus the 23GB of the
> original.
>
> 4) Tried to have 8 workers instead of 24 and gave them 10GB of memory each.
> It did not help.
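>
> (A sketch of how those defaults can be inspected from the shell; the
> prefix filter and the fallback value are just illustrations:)
>
> sc.getConf.getAll.filter(_._1.startsWith("spark.shuffle")).foreach(println)
> sqlContext.getConf("spark.sql.shuffle.partitions", "200")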
>
>
>
> Could you suggest what the problem might be and what the workaround is?
> Just in case: I cannot get more RAM or more machines :)
>
>
>
> Best regards, Alexander
>
