The problem is likely that the underlying Avro library is reusing objects
for speed. You probably need to explicitly copy the values out of the
reused record before the collect.
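
To illustrate what I mean, here is a minimal, Spark-free sketch of the
effect. Everything in it (MutableEvent, EventFields, reusingReader) is a
hypothetical stand-in that I made up for illustration, not your classes and
not the Avro API. It only assumes that the reader hands back the same mutable
object for every record, which is my guess about what is happening here.

```scala
// Stand-in for a mutable record whose fields are overwritten in place.
final class MutableEvent(var guid: String, var itemId: Long, var siteId: Int)

// Immutable snapshot of the fields we want to keep.
final case class EventFields(guid: String, itemId: Long, siteId: Int)

object ReuseDemo {
  // Three of the rows from the output quoted below.
  private val rows = Seq(
    ("27c9fbc014b4f61526f0574001b73b00", 261197590161L, 3),
    ("340da8c014a46272c0c8c830011c3bf0", 221733319941L, 77),
    ("393938d71480a2aaf8e440d1fff709f4", 141586046141L, 0)
  )

  // Hands out the SAME object on every call to next(), mutating it in
  // place, the way a reader that reuses records would.
  def reusingReader(): Iterator[MutableEvent] = {
    val shared = new MutableEvent("", 0L, 0)
    rows.iterator.map { case (guid, itemId, siteId) =>
      shared.guid = guid
      shared.itemId = itemId
      shared.siteId = siteId
      shared
    }
  }

  // Buffering the references: every element aliases the one shared object,
  // so all of them end up showing the last row that was read.
  def withoutCopy(): Vector[MutableEvent] = reusingReader().toVector

  // Copying the values out first: each element is an independent snapshot.
  def withCopy(): Vector[EventFields] =
    reusingReader().map(e => EventFields(e.guid, e.itemId, e.siteId)).toVector

  def main(args: Array[String]): Unit = {
    println(withoutCopy().map(_.itemId).distinct.size) // 1 distinct itemId
    println(withCopy().map(_.itemId).distinct.size)    // 3 distinct itemIds
  }
}
```

In your job the equivalent would be a map over viEvents that pulls
event.get("guid"), event.get("itemId") and event.get("siteId") into an
immutable tuple or case class (calling toString on any Avro string values)
before the collect or the join. I have not run this against your data, so
treat it as something to try rather than a confirmed fix.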

On Sat, Apr 11, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> The read seems to be successful, as the values for each field in the record
> are different and correct. The problem appears when I collect it or trigger
> the next processing step (a join with another table). Each of these probably
> triggers serialization, and that's when all the fields in the record get the
> value of the first field (or element).
>
>
>
> On Sun, Apr 12, 2015 at 9:14 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> We have very large processing jobs running on Hadoop (400 M/R jobs, 1 day
>> duration, 100s of TB of data, 100s of joins). We are exploring Spark as an
>> alternative to speed up our processing time. We use Scala + Scoobi today,
>> and Avro is the data format across steps.
>>
>>
>> I observed a strange behavior: I read sample data (Avro format, 10
>> records), collect it, and print each record. All the data for each
>> element within a record is wiped out, and I only see the data of the first
>> element copied into everything.
>>
>> Is this a problem with Spark? Or with using Avro?
>>
>>
>> Example:
>>
>> I took that RDD, ran through it, and printed 4 elements from it; they all
>> printed correctly.
>>
>>
>> val x = viEvents.map {
>>       case (itemId, event) =>
>>         println(event.get("guid"), itemId, event.get("itemId"), event.get("siteId"))
>>         (itemId, event)
>>     }
>>
>> The above code prints
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,261197590161,3)
>> (340da8c014a46272c0c8c830011c3bf0,221733319941,221733319941,77)
>> (340da8c014a46272c0c8c830011c3bf0,181704048554,181704048554,77)
>> (340da8c014a46272c0c8c830011c3bf0,231524481696,231524481696,77)
>> (340da8c014a46272c0c8c830011c3bf0,271830464992,271830464992,77)
>> (393938d71480a2aaf8e440d1fff709f4,141586046141,141586046141,0)
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,161605492016,0)
>>
>> viEvents.collect.foreach(a => println(a._2.get("guid"), a._1,
>> a._2.get("itemId"), a._2.get("siteId")))
>>
>> *Now I collected it, which might have led to serialization of the RDD.* Now
>> when I print the same 4 elements, *I only get guid values for all of them.
>> Has this got something to do with serialization?*
>>
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (27c9fbc014b4f61526f0574001b73b00,261197590161,27c9fbc014b4f61526f0574001b73b00,27c9fbc014b4f61526f0574001b73b00)
>>
>> (340da8c014a46272c0c8c830011c3bf0,221733319941,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,181704048554,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,231524481696,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (340da8c014a46272c0c8c830011c3bf0,271830464992,340da8c014a46272c0c8c830011c3bf0,340da8c014a46272c0c8c830011c3bf0)
>>
>> (393938d71480a2aaf8e440d1fff709f4,141586046141,393938d71480a2aaf8e440d1fff709f4,393938d71480a2aaf8e440d1fff709f4)
>>
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>>
>> (3a792a7c14c0a35882346c04fff4e236,161605492016,3a792a7c14c0a35882346c04fff4e236,3a792a7c14c0a35882346c04fff4e236)
>>
>>
>>
>> The RDD is of type org.apache.spark.rdd.RDD[(Long,
>>  com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord)]
>>
>> At the time of context creation I did this:
>> val conf = new SparkConf()
>>       .setAppName(detail)
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>       .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>       .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>       .set("spark.yarn.maxAppAttempts", "1")
>>       .registerKryoClasses(Array(
>>         classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum],
>>         classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord],
>>         classOf[com.ebay.ep.poc.spark.reporting.process.detail.model.InputRecord],
>>         classOf[com.ebay.ep.poc.spark.reporting.process.model.SessionRecord],
>>         classOf[com.ebay.ep.poc.spark.reporting.process.model.DataRecord],
>>         classOf[com.ebay.ep.poc.spark.reporting.process.model.ExperimentationRecord]))
>>
>> The class hierarchy is
>>
>> DetailInputRecord extends InputRecord extends SessionRecord extends
>> ExperimentationRecord extends
>>    org.apache.avro.generic.GenericData.Record(schema: Schema)
>>
>>
>> Please suggest.
>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>
