Silvio,

Thanks for your responses and patience. It worked after I reshuffled the arguments (the five-argument newAPIHadoopFile overload expects the InputFormat class before the key and value classes) and removed the extra Avro dependencies.
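For reference, a minimal sketch of the reordered call (untested): it is the failing variant from later in this thread with the class arguments moved into the order the five-argument overload expects, and it assumes sc, path, and the 64 MB split size from the snippets below.

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable

// The overload is newAPIHadoopFile(path, fClass, kClass, vClass, conf),
// so the InputFormat class comes first, then the key and value classes.
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864") // 64 MB

val records = sc.newAPIHadoopFile(
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]], // fClass
  classOf[AvroKey[GenericRecord]],            // kClass
  classOf[NullWritable],                      // vClass
  hadoopConfiguration)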
On Fri, Jun 26, 2015 at 9:55 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

OK, here's how I did it, using just the built-in Avro libraries with Spark 1.3:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val hadoopConf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100)

val input = sc.newAPIHadoopFile(
  "examples/src/main/resources/users.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf).map(_._1.datum.get("name"))

println(input.partitions.size)

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 11:04 AM
To: Silvio Fiorito
Cc: user
Subject: Re:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

The same code of yours works for me as well.

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

Could it be that this is not supported with Avro? Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

My imports:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text

def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
  val path = getInputPaths(inputDir, startDate, endDate)
  val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
}

I need to read Avro datasets, and I am using strings instead of the constant from the InputFormat class.

When I click on any Hadoop dependency in Eclipse, I see they point to Hadoop 2.2.x jars.
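On the point about using a string key instead of the constant: a minimal sketch of the same 64 MB setting written with FileInputFormat.SPLIT_MAXSIZE, the constant Silvio's snippet uses, assuming the Hadoop 2.x org.apache.hadoop.mapreduce.lib.input.FileInputFormat is the one on the classpath and sc is in scope:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Same setting as the string form above, via the constant
// (SPLIT_MAXSIZE is "mapreduce.input.fileinputformat.split.maxsize").
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // 64 MB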
On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Make sure you're importing the right namespace for Hadoop v2.0. This is what I tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:

All of these throw a compilation error at newAPIHadoopFile:

1)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
[INFO]   (path: String, fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], vClass: Class[org.apache.hadoop.io.NullWritable], conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
[INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
[INFO]   sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Ok, in that case I think you can set the max split size in the Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.

Again, I haven't done this myself, but looking through the Spark codebase here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
and the HDFS FileInputFormat implementation, that seems like a good option to try.

You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).

I hope that helps!

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 5:49 PM
To: Silvio Fiorito
Cc: user
Subject: Re:

I use

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)

It does not seem to have that partition option.

On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Hi Deepak,

Have you tried specifying the minimum partitions when you load the file? I haven't tried that myself against HDFS before, so I'm not sure if it will affect data locality. Ideally not: it should still maintain data locality, just with more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they're all node-local.

val details = sc.textFile("hdfs://….", 500)

If you're using something other than a text file, you can also specify minimum partitions when using sc.hadoopFile.

Thanks,
Silvio

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re:

How can I increase the number of tasks from 174 to 500 without running repartition?

The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks, similar to the split size that increases the number of mappers in Hadoop M/R?
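Following the sc.hadoopFile suggestion above, here is a rough, untested sketch of reading the same Avro data through the old mapred API, which does take a minimum-partitions argument; it assumes avro-mapred's org.apache.avro.mapred.AvroInputFormat and AvroWrapper, and that sc and path are in scope as in the earlier snippets:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// Old ("mapred") API variant of the Avro read. Unlike newAPIHadoopFile,
// sc.hadoopFile accepts a requested minimum number of partitions directly.
val records = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](
  path + "/*.avro",
  500) // requested minimum number of partitions
println(records.partitions.size)

Whether this keeps the same locality behaviour as the split-size approach would still need to be verified, per Silvio's caveat above.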
On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Look in the tuning section (https://spark.apache.org/docs/latest/tuning.html); you also need to figure out what's taking time and where your bottleneck is. If everything is tuned properly, then you will need to throw more cores at it. :)

Thanks
Best Regards

On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to make it run faster?

On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Cool. :)

On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:

It's running now.

On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

Now running with:

--num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1

On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

There are multiple of these:

1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread

2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

3)
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 20674"...
[ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)

On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I see this:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
at java.io.File.<init>(File.java:367)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you look a bit more in the error logs? It could be getting killed because of OOM etc. One thing you can try is to set spark.shuffle.blockTransferService to nio from netty.

Thanks
Best Regards
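For reference, a minimal sketch (untested) of applying that setting when the SparkConf is built; spark.shuffle.blockTransferService accepts "nio" or "netty" (the default in Spark 1.3), and the application name here is only a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

// Switch the shuffle block transfer service from the default "netty" to "nio",
// as suggested above. "MyAvroJob" is a placeholder application name.
val conf = new SparkConf()
  .setAppName("MyAvroJob")
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(conf)

The same property can also be passed at submission time with spark-submit's --conf option.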
On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage (which joins two RDDs) begins. This stage has multiple task failures, all with the exception below.

Multiple tasks (hundreds of them) get the same exception on different hosts. How can all the hosts suddenly stop responding when, a few moments ago, 3 stages ran successfully? If I re-run, the three stages will again run successfully. I cannot think of it being a cluster issue.

Any suggestions?

Spark Version: 1.3.1

Exception:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at org.apache.sp

--
Deepak