I have a flatMap function that shouldn't possibly emit duplicates, and yet it does. The output of my function is a HashSet, so the function itself cannot output duplicates, yet I see many copies of keys emitted from it (in one case up to 62). The curious thing is that I can't get this to happen until I ramp the input up to about 100,000 lines. For example, this input line:

    (350000000087005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])
expands to:

    (350000000087005221,(80530632,0.37312230565577803))
    (350000000087005221,(80530632,0.37312230565577803))
    (350000000087005221,(80530632,0.37312230565577803))
    ...
    (350000000087005221,(80530632,0.37312230565577803))

62 times in total. If I run this line alone as input, I get only the one line of output, as expected. It seems to be a scaling issue. My code is as follows:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    JavaPairRDD<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> preAggData =
        indidKeyedJoinedData.groupByKey();
    JavaPairRDD<Long, Tuple2<Integer, Double>> aggregatedData =
        preAggData.flatMapToPair(new AggregateLikeSims());

Where:

    static class AggregateLikeSims implements
            PairFlatMapFunction<Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>>,
                                Long, Tuple2<Integer, Double>> {

        HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output =
            new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
        Map<Integer, List<Double>> intermediateMap = new HashMap<Integer, List<Double>>();
        Iterator<Tuple2<Integer, Double>> intIterator;
        Tuple2<Integer, Double> currentTuple;
        Double MAX_RECO_VALUE = 1.0;
        Iterator<Iterable<Tuple2<Integer, Double>>> itIterator;
        Accumulator<Integer> accum;

        @Override
        public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(
                Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> inTuple) {
            // Group all similarity values of this key by item id.
            itIterator = inTuple._2.iterator();
            while (itIterator.hasNext()) {
                intIterator = itIterator.next().iterator();
                while (intIterator.hasNext()) {
                    currentTuple = intIterator.next();
                    if (intermediateMap.containsKey(currentTuple._1)) {
                        intermediateMap.get(currentTuple._1).add(currentTuple._2);
                    } else {
                        List<Double> listOfDoubles = new ArrayList<Double>();
                        listOfDoubles.add(currentTuple._2);
                        intermediateMap.put(currentTuple._1, listOfDoubles);
                    }
                }
            }
            // Emit one (key, (itemId, aggregatedSim)) per item id and empty the map.
            Iterator<Map.Entry<Integer, List<Double>>> it = intermediateMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, List<Double>> pairs = it.next();
                if (pairs.getValue().size() > 1) {
                    output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                        new Tuple2<Integer, Double>(pairs.getKey(), aggregate(pairs.getValue()))));
                } else {
                    output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                        new Tuple2<Integer, Double>(pairs.getKey(), pairs.getValue().get(0))));
                }
                it.remove();
            }
            return output;
        }

        private double aggregate(List<Double> simsList) {
            if (simsList == null) {
                return 0;
            }
            if (simsList.size() == 1) {
                return simsList.get(0);
            }
            double max = 0;
            double sum = 0;
            // Find max and sum up all elements of the list.
            for (double sim : simsList) {
                if (sim > max) {
                    max = sim;
                }
                if (!Double.isNaN(sim)) { // "sim != Double.NaN" is always true
                    sum = sum + sim;
                }
            }
            sum = sum - max;
            double agr = max + (MAX_RECO_VALUE - max) * sum
                / (double) ((simsList.size() - 1) * MAX_RECO_VALUE);
            return agr;
        }

        public void setAccum(Accumulator<Integer> in) {
            accum = in;
        }
    }
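One plausible explanation, consistent with a single input line behaving correctly: `output` is an instance field of AggregateLikeSims and is never cleared. As far as I understand Spark's execution model, the function object is deserialized once per task and `call` is invoked for every record in that partition, so each call returns a set that still contains the tuples of all earlier keys, and those get emitted again. Each individual set has no duplicates; the repetition happens across calls. The sketch below models this in plain Java, without Spark. `FieldReuseDemo`, `LeakyFunction`, `FreshFunction` and `flatMapPartition` are hypothetical names, and the per-partition loop is a simplification of what Spark actually does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class FieldReuseDemo {

    // Like AggregateLikeSims: the result set is an instance field, so results
    // from earlier calls are still in it when later calls return it.
    static class LeakyFunction implements Function<String, Iterable<String>> {
        private final Set<String> output = new HashSet<>();

        @Override
        public Iterable<String> apply(String key) {
            output.add(key + "-result");
            return output;
        }
    }

    // Fix: build a fresh collection inside each call.
    static class FreshFunction implements Function<String, Iterable<String>> {
        @Override
        public Iterable<String> apply(String key) {
            Set<String> output = new HashSet<>();
            output.add(key + "-result");
            return output;
        }
    }

    // Simplified model of a flatMap over ONE partition: a single function
    // instance is applied to every record in turn, and everything each call
    // returns is appended to the partition's output.
    static List<String> flatMapPartition(Function<String, Iterable<String>> f,
                                         List<String> records) {
        List<String> emitted = new ArrayList<>();
        for (String record : records) {
            for (String out : f.apply(record)) {
                emitted.add(out);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d");

        List<String> leaky = flatMapPartition(new LeakyFunction(), records);
        List<String> fresh = flatMapPartition(new FreshFunction(), records);

        // With LeakyFunction, "a-result" is re-emitted by every later call,
        // so it appears once per record from "a" onward.
        System.out.println("leaky: " + leaky.size() + " rows, a-result x"
                + Collections.frequency(leaky, "a-result"));
        System.out.println("fresh: " + fresh.size() + " rows, a-result x"
                + Collections.frequency(fresh, "a-result"));
    }
}
```

Running `main` reports 10 rows with `a-result` four times for the leaky version, and 4 rows with `a-result` once for the fresh one. Applied to AggregateLikeSims, the analogous change would be to declare `output` (and, for safety, `intermediateMap`) as local variables inside `call`, so every key starts from empty collections.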
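As a sanity check on the aggregation itself: for the n similarity values of one item id, with maximum m, sum S and R = MAX_RECO_VALUE, `aggregate` computes the expression below. Plugging in the example line (two copies of 0.20824391739360665, R = 1) reproduces the value seen in the output, which suggests the per-key arithmetic is fine and only the repetition is wrong.

```latex
\text{agr} = m + (R - m)\,\frac{S - m}{(n - 1)\,R}

% example line: n = 2, R = 1, both values equal m = 0.20824391739360665, so S - m = m
\text{agr} = m + (1 - m)\,m = 2m - m^2 \approx 0.41648783 - 0.04336553 = 0.37312230
```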