I'm a bit new to Spark, but I have a question about performance. I suspect a lot of
my issue comes down to tuning and parameters. For comparison, I have a Hive external
table on this data, and queries against it run in minutes.
The job:
+ 40 GB of Avro events on HDFS (100 million+ events)
+ Read the files from HDFS and dedupe the events by key (mapToPair, then
reduceByKey)
+ The deduped RDD is returned and persisted (memory and disk)
+ It is then passed to a job that maps it to new objects with mapToPair, then
runs reduceByKey and does the work in foreachPartition
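For readers less familiar with reduceByKey, here is a plain-Java, single-machine analogue of the two steps above (no Spark involved). The Event class and its dedupeKey/groupKey/visitors fields are hypothetical stand-ins for AnalyticsEvent, the keySelector output, and one aggregated field. It only illustrates the semantics: keep one event per key, then fold values per group key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogue of the two reduceByKey steps (no Spark). Event and its
// fields are hypothetical stand-ins for AnalyticsEvent and the key selector.
public class ReduceByKeySketch {

    static final class Event {
        final String dedupeKey; // hypothetical dedupe key
        final String groupKey;  // hypothetical stand-in for keySelector.select(event)
        final long visitors;    // hypothetical stand-in for one aggregated field

        Event(String dedupeKey, String groupKey, long visitors) {
            this.dedupeKey = dedupeKey;
            this.groupKey = groupKey;
            this.visitors = visitors;
        }
    }

    // Step 1: like reduceByKey((a, b) -> a), keep a single event per dedupe key.
    // Spark does not guarantee which duplicate survives; here it is the first seen.
    static List<Event> dedupe(List<Event> events) {
        Map<String, Event> byKey = new LinkedHashMap<>();
        for (Event e : events) {
            byKey.putIfAbsent(e.dedupeKey, e);
        }
        return new ArrayList<>(byKey.values());
    }

    // Step 2: like reduceByKey((l, r) -> l.add(r)), sum values per group key.
    static Map<String, Long> sumByGroup(List<Event> events) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Event e : events) {
            sums.merge(e.groupKey, e.visitors, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("e1", "stepA", 1),
            new Event("e1", "stepA", 1), // duplicate of e1, dropped by dedupe
            new Event("e2", "stepA", 2),
            new Event("e3", "stepB", 5));
        List<Event> unique = dedupe(events);
        System.out.println(unique.size());      // 3
        System.out.println(sumByGroup(unique)); // {stepA=3, stepB=5}
    }
}
```

In Spark the same two folds run per partition first and are then merged across the network in a shuffle, which is why each reduceByKey marks a stage boundary.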
The issue:
When I run this in my environment on YARN, it takes 20+ hours. On YARN we see
the first stage run and build the deduped RDD, but when the next stage starts,
tasks fail and data is lost. As a result, stage 0 starts over and over, which
drags the job out.
Errors I see in the driver logs:
ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on XXXXX: remote Akka client disassociated
15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 1335,XXXX): FetchFailed(BlockManagerId(3, iXXXX, 33958), shuffleId=1, mapId=162, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to XXXXX/XXXXX:33958
We also see the following, but I suspect it is a consequence of the previous
stage failing and the next one starting:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
Cluster:
5 machines, each with 2 cores and 8 GB of RAM
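A quick back-of-the-envelope check of the cluster above, using only the numbers in this post. How much of each machine's 8 GB YARN actually hands to executors is not stated, so the totals are upper bounds:

```latex
\begin{aligned}
\text{total cores} &= 5 \times 2 = 10 \\
\text{total RAM}   &= 5 \times 8\,\text{GB} = 40\,\text{GB} \\
\text{input size}  &\approx 40\,\text{GB of Avro on HDFS}
\end{aligned}
```

MEMORY_AND_DISK_2 stores each persisted partition on two nodes, so the cached copy of the deduped RDD occupies roughly twice its own size across the cluster.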
spark-submit command:
spark-submit --class com.myco.SparkJob \
    --master yarn \
    /tmp/sparkjob.jar
Any thoughts on where to look, how to start approaching this problem, or what
additional data points I should provide?
Thanks.
Code for the job:
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
        context.newAPIHadoopRDD(
            context.hadoopConfiguration(),
            AvroKeyInputFormat.class,
            AvroKey.class,
            NullWritable.class
        ).keys())
    // Copy each datum into a new AnalyticsEvent instance.
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    // Drop events that have no step event key.
    .filter(event -> event.getStepEventKey() != null)
    // Dedupe: use the whole event as the key and keep one copy per key.
    .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
    .reduceByKey((count1, count2) -> count1)
    .map(tuple -> tuple._1());

// MEMORY_AND_DISK_2 keeps two replicas of each cached partition.
events.persist(StorageLevel.MEMORY_AND_DISK_2());

events.mapToPair(event -> new Tuple2<T, RunningAggregates>(
            keySelector.select(event),
            new RunningAggregates(
                Optional.ofNullable(event.getVisitors()).orElse(0L),
                Optional.ofNullable(event.getImpressions()).orElse(0L),
                Optional.ofNullable(event.getAmount()).orElse(0.0D),
                Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D))))
    // Combine partial aggregates per key.
    .reduceByKey((left, right) -> left.add(right))
    .foreachPartition(dostuff);
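RunningAggregates is not shown in the post. A minimal sketch of what its add() might look like, assuming it simply sums the four values passed to its constructor (the field names are hypothetical, taken from the getters used above). reduceByKey may combine values map-side and then again across partitions in any order, so the function it is given needs to be associative and commutative; field-wise addition is, exactly for the long counts and up to floating-point rounding for the double sums.

```java
import java.io.Serializable;

// Hypothetical sketch of RunningAggregates: the real class is not in the post,
// so these field names and the add() semantics are assumptions.
public class RunningAggregates implements Serializable {
    private static final long serialVersionUID = 1L;

    final long visitors;
    final long impressions;
    final double amount;
    final double amountSumOfSquares;

    public RunningAggregates(long visitors, long impressions,
                             double amount, double amountSumOfSquares) {
        this.visitors = visitors;
        this.impressions = impressions;
        this.amount = amount;
        this.amountSumOfSquares = amountSumOfSquares;
    }

    // Field-wise sum; returns a new object and leaves both inputs unchanged.
    public RunningAggregates add(RunningAggregates other) {
        return new RunningAggregates(
            visitors + other.visitors,
            impressions + other.impressions,
            amount + other.amount,
            amountSumOfSquares + other.amountSumOfSquares);
    }

    public static void main(String[] args) {
        RunningAggregates a = new RunningAggregates(1L, 10L, 2.0, 4.0);
        RunningAggregates b = new RunningAggregates(2L, 5L, 3.0, 9.0);
        RunningAggregates sum = a.add(b);
        System.out.println(sum.visitors + " " + sum.impressions + " "
            + sum.amount + " " + sum.amountSumOfSquares); // 3 15 5.0 13.0
    }
}
```

Implementing Serializable matters here because Spark ships these values between executors during the reduceByKey shuffle.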
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.