All of these throw a compilation error at newAPIHadoopFile:

1)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)
val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
[INFO]  (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
[INFO]  (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
[INFO]  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

> Ok, in that case I think you can set the max split size in the Hadoop
> config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>
> Again, I haven’t done this myself, but looking through the Spark
> codebase here:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>
> And the HDFS FileInputFormat implementation, that seems like a good
> option to try.
>
> You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE,
> max).
>
> I hope that helps!
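A note on the error itself: the first overload the compiler lists above takes its arguments in the order (path, fClass, kClass, vClass, conf), i.e. the InputFormat class comes before the key and value classes, while all three calls above pass the key class first. Below is a rough, untested sketch of variant 3 with the Class arguments reordered to match that signature, combined with the FileInputFormat.SPLIT_MAXSIZE constant Silvio mentions (sc and path as already defined in DataUtil.scala):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Copy the job's Hadoop config and cap the split size at 64 MB.
// FileInputFormat.SPLIT_MAXSIZE is the constant behind
// "mapreduce.input.fileinputformat.split.maxsize".
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.setLong(FileInputFormat.SPLIT_MAXSIZE, 67108864L)

// The five-argument overload expects the InputFormat class (fClass)
// before the key and value classes.
val avroRdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)

Whether the smaller max split size actually produces more tasks still depends on the input format honoring mapreduce.input.fileinputformat.split.maxsize for these Avro files.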
>
> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
> Date: Thursday, June 25, 2015 at 5:49 PM
> To: Silvio Fiorito
> Cc: user
> Subject: Re:
>
> I use
>
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>
> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
> java.lang.Class, java.lang.Class, java.lang.Class,
> org.apache.hadoop.conf.Configuration)
>
> It does not seem to have that partition option.
>
> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
>> Hi Deepak,
>>
>> Have you tried specifying the minimum partitions when you load the file?
>> I haven’t tried that myself against HDFS before, so I’m not sure if it
>> will affect data locality. Ideally it shouldn’t; it should still maintain
>> data locality, just with more partitions. Once your job runs, you can
>> check in the Spark tasks web UI to ensure they’re all node local.
>>
>> val details = sc.textFile("hdfs://….", 500)
>>
>> If you’re using something other than a text file you can also specify
>> minimum partitions when using sc.hadoopFile.
>>
>> Thanks,
>> Silvio
>>
>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>> Date: Thursday, June 25, 2015 at 3:10 PM
>> To: Akhil Das
>> Cc: user
>> Subject: Re:
>>
>> How can I increase the number of tasks from 174 to 500 without running
>> repartition?
>>
>> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>> 64 MB so as to increase the number of tasks, similar to the split size
>> that increases the number of mappers in Hadoop M/R?
>>
>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Look in the tuning section
>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>>> figure out what's taking time and where your bottleneck is. If
>>> everything is tuned properly, then you will need to throw more cores
>>> at it :)
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to
>>>> make it run faster?
>>>>
>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Cool. :)
>>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> It's running now.
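As a concrete version of Silvio's sc.hadoopFile note above: the old mapred-API reader accepts a minimum number of partitions, unlike newAPIHadoopFile. A rough, untested sketch; org.apache.avro.mapred.AvroInputFormat and AvroWrapper are assumed here, since only the new-API AvroKeyInputFormat appears elsewhere in this thread:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// sc.hadoopFile (old mapred API) lets the caller request a minimum
// partition count, so ~500 tasks can be asked for without repartition().
val avroRdd = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](
  path + "/*.avro",
  minPartitions = 500)

Whether those extra partitions stay node local would still need to be checked in the tasks web UI, as Silvio points out.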
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>
>>>>>>> Now running with
>>>>>>>
>>>>>>> *--num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1*
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> There are multiple of these:
>>>>>>>>
>>>>>>>> 1)
>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>> at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>> at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>>
>>>>>>>> 2)
>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>> at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> 3)
>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I see this
>>>>>>>>>
>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can you look a bit more in the error logs? It could be getting
>>>>>>>>>> killed because of OOM etc. One thing you can try is to set
>>>>>>>>>> spark.shuffle.blockTransferService to nio instead of netty.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This
>>>>>>>>>>> stage has multiple task failures, all with the exception below.
>>>>>>>>>>>
>>>>>>>>>>> Multiple tasks (100s of them) get the same exception on
>>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run
>>>>>>>>>>> the job, those three stages will again run successfully. I
>>>>>>>>>>> cannot think of it being a cluster issue.
>>>>>>>>>>>
>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>
>>>>>>>>>>> Spark Version : 1.3.1
>>>>>>>>>>>
>>>>>>>>>>> Exception:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>> at org.apache.sp
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Deepak

--
Deepak
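For completeness, the spark.shuffle.blockTransferService setting Akhil suggests earlier in this thread can be set on the SparkConf before the context is created, or passed to spark-submit as --conf spark.shuffle.blockTransferService=nio. A minimal sketch for Spark 1.x (where this property exists), with a placeholder app name:

import org.apache.spark.{SparkConf, SparkContext}

// Switch the shuffle block transfer service from the default "netty" to "nio",
// as suggested above. The app name below is only a placeholder.
val conf = new SparkConf()
  .setAppName("ep-spark-job")
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(conf)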