Anyone any thought on this?

On 22 April 2015 at 22:49, Jeetendra Gangele <gangele...@gmail.com> wrote:

> I made 7000 tasks in mapTopair and in distinct also I made same number of
> tasks.
> Still lots of shuffle read and write is happening due to application
> running for much longer time.
> Any idea?
>
> On 17 April 2015 at 11:55, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> How many tasks are you seeing in your mapToPair stage? Is it 7000? then i
>> suggest you giving a number similar/close to 7000 in your .distinct call,
>> what is happening in your case is that, you are repartitioning your data to
>> a smaller number (32) which would put a lot of load on processing i
>> believe, you can try increasing it.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele <gangele...@gmail.com>
>> wrote:
>>
>>> Akhil, any thought on this?
>>>
>>> On 16 April 2015 at 23:07, Jeetendra Gangele <gangele...@gmail.com>
>>> wrote:
>>>
>>>> No I did not tried the partitioning below is the full code
>>>>
>>>> public static  void  matchAndMerge(JavaRDD<VendorRecord>
>>>> matchRdd,JavaSparkContext jsc) throws IOException{
>>>>  long start = System.currentTimeMillis();
>>>>   JavaPairRDD<Long, MatcherReleventData> RddForMarch
>>>> =matchRdd.zipWithIndex().mapToPair(new
>>>> PairFunction<Tuple2<VendorRecord,Long>, Long, MatcherReleventData>() {
>>>>
>>>> @Override
>>>> public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord,
>>>> Long> t)
>>>> throws Exception {
>>>> MatcherReleventData matcherData = new MatcherReleventData();
>>>> Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long,
>>>> MatcherReleventData>(t._2,
>>>> matcherData.convertVendorDataToMatcherData(t._1));
>>>>  return tuple;
>>>> }
>>>>
>>>> }).cache();
>>>>  log.info("after index"+RddForMarch.take(1));
>>>>  Map<Long, MatcherReleventData> tmp =RddForMarch.collectAsMap();
>>>> Map<Long, MatcherReleventData> matchData = new HashMap<Long,
>>>> MatcherReleventData>(tmp);
>>>> final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
>>>> jsc.broadcast(matchData);
>>>>
>>>> JavaPairRDD<Long,String> blockingRdd = RddForMarch.flatMapValues(new
>>>> Function<MatcherReleventData, Iterable<String>>(){
>>>>
>>>> @Override
>>>> public Iterable<String> call(MatcherReleventData v1)
>>>> throws Exception {
>>>> List<String> values = new ArrayList<String>();
>>>> HelperUtilities helper1 = new HelperUtilities();
>>>> MatcherKeys matchkeys=helper1.getBlockinkeys(v1);
>>>> if(matchkeys.get_companyName() !=null){
>>>> values.add(matchkeys.get_companyName());
>>>> }
>>>> if(matchkeys.get_phoneNumberr() !=null){
>>>> values.add(matchkeys.get_phoneNumberr());
>>>> }
>>>> if(matchkeys.get_zipCode() !=null){
>>>> values.add(matchkeys.get_zipCode());
>>>> }
>>>> if(matchkeys.getM_domain() !=null){
>>>> values.add(matchkeys.getM_domain());
>>>> }
>>>>   return values;
>>>> }
>>>>  });
>>>>  log.info("blocking RDD is"+blockingRdd.count());
>>>> int count=0;
>>>> log.info("Starting printing");
>>>>   for (Tuple2<Long, String> entry : blockingRdd.collect()) {
>>>>
>>>>   log.info(entry._1() + ":" + entry._2());
>>>>       count++;
>>>>     }
>>>>   log.info("total count"+count);
>>>>  JavaPairRDD<Long,Integer>
>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>>>> Iterable<Integer>>(){
>>>>
>>>> @Override
>>>> public Iterable<Integer> call(String v1) throws Exception {
>>>> return ckdao.getSingelkeyresult(v1);
>>>> }
>>>>  }).distinct(32);
>>>>  log.info("after hbase count is"+completeDataToprocess.count());
>>>>  log.info("data for process"+completeDataToprocess.take(1));
>>>>  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore
>>>> =completeDataToprocess.mapToPair( new PairFunction<Tuple2<Long,Integer>,
>>>> Long, Tuple2<Integer, Double>>(){
>>>>
>>>> @Override
>>>> public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer>
>>>> t)
>>>> throws Exception {
>>>> Scoring scoreObj = new Scoring();
>>>> double score =scoreObj.computeMatchScore(companyDAO.get(t._2()),
>>>> dataMatchGlobal.getValue().get(t._1()));
>>>> Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(),
>>>> score);
>>>> Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long,
>>>> Tuple2<Integer,Double>>(t._1(), maptuple);
>>>> return tuple;
>>>> }
>>>>  });
>>>>  log.info("with score tuple is"+withScore.take(1));
>>>>  JavaPairRDD<Long, Tuple2<Integer,Double>> maxScoreRDD
>>>> =withScore.reduceByKey( new Function2<Tuple2<Integer,Double>,
>>>> Tuple2<Integer,Double>, Tuple2<Integer,Double>>(){
>>>>
>>>> @Override
>>>> public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1,
>>>> Tuple2<Integer, Double> v2) throws Exception {
>>>>  int res =v1._2().compareTo(v2._2());
>>>> if(res >0){
>>>>  Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(),
>>>> v1._2());
>>>> return result;
>>>>  }
>>>> else if(res<0){
>>>> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
>>>> v2._2());
>>>> return result;
>>>> }
>>>> else{
>>>> Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(),
>>>> v2._2());
>>>> return result;
>>>> }
>>>>   }
>>>>  });
>>>>  log.info("max score RDD"+maxScoreRDD.take(10));
>>>>
>>>>  maxScoreRDD.foreach( new
>>>> VoidFunction<Tuple2<Long,Tuple2<Integer,Double>>>(){
>>>>
>>>> @Override
>>>> public void call(Tuple2<Long, Tuple2<Integer, Double>> t)
>>>> throws Exception {
>>>> MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1());
>>>> log.info("broadcast is"+dataMatchGlobal.getValue().get(t._1()));
>>>> //Set the score for better understanding of merge
>>>> matchedData.setScore(t._2()._2());
>>>> vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id");
>>>>  }
>>>>  });
>>>>  log.info("took " + (System.currentTimeMillis() - start) + " mills to
>>>> run matcher");
>>>>
>>>>
>>>>
>>>>  }
>>>>
>>>>
>>>> On 16 April 2015 at 22:25, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Can you paste your complete code? Did you try repartioning/increasing
>>>>> level of parallelism to speed up the processing. Since you have 16 cores,
>>>>> and I'm assuming your 400k records isn't bigger than a 10G dataset.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele <
>>>>> gangele...@gmail.com> wrote:
>>>>>
>>>>>> I already checked and G is taking 1 secs for each task. is this too
>>>>>> much? if yes how to avoid this?
>>>>>>
>>>>>>
>>>>>> On 16 April 2015 at 21:58, Akhil Das <ak...@sigmoidanalytics.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Open the driver ui and see which stage is taking time, you can look
>>>>>>> whether its adding any GC time etc.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele <
>>>>>>> gangele...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All I have below code whether distinct is running for more time.
>>>>>>>>
>>>>>>>> blockingRdd is the combination of <Long,String> and it will have
>>>>>>>> 400K records
>>>>>>>> JavaPairRDD<Long,Integer>
>>>>>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function<String,
>>>>>>>> Iterable<Integer>>(){
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public Iterable<Integer> call(String v1) throws Exception {
>>>>>>>> return ckdao.getSingelkeyresult(v1);
>>>>>>>> }
>>>>>>>>  }).distinct(32);
>>>>>>>>
>>>>>>>> I am running distinct on 800K records and its taking 2 hours on 16
>>>>>>>> cores and 20 GB RAM.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>
>
>

Reply via email to