Thanks Enno, let me have a look at the streaming parser API of Jackson (a rough sketch of what I have in mind is below).

Thanks
Best Regards
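For reference, a minimal sketch of what the streaming approach could look like here (illustrative only: the helper name extractField and the "eventType" field are assumptions, and it pulls a single field per record instead of materializing the whole Map):

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object StreamHolder extends Serializable {
  // instantiated lazily on the executor, same trick as the Holder object further down the thread
  @transient lazy val factory = new JsonFactory()
}

// returns the first occurrence of the given field anywhere in the document
def extractField(json: String, field: String): Option[String] = {
  val parser = StreamHolder.factory.createParser(json)
  try {
    var result: Option[String] = None
    while (result.isEmpty && parser.nextToken() != null) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME &&
          parser.getCurrentName == field) {
        parser.nextToken()
        result = Some(parser.getText)
      }
    }
    result
  } finally {
    parser.close()
  }
}

val eventTypes = myDStream.map(extractField(_, "eventType"))

Whether this helps obviously depends on how much of each document is actually needed, as Enno points out below.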
On Sat, Feb 14, 2015 at 9:30 PM, Enno Shioji <eshi...@gmail.com> wrote:

> Huh, that would come to 6.5 ms per JSON. That does feel like a lot, but
> if your JSON files are big enough, I guess you could get that sort of
> processing time.
>
> Jackson is more or less the most efficient JSON parser out there, so
> unless the Scala API is somehow affecting it, I don't see any better way.
> If you only need to read parts of the JSON, you could look into exploiting
> Jackson's stream parsing API
> <http://wiki.fasterxml.com/JacksonStreamingApi>.
>
> I guess the good news is you can throw machines at it. You could also look
> into other serialization frameworks.
>
> On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Thanks again!
>> It's with the parser only; I just tried the parser
>> <https://gist.github.com/akhld/3948a5d91d218eaf809d> without Spark, and
>> it took me 52 seconds to process 8k JSON records. Not sure if there's an
>> efficient way to do this in Spark. I know if I use Spark SQL with SchemaRDD
>> and all it will be much faster, but I need that in Spark Streaming.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>
>>> I see. I'd really benchmark how the parsing performs outside Spark (in a
>>> tight loop or something). If *that* is slow, you know it's the parsing. If
>>> not, it's not the parsing.
>>>
>>> Another thing you want to look at is CPU usage. If the actual parsing
>>> really is the bottleneck, you should see very high CPU utilization. If not,
>>> it's not the parsing per se but rather the ability to feed the messages to
>>> the parsing library.
>>>
>>> On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Ah, my bad, it works without the serializable exception. There is not
>>>> much performance difference though.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Thanks for the suggestion, but doing that gives me this exception:
>>>>>
>>>>> http://pastebin.com/ni80NqKn
>>>>>
>>>>> Over this piece of code:
>>>>>
>>>>> object Holder extends Serializable {
>>>>>   @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>> }
>>>>>
>>>>> val jsonStream = myDStream.map(x => {
>>>>>   Holder.mapper.readValue[Map[String, Any]](x)
>>>>> })
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> (adding back user)
>>>>>>
>>>>>> Fair enough. Regarding the serialization exception, the hack I use is
>>>>>> to have an object with a transient lazy field, like so:
>>>>>>
>>>>>> object Holder extends Serializable {
>>>>>>   @transient lazy val mapper = new ObjectMapper()
>>>>>> }
>>>>>>
>>>>>> This way, the ObjectMapper will be instantiated at the destination
>>>>>> and you can share the instance.
>>>>>>
>>>>>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das
>>>>>> <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply Enno. In my case the rate from the stream is not
>>>>>>> the bottleneck, as I'm able to consume all those records at a time
>>>>>>> (have tested it). And regarding the ObjectMapper, if I take it outside
>>>>>>> of my map operation then it throws serialization exceptions (Caused by:
>>>>>>> java.io.NotSerializableException:
>>>>>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If I were you I'd first parse some test JSONs in isolation (outside
>>>>>>>> Spark) to determine if the bottleneck is really the parsing. There are
>>>>>>>> plenty of other places that could be affecting your performance, like
>>>>>>>> the rate you are able to read from your stream source etc.
>>>>>>>>
>>>>>>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>>>>>>> every time. This is quite expensive, and Jackson recommends sharing
>>>>>>>> the instance. However, if you tried other parsers / mapPartitions
>>>>>>>> without success, this probably won't fix your problem either.
>>>>>>>>
>>>>>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das
>>>>>>>> <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> I'm getting low performance while parsing JSON data. My cluster
>>>>>>>>> setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and
>>>>>>>>> 4 cores.
>>>>>>>>>
>>>>>>>>> I tried both scala.util.parsing.json.JSON and fasterxml's Jackson
>>>>>>>>> parser.
>>>>>>>>>
>>>>>>>>> This is what I basically do:
>>>>>>>>>
>>>>>>>>> *//Approach 1:*
>>>>>>>>> val jsonStream = myDStream.map(x => {
>>>>>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>>>>>> })
>>>>>>>>>
>>>>>>>>> jsonStream.count().print()
>>>>>>>>>
>>>>>>>>> *//Approach 2:*
>>>>>>>>> val jsonStream2 = myDStream.map(
>>>>>>>>>   JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>>>
>>>>>>>>> jsonStream2.count().print()
>>>>>>>>>
>>>>>>>>> It takes around 15-20 seconds to process/parse 35k JSON documents
>>>>>>>>> (containing nested documents and arrays) which I put in the stream.
>>>>>>>>>
>>>>>>>>> Is there any better approach/parser to process it faster? I also
>>>>>>>>> tried it with mapPartitions but it did not make any difference.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
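For completeness, the mapPartitions variant mentioned in the thread would look roughly like the sketch below (one ObjectMapper per partition instead of per record; this is an illustration of the pattern, not the exact code that was benchmarked, and the ScalaObjectMapper import path assumes a jackson-module-scala 2.x release of that era):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

val jsonStream = myDStream.mapPartitions { records =>
  // one mapper per partition/task, reused for every record in it
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  records.map(mapper.readValue[Map[String, Any]](_))
}

jsonStream.count().print()

As noted above, this mainly avoids the per-record ObjectMapper construction; if the CPU is already saturated by the parsing itself, it won't change much.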