The read seems to be successful, as the values for each field in the record are different and correct. The problem appears when I collect it or trigger the next processing step (a join with another table); each of these presumably triggers serialization, and that is when all the fields in the record get the value of the first field.
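One way to confirm this is the untested sketch below: flatten each record into plain Scala types before collecting, so Kryo never has to serialize the Avro record itself (it assumes guid, itemId and siteId are the only fields that matter for the check):

val flattened = viEvents.map { case (itemId, event) =>
  // String.valueOf also handles Avro's Utf8 string values
  (itemId,
    String.valueOf(event.get("guid")),
    String.valueOf(event.get("itemId")),
    String.valueOf(event.get("siteId")))
}
// Kryo now only sees Longs and Strings across the collect
flattened.collect().foreach(println)

If these tuples survive the collect intact, the read path is fine and the corruption happens when the record object itself is serialized.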
On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
> We have very large processing being done on Hadoop (400 M/R jobs, 1-day
> duration, 100s of TB of data, 100s of joins). We are exploring Spark as an
> alternative to speed up our processing time. We use Scala + Scoobi today,
> and Avro is the data format across steps.
>
> I observed a strange behavior: I read sample data (Avro format, 10
> records), collected it, and printed each record. The data for each field
> within a record is wiped out, and I only see the data of the first field
> copied into everything.
>
> Is this a problem with Spark? Or with using Avro?
>
> Example:
>
> I ran through the RDD and printed 4 fields from each record; they all
> printed correctly.
>
> val x = viEvents.map {
>   case (itemId, event) =>
>     println(event.get("guid"), itemId, event.get("itemId"),
>       event.get("siteId"))
>     (itemId, event)
> }
>
> The above code prints
>
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
> (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
> (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
> (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
> (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>
> viEvents.collect.foreach(a => println(a._2.get("guid"), a._1,
>   a._2.get("itemId"), a._2.get("siteId")))
>
> Now I collected it, which might have led to serialization of the RDD. When
> I print the same 4 fields, I only get guid values for all of them. Has
> this got something to do with serialization?
>
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>
> The RDD is of type org.apache.spark.rdd.RDD[(Long,
> com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]
>
> At the time of context creation I did this:
>
> val conf = new SparkConf()
>   .setAppName(detail)
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>   .set("spark.yarn.maxAppAttempts", "1")
>   .registerKryoClasses(Array(
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))
>
> The class hierarchy is
>
> DetailInputRecord extends InputRecord extends SessionRecord extends
> ExperimentationRecord extends
> org.apache.avro.generic.GenericData.Record(schema: Schema)
>
> Please suggest.
>
> --
> Deepak
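If Kryo's default FieldSerializer is indeed what mangles the records, one possible workaround I am considering (an untested sketch; AvroRecordSerializer is a made-up name, and it assumes the schema of each record class is available at registration time) is a Kryo serializer that delegates to Avro's own binary encoding:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

class AvroRecordSerializer[T <: GenericRecord](schema: Schema)
    extends Serializer[T] {

  override def write(kryo: Kryo, output: Output, record: T): Unit = {
    // Kryo's Output is a java.io.OutputStream, so Avro can encode into it
    val writer  = new GenericDatumWriter[T](schema)
    val encoder = EncoderFactory.get().binaryEncoder(output, null)
    writer.write(record, encoder)
    encoder.flush()
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[T]): T = {
    // directBinaryDecoder avoids read-ahead buffering that would consume
    // bytes belonging to the next object in Kryo's stream
    val reader = new GenericDatumReader[T](schema)
    reader.read(null.asInstanceOf[T],
      DecoderFactory.get().directBinaryDecoder(input, null))
  }
}

It would have to be registered per record class (each with its schema) through a custom org.apache.spark.serializer.KryoRegistrator set via spark.kryo.registrator, rather than through registerKryoClasses. One caveat: GenericDatumReader rebuilds plain GenericData.Record objects, so this only helps if the downstream code reads fields through the GenericRecord interface rather than expecting a DetailInputRecord instance.

--
Deepak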