Thank you Jakob, you were bang on the money. Jorge, apologies: my snippet was partial and I hadn't made it equivalent to my failing test.
For reference, and for all who pass this way, here is a working solution with passing tests and without specifying a schema explicitly; it was the second test that had been failing prior to Jakob's pointer. (Two postscripts at the bottom of this mail: a sketch for deriving the schema from the case class, per Jakob's aside, and a quick check for corrupt records, per Jorge's question about the trailing comma.)

import org.apache.spark.sql.Dataset
import org.scalatest.Matchers

case class Sample(time: Long, opt: Option[Long] = None)

class SampleTest extends SparkSimpleContextConfigurator with Matchers {

  "A JSON Object" should "Parse Correctly" in {
    val jsonStr = """ {"time": 2, "opt": 1 } """
    val rdd = sc.parallelize(Seq(jsonStr))

    import sqlContext.implicits._
    val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]

    val sample: Sample = samples.first()
    sample.time should be (2)
    sample.opt.isDefined should be (true)
    sample.opt.get should be (1)
  }

  "A Partial JSON Object" should "Parse Correctly" in {
    // The second record supplies "opt", so schema inference sees the field.
    val json = Seq(
      """ {"time": 2 } """,
      """ {"time": 10, "opt": 10} """
    )
    val rdd = sc.parallelize(json)

    import sqlContext.implicits._
    val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]

    val sample: Sample = samples.first()
    sample.time should be (2)
    sample.opt.isDefined should be (false)
  }
}

Phone: 087 - 9179799
Quidquid latine dictum sit, altum sonatur

On 23 February 2016 at 00:43, Jakob Odersky <ja...@odersky.com> wrote:
> I think the issue is that the `json.read` function has no idea of the
> underlying schema; in fact the documentation
> (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader)
> says:
>
> Unless the schema is specified using schema function, this function goes
> through the input once to determine the input schema.
>
> so since your test data does not contain a record with a product_id,
> json.read creates a schema that does not contain it. Only after
> determining the (incorrect) schema do you treat it as a Dataset of
> CustomerEvent, which fails.
> Try creating a schema (StructType) manually for your CustomerEvent
> case class and passing it to the reader's `schema` function before
> calling `json`. I.e. something like
>
> val sch = StructType(Seq(
>   StructField("customer_id", StringType, false),
>   StructField("product_id", LongType, true)
> )) // there's probably a better way to get the schema from a case class
> val customers: Dataset[CustomerEvent] =
>   sqlContext.read.schema(sch).json(rdd).as[CustomerEvent]
>
> just a pointer, I haven't tested this.
> regards,
> --Jakob
>
> On Mon, Feb 22, 2016 at 12:17 PM, Jorge Machado <jom...@me.com> wrote:
> > Hi Anthony,
> >
> > I tried the code myself. I think the problem is in the jsonStr:
> >
> > I do it with: val jsonStr = """{"customer_id":
> > "3ee066ab571e03dd5f3c443a6c34417a", "product_id": 3}"""
> >
> > Or is it the "," after your 3, or the "\n"?
> >
> > Regards
> >
> >
> >
> > On 22/02/2016, at 15:42, Anthony Brew <atb...@gmail.com> wrote:
> >
> > Hi,
> >      I'm trying to parse JSON data into a case class using the
> > DataFrame.as[] function, but I am hitting an unusual error and the
> > interweb isn't solving my pain, so I thought I would reach out for help.
> > I've truncated my code a little here to make it readable, but the error
> > is included in full.
> >
> > My case class looks like....
> >
> > case class CustomerEvent(
> >   customer_id: String,
> >   product_id: Option[Long] = None
> > )
> >
> > My passing test looks like
> >
> > "A Full CustomerEvent JSON Object" should "Parse Correctly" in {
> >   val jsonStr = """ {
> >     "customer_id": "3ee066ab571e03dd5f3c443a6c34417a",
> >     "product_id": 3,
> >   }
> >   """
> >   // apparently deprecation is not an issue
> >   val rdd = sc.parallelize(Seq(jsonStr))
> >
> >   import sqlContext.implicits._
> >   val customers: Dataset[CustomerEvent] =
> >     sqlContext.read.json(rdd).as[CustomerEvent]
> >
> >   val ce: CustomerEvent = customers.first()
> >   ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
> >   ce.product_id.get should be (3)
> > }
> >
> > My issue is that when the product_id is not part of the JSON, I get an
> > encoding error, i.e. the following
> >
> > "A Partial CustomerEvent JSON Object" should "Parse Correctly" in {
> >   val jsonStr = """ {
> >     "customer_id": "3ee066ab571e03dd5f3c443a6c34417a"
> >   }
> >   """
> >   // apparently deprecation is not an issue
> >   val rdd = sc.parallelize(Seq(jsonStr))
> >
> >   import sqlContext.implicits._
> >   val customers: Dataset[CustomerEvent] =
> >     sqlContext.read.json(rdd).as[CustomerEvent]
> >
> >   val ce: CustomerEvent = customers.first()
> >   ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
> >   ce.product_id.isDefined should be (false)
> > }
> >
> > My error looks like
> >
> > Error while decoding: java.lang.UnsupportedOperationException: Cannot
> > evaluate expression: upcast('product_id,DoubleType,- field (class:
> > "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
> > newinstance(class data.CustomerEvent,invoke(input[3,
> > StringType],toString,ObjectType(class java.lang.String)),input[0,
> > LongType],input[9, LongType],invoke(input[5,
> > StringType],toString,ObjectType(class java.lang.String)),invoke(input[6,
> > StringType],toString,ObjectType(class java.lang.String)),input[7,
> > LongType],invoke(input[1, StringType],toString,ObjectType(class
> > java.lang.String)),wrapoption(input[8,
> > LongType]),wrapoption(upcast('product_id,DoubleType,- field (class:
> > "scala.Option", name: "product_id"),- root class:
> > "data.CustomerEvent")),wrapoption(input[4,
> > DoubleType]),wrapoption(invoke(input[2,
> > StringType],toString,ObjectType(class
> > java.lang.String))),false,ObjectType(class data.CustomerEvent),None)
> > :- invoke(input[3, StringType],toString,ObjectType(class java.lang.String))
> > :  +- input[3, StringType]
> > :- input[0, LongType]
> > :- input[9, LongType]
> > :- invoke(input[5, StringType],toString,ObjectType(class java.lang.String))
> > :  +- input[5, StringType]
> > :- invoke(input[6, StringType],toString,ObjectType(class java.lang.String))
> > :  +- input[6, StringType]
> > :- input[7, LongType]
> > :- invoke(input[1, StringType],toString,ObjectType(class java.lang.String))
> > :  +- input[1, StringType]
> > :- wrapoption(input[8, LongType])
> > :  +- input[8, LongType]
> > :- wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option",
> > name: "product_id"),- root class: "data.CustomerEvent"))
> > :  +- upcast('product_id,DoubleType,- field (class: "scala.Option", name:
> > "product_id"),- root class: "data.CustomerEvent")
> > :     +- 'product_id
> > :- wrapoption(input[4, DoubleType])
> > :  +- input[4, DoubleType]
> > +- wrapoption(invoke(input[2, StringType],toString,ObjectType(class
> > java.lang.String)))
> >    +- invoke(input[2, StringType],toString,ObjectType(class
> > java.lang.String))
> >       +- input[2, StringType]
> >
> > at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:224)
> > at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
> > at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> > at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
> > at org.apache.spark.sql.Dataset.take(Dataset.scala:689)
> > at org.apache.spark.sql.Dataset.first(Dataset.scala:654)
> > at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply$mcV$sp(TestLoadingCustomerEventFromJSON.scala:70)
> > at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
> > at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
> > at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> > at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> > at org.scalatest.Transformer.apply(Transformer.scala:22)
> > at org.scalatest.Transformer.apply(Transformer.scala:20)
> > at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
> > at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
> > at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
> > at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
> > at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
> > at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
> > at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> > at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
> >
> > Any pointers on what I am doing wrong would be gratefully accepted!
> >
> > Thanks a Million,
> > Anthony
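
P.S. On Jakob's aside in the quoted thread ("there's probably a better way to
get the schema from a case class"): one option, assuming the Encoders API that
shipped with the Dataset support in Spark 1.6, is to derive the StructType
from the case class itself rather than writing the StructFields by hand. A
minimal, untested sketch against the Sample class above:

import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.types.StructType

// Repeated from the tests above so the sketch is self-contained.
case class Sample(time: Long, opt: Option[Long] = None)

// Encoders.product builds the encoder Spark uses for Datasets of case
// classes; its schema covers every field, with Option[_] fields nullable.
val sampleSchema: StructType = Encoders.product[Sample].schema

// With an explicit schema the reader skips inference entirely, so a batch
// in which no record carries "opt" still decodes, with opt = None.
// (sc and sqlContext come from the test fixture, as in the tests above.)
val rdd = sc.parallelize(Seq(""" {"time": 2 } """))
val samples: Dataset[Sample] =
  sqlContext.read.schema(sampleSchema).json(rdd).as[Sample]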
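
P.P.S. On Jorge's question about the trailing comma: a quick way to tell
whether a line failed to parse outright (as opposed to parsing with a field
missing) is to look for Spark's corrupt-record column. A rough sketch,
assuming the default column name "_corrupt_record" (configurable via
spark.sql.columnNameOfCorruptRecord):

// Lines the JSON parser rejects do not fail the read; with an inferred
// schema they come back as rows whose only non-null value is the raw
// offending text in the "_corrupt_record" column.
val df = sqlContext.read.json(sc.parallelize(Seq(
  """ {"time": 2, "opt": 1 } """, // well-formed
  """ {"time": 2, } """           // trailing comma: likely rejected
)))

if (df.columns.contains("_corrupt_record")) {
  // Print the raw text of every line that failed to parse.
  df.filter(df("_corrupt_record").isNotNull)
    .select("_corrupt_record")
    .show(false)
}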