I am trying to generate all N(N-1)/2 lexicographically ordered 2-tuples from a glom()'d JavaRDD<List<Tuple2<AQ, Integer>>>. The construction of the initial JavaPairRDD<AQ, Integer> of Tuple2s from the value classes I provide (AQ, AQV, AQQ, CT) is well formed and performant; minimized code:
SparkConf conf = new SparkConf()
        .setAppName("JavaTestMain")
        .set("spark.driver.maxResultSize", "0")
        .set("spark.akka.frameSize", "512");
...
JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file).map((String line) -> {
    String[] s = line.trim().split(",");
    AQV key = new AQV(Integer.parseInt(s[0].trim()), s[1].trim(), s[2].trim()); // (0,1,2)
    return key;
});

JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
    return new Tuple2<AQ, Integer>(new AQ(c.getAccount(), c.getQuestion()), 1);
});

JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
    return (i1 + i2);
});

logger.info(c1.count());

The snippet above works well and returns "JavaTestMain: 8010" in a few seconds, which is perfect. When I try to generate the full lexicographic pair space (32,076,045 elements), it is not performant and the job dies with a thrown java.lang.OutOfMemoryError: Java heap space; minimized code continued:

JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();

JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
    // pre-size for all N*(N-1)/2 pairs, built in memory inside this single task
    List<CT> output = new ArrayList<>((cl.size() - 1) * (cl.size()) / 2);
    Tuple2<AQ, Integer> key1, key2;
    for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator(); cloit.hasNext(); ) { // outer loop
        key1 = cloit.next();
        for (ListIterator<Tuple2<AQ, Integer>> cliit = cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) { // inner loop, if applicable
            key2 = cliit.next();
            output.add(new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2()));
        }
    }
    return output;
});

c3.collect().stream().forEach(System.out::println);

15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space (same stack trace as above)
15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space (same stack trace as above)

Any thoughts here?
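One direction I have considered but not yet verified is to skip glom() entirely and build the pair space with cartesian() over an indexed copy of c1, filtering on a zipWithIndex() index so each unordered pair is produced exactly once; that way no single task has to materialize all 32M CT objects at once. Rough, untested sketch (Spark 1.3-era Java API, variable names just illustrative; I would probably saveAsTextFile() rather than collect() the result):

JavaPairRDD<Tuple2<AQ, Integer>, Long> indexed = c1.zipWithIndex();

JavaRDD<CT> pairs = indexed.cartesian(indexed)
        // keep only (lower index, higher index) so each unordered pair appears once
        .filter(p -> p._1()._2() < p._2()._2())
        .map(p -> new CT(new AQQ(p._1()._1()._1(), p._2()._1()._1()),
                         p._1()._1()._2(), p._2()._1()._2()));

I don't know whether the cost of the full N^2 cartesian before the filter makes this worse in practice, though.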
What I would like is a well-formed parallel approach in which the unit of work for each iterated pointer in the List is pairing the current element with the elements that follow it. Put differently: for each position in the List (pointer_i -> element_i), the sublist of size N - i that follows element_i is the only work required at pointer_i, for all pointers in N, so this lexicographic process should be parallelizable over N (a rough sketch of what I mean is in the P.S. below). I am new to Spark, so it's very possible I'm glossing over something glaringly apparent that is breaking the parallelism gains of the framework.

Thanks,
-Dan
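P.S. For concreteness, here is a rough, untested sketch of the per-pointer decomposition described above: collect the ~8,010 reduced tuples (small), broadcast them, and have each pointer_i pair element_i only with the elements that follow it. Names are illustrative, not from my actual code, and this again assumes the Spark 1.3-era Java API where flatMap returns an Iterable:

import org.apache.spark.broadcast.Broadcast;
...
List<Tuple2<AQ, Integer>> all = c1.collect();                  // ~8,010 elements, small
Broadcast<List<Tuple2<AQ, Integer>>> bAll = sc.broadcast(all);

JavaRDD<CT> pairs2 = sc.parallelize(all)                       // one record per pointer_i
        .zipWithIndex()                                        // (element_i, i)
        .flatMap(t -> {
            List<Tuple2<AQ, Integer>> full = bAll.value();
            int i = t._2().intValue();
            List<CT> out = new ArrayList<>(full.size() - i - 1);
            for (int j = i + 1; j < full.size(); j++) {        // only the trailing sublist for pointer_i
                out.add(new CT(new AQQ(t._1()._1(), full.get(j)._1()),
                               t._1()._2(), full.get(j)._2()));
            }
            return out;                                        // List works as the Iterable result
        });

Is something along these lines closer to idiomatic Spark for keeping the work spread over N, or am I missing a simpler construct?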