You fundamentally want (half of) the Cartesian product, so I don't think
forming it gets much faster than this. You could implement it on cogroup
directly and perhaps avoid forming the tuples you will filter out.

I'd think more about whether you really need to do this at all, or whether
there is anything else about the real problem you can exploit.
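
If you do go ahead with it, one shape for "avoid forming the tuples you will
filter out" -- using a broadcast of the small key list rather than cogroup --
might look like the rough, untested sketch below. It assumes the Spark 1.x
Java API and the simplified JavaPairRDD<String, Integer> c1 from the message
below; imports are omitted to match the minimized snippets.

// assumes: JavaSparkContext sc; JavaPairRDD<String, Integer> c1 with ~8000 rows
final Broadcast<List<Tuple2<String, Integer>>> bAll =
        sc.broadcast(new ArrayList<Tuple2<String, Integer>>(c1.collect()));

JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2 =
        c1.flatMapToPair((Tuple2<String, Integer> left) -> {
            List<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> out = new ArrayList<>();
            for (Tuple2<String, Integer> right : bAll.value()) {
                // keep only the lexicographically ordered half of the pairs
                if (left._1().compareTo(right._1()) < 0) {
                    out.add(new Tuple2<>(left, right));
                }
            }
            return out;
        });

It is still O(N^2) comparisons and the output is still half the Cartesian
product, of course; it just avoids materializing the half you would discard.
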
On Apr 30, 2015 6:36 PM, "Dan DeCapria, CivicScience" <
dan.decap...@civicscience.com> wrote:

> Thought about it some more, and simplified the problem space for
> discussion:
>
> Given: JavaPairRDD<String, Integer> c1; // c1.count() == 8000.
>
> Goal: JavaPairRDD<Tuple2<String,Integer>, Tuple2<String,Integer>> c2; //
> all lexicographically ordered pairs
> Where: for all c1_i, c1_j in c1 with c1_i._1().compareTo(c1_j._1()) < 0,
> emit new Tuple2<Tuple2<String,Integer>, Tuple2<String,Integer>>(c1_i, c1_j).
>
> Not sure how to efficiently generate c2.
> c1.cartesian(c1).filter(... (c1_i._1().compareTo(c1_j._1()) < 0) ...) was
> just terrible performance-wise.
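>
> Concretely, that attempt looks roughly like this (rough sketch, Spark 1.x
> Java API assumed):
>
> JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, Integer>> c2 =
>         c1.cartesian(c1)
>           .filter(p -> p._1()._1().compareTo(p._2()._1()) < 0); // keep the ordered half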
>
> Thanks, -Dan
>
>
>
>
> On Thu, Apr 30, 2015 at 11:58 AM, Dan DeCapria, CivicScience <
> dan.decap...@civicscience.com> wrote:
>
>> I am trying to generate all N(N-1)/2 lexicographically ordered 2-tuples
>> from a glom()'d JavaRDD<List<Tuple2<AQ, Integer>>>.  Building the initial
>> JavaPairRDD<AQ, Integer> space from the classes I provide (AQ, AQV, AQQ,
>> CT) is well formed and performant; minimized code:
>>
>> SparkConf conf = new SparkConf()
>>         .setAppName("JavaTestMain")
>>         .set("spark.driver.maxResultSize", "0")
>>         .set("spark.akka.frameSize", "512");
>> ...
>> JavaRDD<AQV> aqv = sc.textFile(data_account_question_value_file).map((String line) -> {
>>     String[] s = line.trim().split(",");
>>     AQV key = new AQV(Integer.parseInt(s[0].trim()), s[1].trim(), s[2].trim()); // (0,1,2)
>>     return key;
>> });
>> JavaPairRDD<AQ, Integer> c0 = aqv.distinct().mapToPair((AQV c) -> {
>>         return new Tuple2<AQ, Integer>(new AQ(c.getAccount(), c.getQuestion()), 1);
>> });
>> JavaPairRDD<AQ, Integer> c1 = c0.reduceByKey((Integer i1, Integer i2) -> {
>>         return (i1 + i2);
>> });
>> logger.info(c1.count());
>> logger.info(c1.count());
>>
>> The code snippet above works well and returns JavaTestMain: 8010 in a few
>> seconds, which is perfect.  When I try to generate the full lexicographic
>> pair space (8010 * 8009 / 2 = 32,076,045 elements), however, it is not
>> performant and throws java.lang.OutOfMemoryError: Java heap space;
>> minimized code continued:
>>
>> JavaRDD<List<Tuple2<AQ, Integer>>> c2 = c1.glom();
>> JavaRDD<CT> c3 = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> {
>>         List<CT> output = new ArrayList<>((cl.size() - 1) * (cl.size()) / 2);
>>         Tuple2<AQ, Integer> key1, key2;
>>         for (ListIterator<Tuple2<AQ, Integer>> cloit = cl.listIterator(); cloit.hasNext(); ) { // outer loop
>>             key1 = cloit.next();
>>             for (ListIterator<Tuple2<AQ, Integer>> cliit = cl.listIterator(cloit.nextIndex()); cliit.hasNext(); ) { // inner loop, if applicable
>>                 key2 = cliit.next();
>>                 output.add(new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2()));
>>             }
>>         }
>>         return output;
>> });
>> c3.collect().stream().forEach(System.out::println);
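>>
>> A rough, untested sketch of the same pair generation done lazily, so the
>> partition never has to hold all N(N-1)/2 CT objects in one ArrayList (this
>> assumes the Spark 1.x FlatMapFunction signature, where call() returns an
>> Iterable, and reuses the AQ/AQQ/CT classes above; c3_lazy is only an
>> illustrative name):
>>
>> JavaRDD<CT> c3_lazy = c2.flatMap((List<Tuple2<AQ, Integer>> cl) -> (Iterable<CT>) () ->
>>         new Iterator<CT>() {
>>             private int i = 0; // outer index
>>             private int j = 1; // inner index, always > i
>>             @Override
>>             public boolean hasNext() {
>>                 return i < cl.size() - 1;
>>             }
>>             @Override
>>             public CT next() {
>>                 Tuple2<AQ, Integer> key1 = cl.get(i);
>>                 Tuple2<AQ, Integer> key2 = cl.get(j);
>>                 if (++j >= cl.size()) { // move on to the next outer element
>>                     i++;
>>                     j = i + 1;
>>                 }
>>                 return new CT(new AQQ(key1._1(), key2._1()), key1._2(), key2._2());
>>             }
>>         });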
>>
>>
>> 15/04/30 11:29:59 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/04/30 11:29:59 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/04/30 11:29:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost): java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3236)
>> at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:82)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Any thoughts here? I would like a well-formed parallel approach in which
>> the unit of work at each position in the List is pairing that element with
>> the elements that follow it. In other words, for each position i
>> (pointer_i -> element_i), the only work required at i is over the sublist
>> of size (N - i) that follows element_i, so the whole lexicographic pairing
>> should be parallelizable over the N positions. I am new to Spark, so it's
>> very possible I'm glossing over something glaringly apparent that is
>> causing the framework's parallelism gains to break down.
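>>
>> One thing worth noting from the trace above: the OutOfMemoryError is
>> raised in JavaSerializerInstance.serialize inside Executor$TaskRunner.run,
>> i.e. while the task result is being serialized to send back to the driver,
>> which is what collect() on ~32M elements forces. A small, untested sketch
>> of keeping the result distributed instead (the output path is only a
>> placeholder):
>>
>> c3.saveAsTextFile("/tmp/ct_pairs_out"); // placeholder path; writes the pairs without collecting them
>> logger.info(c3.count()); // or just count to validate the 32,076,045 size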
>>
>> Thanks, -Dan
>>
>>
>>
>
>
> --
> Dan DeCapria
> CivicScience, Inc.
> Back-End Data IS/BI/DM/ML Specialist
>
