Hi Pere,

You've got the wrong user list.
-n

On Mon, Mar 16, 2015 at 5:27 PM, Pere Kyle <[email protected]> wrote:

> I am seeing extremely slow performance from Spark 1.2.1 (MapR 4) on
> Hadoop 2.5.1 (YARN) against Hive external tables on s3n. I am running a
> 'SELECT count(*) FROM s3_table' query on the nodes using Hive 0.13 and
> Spark SQL 1.2.1.
>
> I am running a 5-node MapR 4.0.2 M3 cluster on EC2 c3.2xlarge instances.
> The table is 100M rows and 25 GB, stored as a Hive table on S3 in 250 MB
> splits (100 splits).
>
> Setup (on the same cluster):
> Hive: 14 vCPUs and 25 GB reserved RAM
> Spark: 40 cores and 96 GB
>
> Query: SELECT count(*) FROM table;
>
> Hive from local HDFS: *70s*
> Spark from local HDFS: *40s* (I feel this is slow as well)
> Hive from S3n: *15m*
> Spark from S3n: *2.2h*
>
> As you can see, the same query on Spark takes over 2 hours to complete
> with 5 slaves. Here are some metrics from a 1.2h run (I canceled it at
> 50%) on 5 slaves with 16 GB and 8 CPUs per node:
>
> Metric                     Min     25th pct  Median   75th pct  Max
> Duration                   53 s    4.1 min   5.9 min  10 min    22 min
> Scheduler Delay            5 ms    8 ms      9 ms     10 ms     51 ms
> Task Deserialization Time  0 ms    1 ms      1 ms     1 ms      53 ms
> GC Time                    30 ms   86 ms     0.1 s    0.2 s     0.4 s
> Result Serialization Time  0 ms    0 ms      0 ms     0 ms      1 ms
> Getting Result Time        0 ms    0 ms      0 ms     0 ms      0 ms
> Input                      3.4 MB  3.8 MB    3.8 MB   32.0 MB   32.0 MB
> Shuffle Write              51.0 B  51.0 B    51.0 B   51.0 B    51.0 B
>
> The only errors I am seeing in the Spark logs are occasional socket
> timeouts (reading from S3).
>
> Here is what the tasks are logging to my console at INFO:
>
> 15/03/11 23:03:05 INFO rdd.HadoopRDD: Input split:
> s3n://bucket/warehouse/table/part-m-00017:536870912+67108864
>
> 15/03/11 23:03:05 INFO s3n.S3NativeFileSystem: Opening
> 's3n://bucket/warehouse/table/part-m-00017' for reading
>
> 15/03/11 23:03:05 INFO s3n.S3NativeFileSystem: Stream for key
> 'warehouse/table/part-m-00017' seeking to position '469762048'
>
> 15/03/11 23:03:06 INFO s3n.S3NativeFileSystem: Stream for key
> 'warehouse/table/part-m-00017' seeking to position '536870912'
>
> 15/03/11 22:50:09 INFO s3n.S3NativeFileSystem: Received Exception while
> reading 'warehouse/table/part-m-00029', will retry by attempting to
> reopen stream.
>
> java.net.SocketTimeoutException: Read timed out
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.read(SocketInputStream.java:152)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>     at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>     at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>     at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:204)
>     at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:182)
>     at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:138)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:108)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73)
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$NativeS3FsInputStream.read(S3NativeFileSystem.java:231)
>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>     at java.io.DataInputStream.read(DataInputStream.java:100)
>     at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:244)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:210)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:131)
>     at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
>     at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Thanks,
> Pere
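For reference, in Spark SQL 1.2.1 a count like the one above is typically
issued through a HiveContext. A minimal sketch, assuming an
already-configured SparkContext named sc and the table name from the post:

    import org.apache.spark.sql.hive.HiveContext

    // Sketch only: assumes `sc` is the existing SparkContext and that the
    // Hive metastore already knows the external table `s3_table`.
    val hiveContext = new HiveContext(sc)
    // In 1.2.x, sql() returns a SchemaRDD; collect() pulls the single count row.
    hiveContext.sql("SELECT count(*) FROM s3_table").collect().foreach(println)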
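On the socket timeouts: as the last log line says, the filesystem retries
them itself by reopening the stream. If you want to experiment with that
retry behavior, Apache Hadoop exposes fs.s3.maxRetries and
fs.s3.sleepTimeSecondsBetweenRetries; a sketch, with the caveat that I am
not certain the MapR/EMR S3NativeFileSystem build honors these settings:

    // Sketch only: fs.s3.maxRetries and fs.s3.sleepTimeSecondsBetweenRetries
    // are the stock Apache Hadoop S3 retry knobs (defaults 4 and 10); whether
    // this particular S3NativeFileSystem implementation reads them is an
    // assumption worth verifying.
    sc.hadoopConfiguration.setInt("fs.s3.maxRetries", 10)
    sc.hadoopConfiguration.setInt("fs.s3.sleepTimeSecondsBetweenRetries", 10)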
