Hi Yong,

Have you tried increasing your level of parallelism? How many tasks are you
getting in the failing stage? 2-3 tasks per CPU core is the usual
recommendation, though you may need more for your shuffle operation.

You can configure spark.default.parallelism, or pass a level of parallelism
as the second parameter to a suitable operation in your code.
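
For example, something along these lines (just a sketch; 48 is a guess for
your 24-core box, and since your query goes through HiveContext,
spark.sql.shuffle.partitions is probably the setting that matters for the
SQL shuffle stages):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Roughly 2-3 tasks per core on a 24-core box
    val conf = new SparkConf()
      .setAppName("BigHiveQuery")
      .set("spark.default.parallelism", "48")
    val sc = new SparkContext(conf)

    // For plain RDD operations you can also pass it directly,
    // e.g. rdd.reduceByKey(_ + _, 48)

    // For the SQL / HiveContext path, the shuffle partition count is separate:
    val hc = new HiveContext(sc)
    hc.setConf("spark.sql.shuffle.partitions", "48")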

Deb

On Thu, Feb 5, 2015 at 1:03 PM, java8964 <java8...@hotmail.com> wrote:

> I am evaluating Spark for our production usage. Our production cluster is
> Hadoop 2.2.0 without YARN, so I want to test Spark in standalone deployment
> mode running alongside Hadoop.
>
> What I have in mind is to test a very complex Hive query, which joins 6
> tables, uses lots of nested structures with explode, and currently takes 8
> hours in our daily production run.
>
> All the data for this query is in AVRO + Snappy.
>
> I set up one box (24 cores + 64G memory), installed the same version of
> Hadoop as our production, and put 5% of the data on it (about 60G of
> Snappy-compressed AVRO files).
>
> I ran the same query in Hive. It took 6 rounds of MR jobs and finished in
> around 30 hours on this one box.
>
> Now, I start to have fun with Spark.
>
> I checked out Spark 1.2.0, built it following the Spark build instructions,
> and installed it on this one box.
>
> Since the test data is all in AVRO format, I also built the latest
> development version of spark-avro, from
> https://github.com/databricks/spark-avro
>
> 1) First, I had some problems using the AVRO data with spark-avro. It turns
> out that the Spark 1.2.0 build process pulls in mismatched versions of the
> AVRO core and AVRO mapred jars. I manually fixed it. See the issue here:
> https://github.com/databricks/spark-avro/issues/24
> 2) After that, I was impressed, because:
>
>    - The AVRO files just work, from HDFS straight into Spark 1.2.
>    - The complex query (about 200 lines) just runs in Spark 1.2 using
>    org.apache.spark.sql.hive.HiveContext without any problem. HiveContext
>    just works in Spark SQL 1.2. Very nice. (Rough sketch of the setup
>    below.)
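>
>    For anyone curious about the wiring, the setup looks roughly like this
>    (a sketch only; the path, table name, and bigQuerySql variable are
>    placeholders, sc is the SparkContext from spark-shell, and it assumes
>    the spark-avro 0.x API for Spark 1.2, where avroFile(...) is added to
>    the SQL context via an implicit):
>
>        import com.databricks.spark.avro._
>        import org.apache.spark.sql.hive.HiveContext
>
>        val hc = new HiveContext(sc)
>
>        // Load one Snappy-compressed AVRO input from HDFS and expose it to
>        // SQL; the other five tables are registered the same way.
>        val events = hc.avroFile("hdfs:///data/events/current")
>        events.registerTempTable("events")
>
>        // bigQuerySql holds the ~200-line query text
>        val result = hc.sql(bigQuerySql)
>        result.saveAsTextFile("hdfs:///tmp/spark_query_output")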
>
> 3) I got several OOMs, which is reasonable. I finally changed the memory
> settings to:
>
>    - export SPARK_WORKER_MEMORY=8g
>    - export SPARK_DRIVER_MEMORY=2g
>    - export SPARK_EXECUTOR_MEMORY=8g
>
>
>       4g just doesn't work for this test data volume; after I set it to 8g,
> the job no longer fails due to OOM.
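>
>       For reference, the same driver/executor sizes can also be passed per
>       job at submit time instead of through the exports above; the class
>       and jar names here are just placeholders:
>
>           ./bin/spark-submit --master spark://<standalone-master>:7077 \
>             --driver-memory 2g --executor-memory 8g \
>             --class MyQueryJob my-query-job.jar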
>
> 4) It looks like Spark generates 8 stages for the big query. It finished
> stage 1 and stage 2, then failed on stage 3 twice with the following error:
>
> FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>     at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>     at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>     at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>     at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>     at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
>     at java.lang.Thread.run(Thread.java:853)
> )
>
>
> During the whole test, the CPU load average was about 16, and there was
> still enough physical memory available. I don't know what the reason for
> the above error could be.
>
>
> Spark did retry, but it had to retry from stage 1. Is this because Spark
> doesn't persist the intermediate data in HDFS?
>
> I guess you have to pay the performance price somewhere no matter what.
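>
> I could probably also persist or checkpoint an intermediate result
> explicitly, so that a retry doesn't have to start over from stage 1. A
> rough sketch of what I mean, where intermediateResult stands in for
> whatever SchemaRDD the query produces partway through:
>
>     import org.apache.spark.storage.StorageLevel
>
>     // Keep it around, spilling to local disk if it doesn't fit in memory
>     intermediateResult.persist(StorageLevel.MEMORY_AND_DISK)
>
>     // or write a recovery point out to HDFS
>     sc.setCheckpointDir("hdfs:///tmp/spark_checkpoints")
>     intermediateResult.checkpoint()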
>
>
> What I am looking for is:
>
>
> 1) Can Spark finish this query? Hive is slower, but it reliably finishes
> any complex query you give it.
>
> 2) How fast can it finish this query?
>
> 3) I am going to check the final output against the result from Hive.
>
>
> Spark stage 3 has failed twice for me so far. We will see.
>
>
> Thanks
>
>
> Yong
>
