Hi Yong,

Have you tried increasing your level of parallelism? How many tasks are you getting in the failing stage? 2-3 tasks per CPU core are recommended, though you may need more for your shuffle operations. You can configure spark.default.parallelism, or pass a level of parallelism as the second parameter to a suitable operation in your code.
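Something along these lines, for example (a rough, untested sketch: the app name, the partition count of 96, and the placeholder query are made up, so tune them for your box; since your query runs through HiveContext, the spark.sql.shuffle.partitions setting may matter more for the SQL shuffles than spark.default.parallelism does):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Raise the default parallelism used for RDD shuffles.
    val conf = new SparkConf()
      .setAppName("hive-query-test")              // placeholder name
      .set("spark.default.parallelism", "96")     // e.g. ~4 x 24 cores; tune as needed
    val sc = new SparkContext(conf)

    // Spark SQL / HiveContext shuffles use a separate setting (default is 200).
    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("spark.sql.shuffle.partitions", "96")
    val result = hiveContext.sql("SELECT ...")     // your 200-line query here

    // On plain RDD operations you can also pass the parallelism directly, e.g.
    //   pairRdd.reduceByKey(_ + _, 96)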
Deb

On Thu, Feb 5, 2015 at 1:03 PM, java8964 <java8...@hotmail.com> wrote:

> I am evaluating Spark for our production usage. Our production cluster is
> Hadoop 2.2.0 without Yarn, so I want to test Spark with a Standalone
> deployment running alongside Hadoop.
>
> What I have in mind is to test a very complex Hive query, which joins 6
> tables, has lots of nested structure with exploding, and currently takes
> 8 hours daily in our production runs.
>
> All the data for this query is in AVRO + Snappy.
>
> I set up one box (24 cores + 64G memory), installed the same version of
> Hadoop as our production, and put 5% of the data on it (about 60G of
> Snappy-compressed AVRO files).
>
> I ran the same query in Hive. It took 6 rounds of MR jobs and finished in
> around 30 hours on this one box.
>
> Now I am starting to have fun with Spark.
>
> I checked out Spark 1.2.0, built it following the Spark build instructions,
> and installed it on this one box.
>
> Since the test data is all in AVRO format, I also built the latest
> development version of spark-avro, from
> https://github.com/databricks/spark-avro
>
> 1) First, I had some problems using the AVRO data with spark-avro. It turns
> out that the Spark 1.2.0 build merges mismatched versions of the AVRO core
> and AVRO mapred jars. I fixed it manually. See the issue here:
> https://github.com/databricks/spark-avro/issues/24
>
> 2) After that, I am impressed, because:
>
> - The AVRO files just work from HDFS in Spark 1.2.
> - The complex query (about 200 lines) just starts to run in Spark 1.2 using
>   org.apache.spark.sql.hive.HiveContext without any problem. This
>   HiveContext just works in Spark SQL 1.2. Very nice.
>
> 3) I got several OOMs, which is reasonable. I finally changed the memory
> settings to:
>
> - export SPARK_WORKER_MEMORY=8g
> - export SPARK_DRIVER_MEMORY=2g
> - export SPARK_EXECUTOR_MEMORY=8g
>
> as 4g just doesn't work for the test data volume. After I set it to 8G, the
> job no longer fails due to OOM.
>
> 4) It looks like Spark generates 8 stages for the big query.
> It finishes stage 1 and stage 2, then failed on stage 3 twice with the
> following error:
>
> FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>   at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>   at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>   at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>   at java.lang.Thread.run(Thread.java:853)
> )
>
> During the whole test, the CPU load average was about 16, and there was
> still enough physical memory to use. I don't know what the reason for the
> above error could be.
>
> Spark did retry, but it had to retry from stage 1. Is this because Spark
> doesn't persist the intermediate data in HDFS?
>
> I guess you have to pay the performance price somewhere no matter what.
>
> What I am looking for is:
>
> 1) Can Spark finish this query? Hive is slower, but it reliably finishes a
> complex query you give it.
>
> 2) How fast it can finish this query.
>
> 3) I am going to check the final output vs. the result from Hive.
>
> Spark stage 3 has failed twice for me so far. We will see.
>
> Thanks
>
> Yong