Anyone have any thoughts on this?

On 22 April 2015 at 22:49, Jeetendra Gangele <gangele...@gmail.com> wrote:
I set 7000 tasks in mapToPair, and the same number of tasks in distinct.
Still, a lot of shuffle read and write is happening, and the application
runs for much longer because of it. Any idea?

On 17 April 2015 at 11:55, Akhil Das <ak...@sigmoidanalytics.com> wrote:

How many tasks are you seeing in your mapToPair stage? If it is 7000, I
suggest passing a number similar/close to 7000 to your .distinct call. What
is happening in your case is that you are repartitioning your data down to
a much smaller number (32), which I believe puts a lot of load on the
processing; you can try increasing it.

Thanks
Best Regards
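A minimal, runnable sketch of the suggestion above: pass the parent RDD's
partition count to distinct() instead of hard-coding a small number like 32.
The local context, app name, and toy data below are assumptions for
illustration only, not the thread's actual job.

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DistinctParallelismSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[*]", "distinct-sketch");
    JavaRDD<Integer> data = jsc.parallelize(Arrays.asList(1, 2, 2, 3, 3, 3), 8);

    // Reuse the parent's partition count (~7000 in the thread's job) so the
    // distinct stage runs with the same parallelism as the stage feeding it.
    int parentPartitions = data.partitions().size();
    JavaRDD<Integer> deduped = data.distinct(parentPartitions);

    System.out.println(deduped.collect()); // [1, 2, 3] in some order
    jsc.stop();
  }
}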
On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:

Akhil, any thought on this?

On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com> wrote:

No, I did not try repartitioning. Below is the full code:

public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc)
    throws IOException {
  long start = System.currentTimeMillis();

  // Index every vendor record and convert it to the matcher's representation.
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex()
      .mapToPair(new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
        @Override
        public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t)
            throws Exception {
          MatcherReleventData matcherData = new MatcherReleventData();
          return new Tuple2<Long, MatcherReleventData>(t._2,
              matcherData.convertVendorDataToMatcherData(t._1));
        }
      }).cache();
  log.info("after index" + RddForMarch.take(1));

  // Pull the whole indexed dataset to the driver and broadcast it for scoring.
  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

  // Expand each record into its non-null blocking keys (company name, phone
  // number, zip code, domain).
  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
      new Function<MatcherReleventData, Iterable<String>>() {
        @Override
        public Iterable<String> call(MatcherReleventData v1) throws Exception {
          List<String> values = new ArrayList<String>();
          HelperUtilities helper1 = new HelperUtilities();
          MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
          if (matchkeys.get_companyName() != null) {
            values.add(matchkeys.get_companyName());
          }
          if (matchkeys.get_phoneNumberr() != null) {
            values.add(matchkeys.get_phoneNumberr());
          }
          if (matchkeys.get_zipCode() != null) {
            values.add(matchkeys.get_zipCode());
          }
          if (matchkeys.getM_domain() != null) {
            values.add(matchkeys.getM_domain());
          }
          return values;
        }
      });
  log.info("blocking RDD is" + blockingRdd.count());

  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + ":" + entry._2());
    count++;
  }
  log.info("total count" + count);

  // Look up candidate company ids for each blocking key and de-duplicate.
  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
      new Function<String, Iterable<Integer>>() {
        @Override
        public Iterable<Integer> call(String v1) throws Exception {
          return ckdao.getSingelkeyresult(v1);
        }
      }).distinct(32);
  log.info("after hbase count is" + completeDataToprocess.count());
  log.info("data for process" + completeDataToprocess.take(1));

  // Score every (record, candidate) pair against the broadcast data.
  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
      new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t)
            throws Exception {
          Scoring scoreObj = new Scoring();
          double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
              dataMatchGlobal.getValue().get(t._1()));
          return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(),
              new Tuple2<Integer, Double>(t._2(), score));
        }
      });
  log.info("with score tuple is" + withScore.take(1));

  // Keep only the highest-scoring candidate per record.
  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
      new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
            Tuple2<Integer, Double> v2) throws Exception {
          return v1._2().compareTo(v2._2()) > 0 ? v1 : v2;
        }
      });
  log.info("max score RDD" + maxScoreRDD.take(10));

  maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
    @Override
    public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
      MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
      log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
      // Set the score for better understanding of the merge.
      matchedData.setScore(t._2()._2());
      vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), "Souce_id");
    }
  });

  log.info("took " + (System.currentTimeMillis() - start) + " millis to run matcher");
}

On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you paste your complete code? Did you try repartitioning/increasing the
level of parallelism to speed up the processing? You have 16 cores, and I'm
assuming your 400K records amount to less than a 10 GB dataset.

Thanks
Best Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:

I already checked, and GC is taking 1 second for each task. Is this too
much? If yes, how do I avoid it?

On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Open the driver UI and see which stage is taking time; you can check
whether it is adding any GC time, etc.

Thanks
Best Regards
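One common way to attack per-task GC time, offered here only as a hedged
sketch: give executors more memory headroom and pass GC flags through
spark.executor.extraJavaOptions. None of the concrete values below come from
this thread; they are illustrative assumptions to be tuned against the
driver UI.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class GcTuningSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("matcher")
        // More executor memory reduces GC pressure (value is a placeholder).
        .set("spark.executor.memory", "4g")
        // Raw JVM flags for the executors: switch collector, log GC activity.
        .set("spark.executor.extraJavaOptions",
             "-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // ... job runs here ...
    jsc.stop();
  }
}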
On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <gangele...@gmail.com> wrote:

Hi all, I have the code below, in which distinct is running for a long time.

blockingRdd is a pair RDD of <Long, String> and it will have 400K records:

JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd
    .flatMapValues(new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        return ckdao.getSingelkeyresult(v1);
      }
    }).distinct(32);

I am running distinct on 800K records, and it is taking 2 hours on 16 cores
and 20 GB of RAM.
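For context on the shuffle being discussed in this thread: in Spark,
rdd.distinct(n) is implemented as a map to (element, null) pairs followed by
reduceByKey, so one shuffle is inherent to distinct; the numPartitions
argument only controls how many reduce tasks share that shuffle (32 versus
~7000 here). A self-contained sketch of the equivalence, using Java 8
lambdas for brevity and illustrative names:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class DistinctEquivalenceSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[*]", "distinct-equiv");
    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("a", "b", "b", "c"));
    int numPartitions = 4; // stands in for the 32 vs ~7000 choice above

    // Roughly what rdd.distinct(numPartitions) does under the hood.
    JavaPairRDD<String, Void> keyed = rdd.mapToPair(
        (PairFunction<String, String, Void>) x -> new Tuple2<String, Void>(x, null));
    JavaRDD<String> deduped = keyed
        .reduceByKey((a, b) -> a, numPartitions) // the shuffle happens here
        .keys();

    System.out.println(deduped.collect()); // [a, b, c] in some order
    jsc.stop();
  }
}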