I'm not sure what caused it, but the partition size was 3 million there. The RDD
was created with mongo-hadoop 1.5.1.
Earlier (mongo-hadoop 1.3 and Spark 1.5) it worked just fine; I'm not sure what
changed.

As a fix, I applied a repartition(40) (where 40 varies with my processing
logic) before the cartesian, and that fixed the problem.
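
For concreteness, here's a minimal sketch of the change (MyDoc just stands in
for the actual element type of myRdd, 40 is simply the value I happened to use,
and the rest is the same pipeline from the snippet quoted below):

        // Repartition the mongo-hadoop-backed RDD before the cartesian so the
        // downstream tasks stay a manageable size. MyDoc is a placeholder for
        // the real element type; 40 varies with my processing logic.
        JavaRDD<MyDoc> repartitioned = myRdd.repartition(40);

        JavaPairRDD<Long, Tuple3<LocalDateTime, Integer, SimResult>> resultRdd =
                repartitioned
                        .cartesian(runsRdd)
                        .cartesian(datesToRunRdd)
                        .coalesce(datesToRun.size() * runs.size() * ridsToRun.size())
                        .mapToPair(t -> {
                            // same per-element processing as before, builds "result"
                            return result;
                        });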

On Wed, Aug 3, 2016 at 10:04 AM, Utkarsh Sengar <utkarsh2...@gmail.com>
wrote:

> After an upgrade from 1.5.1 to 2.0, one of the tasks never completes and
> keeps spilling data to disk over time.
>         long count = resultRdd.count();
>         LOG.info("TOTAL in resultRdd: " + count);
>
> resultRdd has a rather complex structure:
>
>         JavaPairRDD<Long, Tuple3<LocalDateTime, Integer, SimResult>>
> resultRdd = myRdd
>                 .cartesian(runsRdd)
>                 .cartesian(datesToRunRdd)
>                 .coalesce(datesToRun.size() * runs.size() *
> ridsToRun.size())
>                 .mapToPair(t -> { return result; });
>
> "mapToPair" to pair does a bunch of processing over the cartesian product
> and constructs "result".
>
> This works fine in Spark 1.6.1 and the logic inside "mapToPair" is nicely
> unit tested.
>
> This is the thread dump; any suggestions on the possible issue?
>
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
>
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
>
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:153)
>
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:153)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
>
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1682)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> --
> Thanks,
> -Utkarsh
>



-- 
Thanks,
-Utkarsh
