I am trying to generate all (N-1)N/2 lexicographically ordered 2-tuples from a
glom()'ed JavaRDD<List<Tuple2<AQ, Integer>>>. The construction of the initial
Tuple2 space, a JavaPairRDD<AQ, Integer>, from the case classes I provide
(AQ, AQV, AQQ, CT) is well formed and performant; minimized code:

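(Sketches of the value classes, for context. My real classes are plain
Serializable beans along these lines; only AQ is spelled out here, and AQV,
AQQ and CT follow the same pattern. The key classes override
equals()/hashCode(), which distinct() and reduceByKey() rely on.)

import java.io.Serializable;

public class AQ implements Serializable {
    private final int account;
    private final String question;

    public AQ(int account, String question) {
        this.account = account;
        this.question = question;
    }
    public int getAccount() { return account; }
    public String getQuestion() { return question; }

    // Value equality on (account, question) so AQ works as an RDD key.
    @Override public boolean equals(Object o) {
        if (!(o instanceof AQ)) return false;
        AQ other = (AQ) o;
        return account == other.account && question.equals(other.question);
    }
    @Override public int hashCode() { return 31 * account + question.hashCode(); }
}
// AQV (account, question, value), AQQ (a pair of AQ) and CT (an AQQ plus the
// two counts) are analogous beans and omitted here.

The driver code:
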
SparkConf conf = new SparkConf()
        .setAppName("JavaTestMain")
        .set("spark.driver.maxResultSize", "0")
        .set("spark.akka.frameSize", "512");
...
// Parse one AQV per CSV line: (account, question, value) = columns (0, 1, 2).
JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file).map((String line) -> {
    String[] s = line.trim().split(",");
    return new AQV(Integer.parseInt(s[0].trim()), s[1].trim(), s[2].trim());
});
// Count the distinct values observed for each (account, question) key.
JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
    return new Tuple2<AQ, Integer>(new AQ(c.getAccount(), c.getQuestion()), 1);
});
JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
    return i1 + i2;
});
logger.info(c1.count());

The snippet above works well and logs JavaTestMain: 8010 in a few seconds,
which is perfect. However, when I try to generate the lexicographic pair
space over those 8010 keys (8010 * 8009 / 2 = 32,076,045 elements), it is not
performant and throws java.lang.OutOfMemoryError: Java heap space; minimized
code continued:

// glom() turns each partition into a single List, then flatMap emits every
// within-partition pair (i < j) as a CT.
JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();
JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
    List<CT> output = new ArrayList<>((cl.size() - 1) * cl.size() / 2);
    Tuple2<AQ, Integer> key1, key2;
    // Outer loop over every element of the partition's list.
    for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator(); cloit.hasNext(); ) {
        key1 = cloit.next();
        // Inner loop over the sublist following the outer element, if any.
        for (ListIterator<Tuple2<AQ, Integer>> cliit = cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) {
            key2 = cliit.next();
            output.add(new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2()));
        }
    }
    return output;
});
c3.collect().stream().forEach(System.out::println); // collect() pulls every CT back to the driver


15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Any thoughts here? I would like a well-formed parallel approach in which the
unit of work for each current position in the list is pairing that element
with the elements that follow it. Put differently: for each position i in the
list (pointer_i -> element_i), the only work required at pointer_i is over the
sublist of size (N - i) that follows element_i, for all positions in N, so
this lexicographic pair generation should be parallelizable over N. I am new
to Spark, so it's very possible I'm glossing over something glaringly apparent
that is causing the framework's parallelism gains to break down.
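
For illustration only, here is a rough sketch of the shape of decomposition I
have in mind, expressed with zipWithIndex() and cartesian() instead of glom().
I have not run or benchmarked this, the names (indexed, c3b) are just
placeholders, and the index comparison is only there to keep each unordered
pair exactly once, not to sort by AQ:

// Attach a stable Long index to every (AQ, count) element.
JavaPairRDD<Tuple2<AQ, Integer>, Long> indexed = c1.zipWithIndex();

// Cross the RDD with itself and keep only pairs with index_i < index_j,
// so each unordered pair of elements is emitted once.
JavaRDD<CT> c3b = indexed.cartesian(indexed)
        .filter(t -> t._1()._2() < t._2()._2())
        .map(t -> new CT(new AQQ(t._1()._1()._1(), t._2()._1()._1()),
                         t._1()._1()._2(), t._2()._1()._2()));

// count() instead of collect(), so the ~32M CT objects never come back to the driver.
logger.info(c3b.count());

Even if something along these lines is the better direction, I would still
like to understand why the glom()-based version above exhausts the heap.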

Thanks, -Dan
