It's a serialization error with the nested schema, I guess. You can take a look at Twitter's chill-avro serializer library. Here are two discussions of the same issue:
- https://issues.apache.org/jira/browse/SPARK-3447
- http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-fails-with-avro-having-Arrays-and-unions-but-succeeds-with-simple-avro-td14549.html
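If you want to keep working with GenericRecords, the usual fix is to register a schema-aware serializer so Kryo doesn't fall back to its reflective FieldSerializer (which rebuilds GenericData.Array without a schema and then NPEs in GenericData$Array.add). A minimal sketch, untested and going from memory on chill-avro's GenericRecordSerializer signature, so please double-check it; the schema file is the myschema.avsc from your mail:

import java.io.File

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Compile this into your application jar so both driver and executors
// can load it; Kryo then uses an Avro-aware serializer for generic
// records instead of reflectively poking at their fields.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    val schema = new Schema.Parser().parse(new File("myschema.avsc"))
    kryo.register(classOf[GenericData.Record],
      AvroSerializer.GenericRecordSerializer[GenericData.Record](schema))
  }
}

// Wire it up before the SparkContext is created:
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "AvroKryoRegistrator")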
Thanks
Best Regards

On Thu, Jul 23, 2015 at 9:32 PM, Arbi Akhina <arbi.akh...@gmail.com> wrote:

> Hi,
>
> I'm trying to read an Avro file into a Spark RDD, but I'm getting an
> exception while Spark retrieves the task result.
>
> The Avro schema file has the following content:
>
> {
>   "type" : "record",
>   "name" : "sample_schema",
>   "namespace" : "com.adomik.avro",
>   "fields" : [ {
>     "name" : "username",
>     "type" : "string",
>     "doc" : "Name of the user account"
>   }, {
>     "name" : "events",
>     "type" : {
>       "type" : "array",
>       "items" : {
>         "name" : "Event",
>         "type" : "record",
>         "fields" : [
>           {"name" : "action", "type" : "string"},
>           {"name" : "value", "type" : "long"}
>         ]
>       }
>     },
>     "doc" : "The content of the user's Events message"
>   } ],
>   "doc" : "A basic schema for storing Events messages"
> }
>
> I create the Avro file with avro-tools.jar from the following JSON file:
>
> {"username": "miguno", "events": [{"action": "signed", "value": 1}, {"action": "loged", "value": 1}]}
> {"username": "blizzard", "events": [{"action": "logout", "value": 2}, {"action": "visited", "value": 3}]}
>
> $ java -jar avro-tools-1.7.7.jar fromjson --schema-file myschema.avsc data.json > data.avro
>
> I can correctly read the generated Avro file back with avro-tools.jar:
>
> $ java -jar avro-tools-1.7.7.jar tojson data.avro
>
> However, I get an exception when I try to read the generated Avro file
> into a Spark RDD from the Spark shell as follows:
>
> import org.apache.avro.mapred.AvroInputFormat
> import org.apache.avro.mapred.AvroWrapper
> import org.apache.hadoop.io.NullWritable
> import org.apache.hadoop.io.Text
> import org.apache.avro.generic.GenericRecord
>
> val input = "/home/arbi/avro/data.avro"
> val rdd = sc.hadoopFile(
>   input,
>   classOf[AvroInputFormat[GenericRecord]],
>   classOf[AvroWrapper[GenericRecord]],
>   classOf[NullWritable]
> )
>
> Then when I call rdd.next, I see the following exception:
>
> 15/07/23 14:30:48 ERROR TaskResultGetter: Exception while getting task result
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
> datum (org.apache.avro.mapred.AvroWrapper)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>   at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>   at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>   at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>   at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>   at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>   at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>   at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>   at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>   ... 23 more
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
> datum (org.apache.avro.mapred.AvroWrapper)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Any idea what's causing this error?
> Does the presence of arrays in Avro cause problems when generating Spark RDDs?
>
> Bests,
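To answer the last question: arrays in Avro aren't a problem for Spark as such. The trace shows the NPE happens while the driver deserializes the task result (TaskResultGetter / DirectTaskResult), because Kryo reflectively reconstructs GenericData$Array without its schema. So another option that doesn't touch the serializer config at all is to unpack each GenericRecord into plain Scala values inside a map, so no Avro object ever crosses the wire. A rough sketch, untested, with field names taken from your .avsc:

import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord

// Flatten each record to (username, List[(action, value)]) on the
// executors, so task results shipped to the driver contain no Avro classes.
val flattened = rdd.map { case (wrapper, _) =>
  val record = wrapper.datum()
  val username = record.get("username").toString
  val events = record.get("events")
    .asInstanceOf[java.util.List[GenericRecord]]
    .asScala
    .map(e => (e.get("action").toString, e.get("value").asInstanceOf[Long]))
    .toList
  (username, events)
}

flattened.first()  // plain strings and longs round-trip through Kryo fine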