I have a flatMap function that should not be able to emit duplicates, and yet it does. The function returns a HashSet, so a single call cannot return the same element twice, yet I see many copies of the same key emitted from it (in one case, 62). Curiously, I can't reproduce this until I scale the input up to about 100,000 lines. For example:

    (350000000087005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])

expands to

    (350000000087005221,(80530632,0.37312230565577803))
    (350000000087005221,(80530632,0.37312230565577803))
    (350000000087005221,(80530632,0.37312230565577803))
    ...
    (350000000087005221,(80530632,0.37312230565577803))

62 times in total.

If I run with only this line as input, I get exactly one output line, as expected. The problem only seems to appear once the input is scaled up.
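
For reference, the 0.37312230565577803 in the output comes from the aggregate() method in the code further down, applied to the two identical similarities for item 80530632: with m = 0.20824391739360665, n = 2 and MAX_RECO_VALUE = 1.0, the result is m + (1 - m) * (2m - m) / ((2 - 1) * 1.0). The Java sketch below reproduces that arithmetic outside Spark. AggregateSketch is a hypothetical name and this is a copy of the formula, not the production class, with one deliberate change: NaN entries are skipped using Double.isNaN, because in Java a comparison like sim != Double.NaN is true for every double, NaN included.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateSketch {

    static final double MAX_RECO_VALUE = 1.0;

    // Same formula as aggregate() in AggregateLikeSims:
    //   max + (MAX_RECO_VALUE - max) * (sum - max) / ((n - 1) * MAX_RECO_VALUE)
    // except that NaN entries are left out of the sum via Double.isNaN.
    static double aggregate(List<Double> sims) {
        if (sims == null) {
            return 0;
        }
        if (sims.size() == 1) {
            return sims.get(0);
        }
        double max = 0;
        double sum = 0;
        for (double sim : sims) {
            if (sim > max) {
                max = sim;
            }
            // sim != Double.NaN would be true even for NaN; isNaN is the real check.
            if (!Double.isNaN(sim)) {
                sum += sim;
            }
        }
        sum -= max;
        return max + (MAX_RECO_VALUE - max) * sum / ((sims.size() - 1) * MAX_RECO_VALUE);
    }

    public static void main(String[] args) {
        double s = 0.20824391739360665;
        // Item 80530632 appears twice with the same similarity in the example.
        System.out.println(aggregate(Arrays.asList(s, s)));
        // A NaN no longer poisons the sum: prints 0.5.
        System.out.println(aggregate(Arrays.asList(0.5, Double.NaN)));
    }
}
```

The operations run in the same order as in aggregate(), so the first line printed should match the value in the example output.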

My code is as follows:

    JavaPairRDD<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> preAggData =
        indidKeyedJoinedData.groupByKey();

    JavaPairRDD<Long, Tuple2<Integer, Double>> aggregatedData =
        preAggData.flatMapToPair(new AggregateLikeSims());
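
After groupByKey, each key carries an Iterable of joined values, each of which is itself an Iterable of (item id, similarity) pairs, and the first loop in AggregateLikeSims.call() flattens these into item id -> list of similarities. A Spark-free sketch of that step can help when checking a single grouped value by hand. It assumes Map.Entry as a stand-in for scala.Tuple2 and keeps the map local to the method; GroupMergeSketch, mergeByItem and exampleGroups are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupMergeSketch {

    // Flattens the grouped values of one key into itemId -> similarities.
    // Map.Entry<Integer, Double> stands in for Tuple2<Integer, Double>.
    // The map is created per call, so nothing carries over between keys.
    static Map<Integer, List<Double>> mergeByItem(
            Iterable<? extends Iterable<? extends Map.Entry<Integer, Double>>> groups) {
        Map<Integer, List<Double>> byItem = new HashMap<>();
        for (Iterable<? extends Map.Entry<Integer, Double>> group : groups) {
            for (Map.Entry<Integer, Double> pair : group) {
                byItem.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                      .add(pair.getValue());
            }
        }
        return byItem;
    }

    // The grouped value from the example:
    // [[(80530632, 0.20824391739360665)], [(80530632, 0.20824391739360665)]]
    static List<List<Map.Entry<Integer, Double>>> exampleGroups() {
        List<Map.Entry<Integer, Double>> first = new ArrayList<>();
        first.add(new SimpleEntry<>(80530632, 0.20824391739360665));
        List<Map.Entry<Integer, Double>> second = new ArrayList<>();
        second.add(new SimpleEntry<>(80530632, 0.20824391739360665));
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(mergeByItem(exampleGroups()));
    }
}
```

For the example record this yields a single item, 80530632, with two similarities, which aggregate() then combines into one score. Calling mergeByItem repeatedly gives the same answer each time, since no state outlives the call.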

AggregateLikeSims is defined as follows:

    // Imports needed at the top of the enclosing file:
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    static class AggregateLikeSims implements
            PairFlatMapFunction<Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>>,
                                Long, Tuple2<Integer, Double>> {

        // These are instance fields, so they live as long as the function
        // object does, not just for one call(). intermediateMap is emptied
        // by it.remove() below; output is never cleared.
        HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output =
                new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
        Map<Integer, List<Double>> intermediateMap = new HashMap<Integer, List<Double>>();
        Iterator<Tuple2<Integer, Double>> intIterator;
        Tuple2<Integer, Double> currentTuple;
        Double MAX_RECO_VALUE = 1.0;
        Iterator<Iterable<Tuple2<Integer, Double>>> itIterator;
        Accumulator<Integer> accum;

        @Override
        public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(
                Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> inTuple) {
            // Collect every similarity for each item id across all grouped values.
            itIterator = inTuple._2.iterator();
            while (itIterator.hasNext()) {
                intIterator = itIterator.next().iterator();
                while (intIterator.hasNext()) {
                    currentTuple = intIterator.next();
                    if (intermediateMap.containsKey(currentTuple._1)) {
                        intermediateMap.get(currentTuple._1).add(currentTuple._2);
                    } else {
                        List<Double> listOfDoubles = new ArrayList<Double>();
                        listOfDoubles.add(currentTuple._2);
                        intermediateMap.put(currentTuple._1, listOfDoubles);
                    }
                }
            }

            // Emit one (key, (itemId, score)) per item id, aggregating when
            // an item has more than one similarity.
            Iterator<Map.Entry<Integer, List<Double>>> it = intermediateMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, List<Double>> pairs = it.next();
                if (pairs.getValue().size() > 1) {
                    output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                            new Tuple2<Integer, Double>(pairs.getKey(), aggregate(pairs.getValue()))));
                } else {
                    output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                            new Tuple2<Integer, Double>(pairs.getKey(), pairs.getValue().get(0))));
                }
                it.remove();
            }

            return output;
        }

        private double aggregate(List<Double> simsList) {
            if (simsList == null) {
                return 0;
            }
            if (simsList.size() == 1) {
                return simsList.get(0);
            }

            double max = 0;
            double sum = 0;

            // Find the max and sum up all non-NaN elements.
            for (double sim : simsList) {
                if (sim > max) {
                    max = sim;
                }
                // sim != Double.NaN is always true; Double.isNaN is the correct test.
                if (!Double.isNaN(sim)) {
                    sum = sum + sim;
                }
            }
            sum = sum - max;
            double agr = max + (MAX_RECO_VALUE - max) * sum
                    / (double) ((simsList.size() - 1) * MAX_RECO_VALUE);

            return agr;
        }

        public void setAccum(Accumulator<Integer> in) {
            accum = in;
        }
    }
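
One thing worth ruling out: output and intermediateMap are fields of AggregateLikeSims rather than locals of call(). intermediateMap is emptied by it.remove(), but output is never cleared, so if Spark applies one instance of the function to every record in a partition (which, as far as I know, is how a deserialized closure is used within a task), each call would return the results of all earlier records in that partition along with its own. That would fit the symptoms: a one-line input yields one line, and the number of copies grows with the number of records per partition. The plain-Java sketch below imitates this with no Spark involved; SharedStateDemo, FieldSetFn, LocalSetFn and runPartition are hypothetical names, and runPartition is only an assumed model of how a partition is processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SharedStateDemo {

    // Like AggregateLikeSims: the result set is an instance field,
    // so it survives from one call() to the next and is never cleared.
    static class FieldSetFn {
        private final Set<String> output = new HashSet<>();

        Iterable<String> call(String key) {
            output.add(key + "-result");
            return output;
        }
    }

    // Same logic, but the set is created fresh on every call().
    static class LocalSetFn {
        Iterable<String> call(String key) {
            Set<String> output = new HashSet<>();
            output.add(key + "-result");
            return output;
        }
    }

    // Imitates a flatMap over one partition: a single function instance
    // is applied to every record, and everything it returns is emitted.
    static List<String> runPartition(List<String> keys, boolean useFieldVersion) {
        FieldSetFn fieldFn = new FieldSetFn();
        LocalSetFn localFn = new LocalSetFn();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            Iterable<String> out = useFieldVersion ? fieldFn.call(key) : localFn.call(key);
            for (String value : out) {
                emitted.add(value);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        System.out.println(runPartition(keys, true).size());   // 6 (1 + 2 + 3)
        System.out.println(runPartition(keys, false).size());  // 3
    }
}
```

With three records, the field-based version emits 1 + 2 + 3 = 6 elements and the first key three times, while the per-call version emits one element per record. If this is what is happening in the job, declaring output and intermediateMap inside call() should make the duplicates go away.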