Silvio,
Thanks for your responses and patience. It worked after I reshuffled the
arguments and removed the Avro dependencies.
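
For anyone hitting the same compile error, here is a minimal sketch of the call
that compiles, modeled on Silvio's snippet below. It assumes sc is an existing
SparkContext; inputPath and the 64 MB split size are placeholders, not the
exact values from the job. Note the argument order: path, input-format class,
key class, value class, then the Hadoop conf.

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val inputPath = "hdfs:///path/to/avro/dataset"   // placeholder path

// Copy the SparkContext's Hadoop conf and cap the split size (64 MB here),
// so more input splits, and therefore more tasks, are created.
val hadoopConf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 67108864L)

// The input-format class comes before the key and value classes,
// and the Hadoop conf goes last.
val records = sc.newAPIHadoopFile(
  inputPath + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf)

// More splits should show up as more partitions (and tasks).
println(records.partitions.size)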

On Fri, Jun 26, 2015 at 9:55 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   OK, here’s how I did it, using just the built-in Avro libraries with
> Spark 1.3:
>
>  import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.avro.mapred.AvroKey
> import org.apache.avro.mapreduce.AvroKeyInputFormat
> import org.apache.hadoop.io.NullWritable
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>
>  val hadoopConf = new
> org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100)
>
>  val input = sc.newAPIHadoopFile(
>   "examples/src/main/resources/users.avro",
>   classOf[AvroKeyInputFormat[GenericRecord]],
>   classOf[AvroKey[GenericRecord]],
>   classOf[NullWritable],
>   hadoopConf).map(_._1.datum.get("name"))
>
>  println(input.partitions.size)
>
>
>
>
>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
> Date: Friday, June 26, 2015 at 11:04 AM
> To: Silvio Fiorito
> Cc: user
> Subject: Re:
>
>   <dependency>
>
> <groupId>org.apache.avro</groupId>
>
> <artifactId>avro</artifactId>
>
> <version>1.7.7</version>
>
> <scope>provided</scope>
>
> </dependency>
>
> <dependency>
>
> <groupId>com.databricks</groupId>
>
> <artifactId>spark-avro_2.10</artifactId>
>
> <version>1.0.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.avro</groupId>
>
> <artifactId>avro-mapred</artifactId>
>
> <version>1.7.7</version>
>
> <classifier>hadoop2</classifier>
>
> <scope>provided</scope>
>
> </dependency>
>
> On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> That same code of yours works for me as well.
>>
>> On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> Could it be that it's not supported with Avro? That seems unlikely.
>>>
>>> On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> My imports:
>>>>
>>>>  import org.apache.avro.generic.GenericData
>>>>
>>>> import org.apache.avro.generic.GenericRecord
>>>>
>>>> import org.apache.avro.mapred.AvroKey
>>>>
>>>> import org.apache.avro.Schema
>>>>
>>>> import org.apache.hadoop.io.NullWritable
>>>>
>>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>>>
>>>> import org.apache.hadoop.conf.Configuration
>>>>
>>>> import org.apache.hadoop.fs.FileSystem
>>>>
>>>> import org.apache.hadoop.fs.Path
>>>>
>>>> import org.apache.hadoop.io.Text
>>>>
>>>>
>>>>    def readGenericRecords(sc: SparkContext, inputDir: String,
>>>> startDate: Date, endDate: Date) = {
>>>>
>>>>     val path = getInputPaths(inputDir, startDate, endDate)
>>>>
>>>>     val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
>>>>
>>>>     hadoopConfiguration.set(
>>>> "mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>>
>>>>     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>>
>>>>   }
>>>>
>>>> I need to read Avro datasets, and I am using the string config names
>>>> instead of the constants from the InputFormat class.
>>>>
>>>>
>>>>  When I click on any Hadoop dependency in Eclipse, I see that they point
>>>> to Hadoop 2.2.x jars.
>>>>
>>>>
>>>>
>>>> On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <
>>>> silvio.fior...@granturing.com> wrote:
>>>>
>>>>>   Make sure you’re importing the right namespace for Hadoop v2.0.
>>>>> This is what I tried:
>>>>>
>>>>>   import org.apache.hadoop.io.{LongWritable, Text}
>>>>> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat,
>>>>> TextInputFormat}
>>>>>
>>>>>  val hadoopConf = new org.apache.hadoop.conf.Configuration()
>>>>> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>>>>>
>>>>>  val input = sc.newAPIHadoopFile(
>>>>>   "README.md",
>>>>>   classOf[TextInputFormat],
>>>>>   classOf[LongWritable],
>>>>>   classOf[Text],
>>>>>   hadoopConf).map(_._2.toString())
>>>>>
>>>>>  println(input.partitions.size)
>>>>>
>>>>>  input.
>>>>>   flatMap(_.split(" ")).
>>>>>   filter(_.length > 0).
>>>>>   map((_, 1)).
>>>>>   reduceByKey(_ + _).
>>>>>   coalesce(1).
>>>>>   sortBy(_._2, false).
>>>>>   take(10).
>>>>>   foreach(println)
>>>>>
>>>>>
>>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>> Date: Friday, June 26, 2015 at 10:18 AM
>>>>> To: Silvio Fiorito
>>>>> Cc: user
>>>>> Subject: Re:
>>>>>
>>>>>   All these throw compilation error at newAPIHadoopFile
>>>>>
>>>>> 1)
>>>>>
>>>>> val hadoopConfiguration = new Configuration()
>>>>>
>>>>>     hadoopConfiguration.set(
>>>>> "mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>>>
>>>>>     sc.newAPIHadoopFile[AvroKey, NullWritable,
>>>>> AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey],
>>>>> classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)
>>>>>
>>>>> 2)
>>>>>
>>>>> val hadoopConfiguration = new Configuration()
>>>>>
>>>>>     hadoopConfiguration.set(
>>>>> "mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>>>
>>>>>     sc.newAPIHadoopFile[AvroKey, NullWritable,
>>>>> AvroKeyInputFormat](path + "/*.avro",
>>>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>> classOf[AvroKeyInputFormat[GenericRecord]],hadoopConfiguration)
>>>>>
>>>>> 3)
>>>>>
>>>>>     val hadoopConfiguration = new Configuration()
>>>>>
>>>>>     hadoopConfiguration.set(
>>>>> "mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>>>
>>>>>     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>>>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>> classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>>
>>>>> Error:
>>>>>
>>>>> [ERROR]
>>>>> /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
>>>>> error: overloaded method value newAPIHadoopFile with alternatives:
>>>>>
>>>>> [INFO]   (path: String,fClass:
>>>>> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
>>>>> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
>>>>> Class[org.apache.hadoop.io.NullWritable],conf:
>>>>> org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
>>>>> org.apache.hadoop.io.NullWritable)] <and>
>>>>>
>>>>> [INFO]   (path: String)(implicit km:
>>>>> scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
>>>>> implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
>>>>> implicit fm:
>>>>> scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
>>>>> org.apache.hadoop.io.NullWritable)]
>>>>>
>>>>> [INFO]  cannot be applied to (String,
>>>>> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
>>>>> Class[org.apache.hadoop.io.NullWritable],
>>>>> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
>>>>> org.apache.hadoop.conf.Configuration)
>>>>>
>>>>> [INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>>>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>> classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <
>>>>> silvio.fior...@granturing.com> wrote:
>>>>>
>>>>>>  Ok, in that case I think you can set the max split size in the
>>>>>> Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config
>>>>>> parameter.
>>>>>>
>>>>>>  Again, I haven’t done this myself, but looking through the Spark
>>>>>> codebase here:
>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>>>>>>
>>>>>>  And the HDFS FileInputFormat implementation, that seems like a good
>>>>>> option to try.
>>>>>>
>>>>>>  You should be able to call
>>>>>> conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
>>>>>>
>>>>>>  I hope that helps!
>>>>>>
>>>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>> Date: Thursday, June 25, 2015 at 5:49 PM
>>>>>> To: Silvio Fiorito
>>>>>> Cc: user
>>>>>> Subject: Re:
>>>>>>
>>>>>>   I use
>>>>>>
>>>>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
>>>>>> java.lang.Class, java.lang.Class, java.lang.Class,
>>>>>> org.apache.hadoop.conf.Configuration)
>>>>>>
>>>>>> Does not seem to have that partition option.
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <
>>>>>> silvio.fior...@granturing.com> wrote:
>>>>>>
>>>>>>>  Hi Deepak,
>>>>>>>
>>>>>>>  Have you tried specifying the minimum partitions when you load the
>>>>>>> file? I haven’t tried that myself against HDFS before, so I’m not sure
>>>>>>> whether it will affect data locality. Ideally it won’t; it should still
>>>>>>> maintain data locality, just with more partitions. Once your job runs,
>>>>>>> you can check in the Spark tasks web UI to ensure they’re all Node local.
>>>>>>>
>>>>>>>  val details = sc.textFile(“hdfs://….”, 500)
>>>>>>>
>>>>>>>  If you’re using something other than text file you can also
>>>>>>> specify minimum partitions when using sc.hadoopFile.
>>>>>>>
>>>>>>>  Thanks,
>>>>>>> Silvio
>>>>>>>
>>>>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>>>>>> To: Akhil Das
>>>>>>> Cc: user
>>>>>>> Subject: Re:
>>>>>>>
>>>>>>>   How can I increase the number of tasks from 174 to 500 without
>>>>>>> running repartition?
>>>>>>>
>>>>>>>  The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>>>>>>> 64 MB so as to increase the number of tasks, similar to the split size
>>>>>>> that increases the number of mappers in Hadoop M/R?
>>>>>>>
>>>>>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>>  Look in the tuning section
>>>>>>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need
>>>>>>>> to figure out what's taking time and where your bottleneck is, etc. If
>>>>>>>> everything is tuned properly, then you will need to throw more cores
>>>>>>>> at it. :)
>>>>>>>>
>>>>>>>>  Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It's taking an hour, and on Hadoop it takes 1h 30m; is there a way
>>>>>>>>> to make it run faster?
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <
>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Cool. :)
>>>>>>>>>>  On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> It's running now.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Now running with
>>>>>>>>>>>>
>>>>>>>>>>>>  *--num-executors 9973 --driver-memory 14g
>>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M"
>>>>>>>>>>>> --executor-memory 14g --executor-cores 1*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>  There are multiple of these
>>>>>>>>>>>>>
>>>>>>>>>>>>>  1)
>>>>>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task
>>>>>>>>>>>>> 443.0 in stage 3.0 (TID 1767)
>>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>>  at
>>>>>>>>>>>>> sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
>>>>>>>>>>>>> Source)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>>  at
>>>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>>  15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on
>>>>>>>>>>>>> LARS’ timer thread
>>>>>>>>>>>>>
>>>>>>>>>>>>>  2)
>>>>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on
>>>>>>>>>>>>> LARS’ timer thread
>>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>>  at
>>>>>>>>>>>>> akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>> 3)
>>>>>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch
>>>>>>>>>>>>> worker-5] [akka.tcp://
>>>>>>>>>>>>> sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>>>>>>>> swallowing exception during message send
>>>>>>>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I see this
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <
>>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Can you look a bit more into the error logs? It could be
>>>>>>>>>>>>>>> getting killed because of OOM, etc. One thing you can try is to
>>>>>>>>>>>>>>> set spark.shuffle.blockTransferService to nio instead of netty.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Thanks
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This
>>>>>>>>>>>>>>>> stage has multiple task failures, all with the exception below.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Multiple tasks (hundreds of them) get the same exception on
>>>>>>>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run
>>>>>>>>>>>>>>>> the three stages, they will again run successfully. I cannot
>>>>>>>>>>>>>>>> think of it being a cluster issue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Any suggestions?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Spark Version : 1.3.1
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Exception:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to 
>>>>>>>>>>>>>>>> connect to HOST
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>>>        at 
>>>>>>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>>>>>        at org.apache.sp
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  --
>>>>>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   --
>>>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>   --
>>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>   --
>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  --
>>>>>>>>>>>  Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>   --
>>>>>>>>>  Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  --
>>>>>>>  Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>  --
>>>>>>  Deepak
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>  --
>>>>>  Deepak
>>>>>
>>>>>
>>>>
>>>>
>>>>   --
>>>>  Deepak
>>>>
>>>>
>>>
>>>
>>>   --
>>>  Deepak
>>>
>>>
>>
>>
>>   --
>>  Deepak
>>
>>
>
>
>  --
>  Deepak
>
>


-- 
Deepak