For anyone running into this same issue: it looks like Avro deserialization is just broken when used with SparkSQL and partitioned Hive tables. I created a bug report with details and a simplified example showing how to reproduce it: https://issues.apache.org/jira/browse/SPARK-13709
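As a possible stopgap (just a sketch, not something from the bug report): the Hive Avro SerDe path can be bypassed by reading the partition directories directly with the spark-avro data source and re-attaching the partition column by hand. This assumes the com.databricks:spark-avro package is on the classpath and a sqlContext is available (e.g. spark-shell or Zeppelin); the path and partition value below are just the placeholders from the test tables later in this thread.

********************
// Sketch only: read one partition's Avro files directly with spark-avro,
// bypassing org.apache.hadoop.hive.serde2.avro.AvroSerDe, then re-attach the
// partition column manually. Path and partition value are placeholders.
import org.apache.spark.sql.functions.lit

val direct = sqlContext.read
  .format("com.databricks.spark.avro")   // needs the spark-avro package on the classpath
  .load("s3://spark-data/dev/test2")     // partition LOCATION from the test tables below
  .withColumn("ds", lit("1"))            // the partition value the table would have supplied

direct.registerTempTable("test2_direct")
sqlContext.sql("SELECT * FROM test2_direct LIMIT 10").show()
********************

This obviously gives up the Hive metastore view of the table, so it is only useful until the underlying issue is fixed.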
--
Chris Miller

On Fri, Mar 4, 2016 at 12:11 AM, Chris Miller <cmiller11...@gmail.com> wrote:

> One more thing -- just to set aside any question about my specific schema
> or data, I used the sample schema and data record from Oracle's
> documentation on Avro support. It's a pretty simple schema:
> https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html
>
> When I create a table with this schema and then try to query the
> Avro-encoded record, I get the same type of error:
>
> ********************
> org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>     at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> ********************
>
> To me, this "feels" like a bug -- I just can't identify whether it's a Spark
> issue or an Avro issue. Decoding the same files works fine with Hive, and I
> imagine the same deserializer code is used there too.
>
> Thoughts?
>
> --
> Chris Miller
>
> On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> your field name is
>> *enum1_values*
>>
>> but you have data
>> { "foo1": "test123", *"enum1"*: "BLUE" }
>>
>> i.e. since you defined an enum and not union(null, enum),
>> it tries to find a value for enum1_values and doesn't find one...
>>
>> On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com> wrote:
>>
>>> I've been digging into this a little deeper. Here's what I've found:
>>>
>>> test1.avsc:
>>> ********************
>>> {
>>>   "namespace": "com.cmiller",
>>>   "name": "test1",
>>>   "type": "record",
>>>   "fields": [
>>>     { "name":"foo1", "type":"string" }
>>>   ]
>>> }
>>> ********************
>>>
>>> test2.avsc:
>>> ********************
>>> {
>>>   "namespace": "com.cmiller",
>>>   "name": "test1",
>>>   "type": "record",
>>>   "fields": [
>>>     { "name":"foo1", "type":"string" },
>>>     { "name":"enum1", "type": { "type":"enum", "name":"enum1_values",
>>>       "symbols":["BLUE","RED","GREEN"] } }
>>>   ]
>>> }
>>> ********************
>>>
>>> test1.json (encoded and saved to test/test1.avro):
>>> ********************
>>> { "foo1": "test123" }
>>> ********************
>>>
>>> test2.json (encoded and saved to test/test2.avro):
>>> ********************
>>> { "foo1": "test123", "enum1": "BLUE" }
>>> ********************
>>>
>>> Here is how I create the tables and add the data:
>>>
>>> ********************
>>> CREATE TABLE test1
>>> PARTITIONED BY (ds STRING)
>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>> STORED AS INPUTFORMAT
>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>> OUTPUTFORMAT
>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test1.avsc');
>>>
>>> ALTER TABLE test1 ADD PARTITION (ds='1') LOCATION
>>>   's3://spark-data/dev/test1';
>>>
>>> CREATE TABLE test2
>>> PARTITIONED BY (ds STRING)
>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>> STORED AS INPUTFORMAT
>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>> OUTPUTFORMAT
>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test2.avsc');
>>>
>>> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
>>>   's3://spark-data/dev/test2';
>>> ********************
>>>
>>> And here's what I get:
>>>
>>> ********************
>>> SELECT * FROM test1;
>>> -- works fine, shows data
>>>
>>> SELECT * FROM test2;
>>>
>>> org.apache.avro.AvroTypeException: Found com.cmiller.enum1_values, expecting union
>>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>>>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>>>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>>>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>>>     at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>>>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>>>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> ********************
>>>
>>> In addition to the above, I also tried putting the test Avro files on
>>> HDFS instead of S3 -- the error is the same. I also tried querying from
>>> Scala instead of using Zeppelin, and I get the same error.
>>>
>>> Where should I begin with troubleshooting this problem? This same query
>>> runs fine on Hive. Based on the error, it appears to be something in the
>>> deserializer though... but if it were a bug in the Avro deserializer, why
>>> does it only appear with Spark? I imagine Hive queries would be using the
>>> same deserializer, no?
>>>
>>> Thanks!
>>>
>>> --
>>> Chris Miller
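A side note on Igor's point above: if the enum were actually meant to be optional, the usual Avro idiom would be to declare the field as a union with null and give it a default. A hypothetical variant of test2.avsc along those lines (not the schema used in the tests above -- the data files would have to be re-encoded against it, e.g. with avro-tools' fromjson subcommand) might look like this:

********************
{
  "namespace": "com.cmiller",
  "name": "test1",
  "type": "record",
  "fields": [
    { "name":"foo1", "type":"string" },
    { "name":"enum1",
      "type": [ "null",
                { "type":"enum", "name":"enum1_values",
                  "symbols":["BLUE","RED","GREEN"] } ],
      "default": null }
  ]
}
********************

That said, it would not explain the Spark-specific failure here, since the non-union schema decodes the very same files without problems through Hive.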
>>>
>>> On Thu, Mar 3, 2016 at 4:33 AM, Chris Miller <cmiller11...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a strange issue occurring when I use manual partitions.
>>>>
>>>> If I create a table as follows, I am able to query the data with no
>>>> problem:
>>>>
>>>> ********
>>>> CREATE TABLE test1
>>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>>> STORED AS INPUTFORMAT
>>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>>> OUTPUTFORMAT
>>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>>> LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'
>>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
>>>> ********
>>>>
>>>> If I create the table like this, however, and then add a partition with
>>>> a LOCATION specified, I am unable to query:
>>>>
>>>> ********
>>>> CREATE TABLE test2
>>>> PARTITIONED BY (ds STRING)
>>>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>>>> STORED AS INPUTFORMAT
>>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>>>> OUTPUTFORMAT
>>>>   'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>>>> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
>>>>
>>>> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
>>>>   's3://analytics-bucket/prod/logs/avro/2016/03/02/';
>>>> ********
>>>>
>>>> This is what happens:
>>>>
>>>> ********
>>>> SELECT * FROM test2 LIMIT 1;
>>>>
>>>> org.apache.avro.AvroTypeException: Found ActionEnum, expecting union
>>>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>>>>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>>>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>>>>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
>>>>     at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
>>>>     at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
>>>>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
>>>>     at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> ********
>>>>
>>>> The data is exactly the same, and I can still go back and query the
>>>> test1 table without issue. I don't have control over the directory
>>>> structure, so I need to add the partitions manually so that I can
>>>> specify a location.
>>>>
>>>> For what it's worth, "ActionEnum" is the first field in my schema. This
>>>> same table and query structure works fine with Hive. When I try to run
>>>> this with SparkSQL, however, I get the above error.
>>>>
>>>> Anyone have any idea what the problem is here? Thanks!
>>>>
>>>> --
>>>> Chris Miller
>>>
>>
>
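One more check that can help separate "bad data file" from "bad reader schema": decode one of the .avro container files with the plain Avro API, outside Spark and Hive entirely. A minimal sketch, assuming a local copy of one of the files (the path is a placeholder):

********************
// Standalone sanity check: decode an Avro container file with the same
// GenericDatumReader that appears in the stack trace, but without any
// Hive/Spark reader schema in the way. "test2.avro" is a placeholder path.
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

val reader = new DataFileReader[GenericRecord](
  new File("test2.avro"), new GenericDatumReader[GenericRecord]())

println(reader.getSchema)   // the writer schema embedded in the container file
while (reader.hasNext) {
  println(reader.next())    // each record, decoded against the writer schema only
}
reader.close()
********************

If this prints the records cleanly, the files and their embedded writer schema are fine, which suggests the mismatch is in the reader schema the partitioned table hands to the deserializer rather than in the data itself.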