The read seems to be successful, as the values for each field in the record are
different and correct. The problem appears when I collect it or trigger the next
processing step (a join with another table). Each of these probably triggers
serialization, and that is when all the fields in the record get the value of
the first field (or element).
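
One way to test the serialization hypothesis might be to copy the fields out of
each record into a plain case class before calling collect. The sketch below is
only an illustration under assumptions, not code from the job: `PlainEvent` and
`PlainEvent.from` are hypothetical names, the field names are the three printed
in the quoted example, and a Map stands in for a record so the sketch runs
without Spark or Avro.

```scala
// Hypothetical helper, not part of the original job: a plain, immutable copy
// of the three fields printed in the example.
final case class PlainEvent(guid: String, itemId: String, siteId: String)

object PlainEvent {
  // `get` stands in for the record's field accessor (assumption: the record
  // exposes a get-by-name method, as event.get("guid") in the example does).
  // Every value is copied as a String so no Avro object is carried along.
  def from(get: String => Any): PlainEvent =
    PlainEvent(
      String.valueOf(get("guid")),
      String.valueOf(get("itemId")),
      String.valueOf(get("siteId")))
}

object PlainEventDemo {
  def main(args: Array[String]): Unit = {
    // A Map stands in for one record; the values are taken from the first
    // line of the printed output.
    val fakeRecord = Map[String, Any](
      "guid" -> "27c9fbc014b4f61526f0574001b73b00",
      "itemId" -> 261197590161L,
      "siteId" -> 3)
    println(PlainEvent.from(fakeRecord))
  }
}
```

In the job this could be applied as
`viEvents.map { case (_, event) => PlainEvent.from(name => event.get(name)) }.collect()`.
If those values come back correct, the corruption most likely happens while the
record class itself is serialized, not during the read.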



On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> We have a very large processing workload running on Hadoop (400 M/R jobs, 1 day
> duration, 100s of TB of data, 100s of joins). We are exploring Spark as an
> alternative to speed up our processing time. We use Scala + Scoobie today,
> and Avro is the data format across steps.
>
>
> I observed a strange behavior: I read sample data (Avro format, 10
> records), collect it, and print each record. All the data for each
> element within a record is wiped out, and I only see the data of the first
> element copied into every field.
>
> Is this a problem with Spark? Or with using Avro?
>
>
> Example:
>
> I took that RDD, ran through it, and printed 4 elements from each record;
> they all printed correctly.
>
>
> val x = viEvents.map {
>   case (itemId, event) =>
>     println(event.get("guid"), itemId, event.get("itemId"), event.get("siteId"))
>     (itemId, event)
> }
>
> The above code prints
>
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
> (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
> (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
> (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
> (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
> (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>
> viEvents.collect.foreach(a =>
>   println(a._2.get("guid"), a._1, a._2.get("itemId"), a._2.get("siteId")))
>
> *Now I collected it, which might have led to serialization of the RDD.* Now
> when I print the same 4 elements, *I only get guid values for all of them.
> Does this have something to do with serialization?*
>
>
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
> (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
> (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>
>
>
> The RDD is of type org.apache.spark.rdd.RDD[(Long,
>  com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]
>
> At the time of context creation I did this:
> val conf = new SparkConf()
>   .setAppName(detail)
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>   .set("spark.yarn.maxAppAttempts", "1")
>   .registerKryoClasses(Array(
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
>     classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))
>
> The class hierarchy is
>
> DetailInputRecord extends InputRecord extends SessionRecord extends
> ExperimentationRecord extends
>    org.apache.avro.generic.GenericData.Record(schema: Schema)
>
>
> Please suggest.
>
>
>
> --
> Deepak
>
>


-- 
Deepak
