All of these throw a compilation error at newAPIHadoopFile:

1)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
      "67108864")

    sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](
      path + "/*.avro", classOf[AvroKey], classOf[NullWritable],
      classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
      "67108864")

    sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](
      path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
      "67108864")

    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
      AvroKeyInputFormat[GenericRecord]](
      path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass:
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
Class[org.apache.hadoop.io.NullWritable],conf:
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)] <and>

[INFO]   (path: String)(implicit km:
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
implicit fm:
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)]

[INFO]  cannot be applied to (String,
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
Class[org.apache.hadoop.io.NullWritable],
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
org.apache.hadoop.conf.Configuration)

[INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
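
For reference, the Class-based overload listed in the error message takes the
InputFormat class before the key and value classes (path, fClass, kClass,
vClass, conf). A minimal sketch of a call matching that ordering, with sc and
path as above (untested against this project, shown only to illustrate the
argument order):

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.NullWritable

    val hadoopConfiguration = new Configuration()
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
      "67108864")

    // argument order per the overload above: path, fClass, kClass, vClass, conf
    val avroRdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
      AvroKeyInputFormat[GenericRecord]](
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      hadoopConfiguration)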



On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>  Ok, in that case I think you can set the max split size in the Hadoop
> config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>
>  Again, I haven’t done this myself, but looking through the Spark
> codebase here:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>
>  And at the HDFS FileInputFormat implementation, this seems like a good
> option to try.
>
>  You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE,
> max).
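>
>  For example, a sketch of that (not something I have run; the 64 MB value
> below is just an illustration):
>
>  import org.apache.hadoop.conf.Configuration
>  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>
>  val conf = new Configuration()
>  // SPLIT_MAXSIZE is the "mapreduce.input.fileinputformat.split.maxsize" key
>  conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024)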
>
>  I hope that helps!
>
>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
> Date: Thursday, June 25, 2015 at 5:49 PM
> To: Silvio Fiorito
> Cc: user
> Subject: Re:
>
>   I use
>
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>
>
>
> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
> java.lang.Class, java.lang.Class, java.lang.Class,
> org.apache.hadoop.conf.Configuration)
>
> That overload does not seem to have a minimum-partitions option.
>
> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>>  Hi Deepak,
>>
>>  Have you tried specifying the minimum number of partitions when you load
>> the file? I haven’t tried that myself against HDFS before, so I’m not sure
>> if it will affect data locality. Ideally it won’t; it should still maintain
>> data locality, just with more partitions. Once your job runs, you can check
>> the Spark tasks web UI to ensure they’re all node local.
>>
>>  val details = sc.textFile("hdfs://….", 500)
>>
>>  If you’re using something other than a text file, you can also specify
>> the minimum partitions when using sc.hadoopFile.
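>>
>>  For instance, a sketch with the old Hadoop API's TextInputFormat, purely
>> as an illustration (the last argument is the minimum number of partitions):
>>
>>  import org.apache.hadoop.io.{LongWritable, Text}
>>  import org.apache.hadoop.mapred.TextInputFormat
>>
>>  val records = sc.hadoopFile(path, classOf[TextInputFormat],
>>    classOf[LongWritable], classOf[Text], 500)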
>>
>>  Thanks,
>> Silvio
>>
>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>> Date: Thursday, June 25, 2015 at 3:10 PM
>> To: Akhil Das
>> Cc: user
>> Subject: Re:
>>
>>   How can I increase the number of tasks from 174 to 500 without running
>> repartition?
>>
>>  The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>> 64 MB so as to increase the number of tasks, similar to the split size that
>> increases the number of mappers in Hadoop M/R?
>>
>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>>  Look in the tuning section
>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>>> figure out what's taking time and where your bottleneck is. If everything
>>> is tuned properly, then you will need to throw more cores at it :)
>>>
>>>  Thanks
>>> Best Regards
>>>
>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to
>>>> make it run faster?
>>>>
>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com
>>>> > wrote:
>>>>
>>>>> Cool. :)
>>>>>  On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> It's running now.
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Now running with
>>>>>>>
>>>>>>>  *--num-executors 9973 --driver-memory 14g --driver-java-options
>>>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
>>>>>>> --executor-cores 1*
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>>  There are multiple errors like these:
>>>>>>>>
>>>>>>>>  1)
>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0
>>>>>>>> in stage 3.0 (TID 1767)
>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>  at
>>>>>>>> sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
>>>>>>>> Source)
>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>> at
>>>>>>>> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>> at
>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at
>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>> at
>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at
>>>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>> at
>>>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>> at
>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>> at
>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>> at
>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at
>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>  15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS’
>>>>>>>> timer thread
>>>>>>>>
>>>>>>>>  2)
>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS’
>>>>>>>> timer thread
>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>  at
>>>>>>>> akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>> at
>>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>> at
>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>> at
>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> 3)
>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
>>>>>>>> [akka.tcp://
>>>>>>>> sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>>> swallowing exception during message send
>>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I see this
>>>>>>>>>
>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>> at
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>> at
>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>> at
>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>> at
>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>> at
>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>> at
>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>> at
>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>> at
>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <
>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>>  Can you look a bit more into the error logs? It could be getting
>>>>>>>>>> killed because of OOM etc. One thing you can try is to set
>>>>>>>>>> spark.shuffle.blockTransferService to nio instead of netty.
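>>>>>>>>>>
>>>>>>>>>>  For example, a sketch of one way to set it (I haven't verified
>>>>>>>>>> this on your setup):
>>>>>>>>>>
>>>>>>>>>>  // on spark-submit: --conf spark.shuffle.blockTransferService=nio
>>>>>>>>>>  // or in code, before creating the SparkContext:
>>>>>>>>>>  import org.apache.spark.SparkConf
>>>>>>>>>>  val sparkConf = new SparkConf()
>>>>>>>>>>    .set("spark.shuffle.blockTransferService", "nio")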
>>>>>>>>>>
>>>>>>>>>>  Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>  I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This
>>>>>>>>>>> stage has multiple task failures, all with the exception below.
>>>>>>>>>>>
>>>>>>>>>>>  Multiple tasks (100s of them) get the same exception on
>>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run the
>>>>>>>>>>> job, the three stages will again run successfully. I cannot think
>>>>>>>>>>> of it being a cluster issue.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Any suggestions?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Spark Version : 1.3.1
>>>>>>>>>>>
>>>>>>>>>>>  Exception:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
>>>>>>>>>>> HOST
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>     at 
>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>     at 
>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>     at 
>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>     at 
>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>     at org.apache.sp
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  --
>>>>>>>>>>>  Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>   --
>>>>>>>>>  Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>   --
>>>>>>>>  Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>   --
>>>>>>>  Deepak
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>  --
>>>>>>  Deepak
>>>>>>
>>>>>>
>>>>
>>>>
>>>>   --
>>>>  Deepak
>>>>
>>>>
>>>
>>
>>
>>  --
>>  Deepak
>>
>>
>
>
>  --
>  Deepak
>
>


-- 
Deepak