Not sure what caused it, but the partition size was 3 million there. The RDD was created from mongo hadoop 1.5.1. Earlier (mongo hadoop 1.3 and Spark 1.5) it worked just fine; not sure what changed.

As a fix, I applied a repartition(40) (where 40 varies by my processing logic) before the cartesian, and that fixed the problem.
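Roughly, the change looks like this (just a sketch reusing the RDD names from the snippet quoted below; the repartition count and the mapToPair body depend on the processing logic):

JavaPairRDD<Long, Tuple3<LocalDateTime, Integer, SimResult>> resultRdd = myRdd
        .repartition(40)   // force a shuffle so no single huge partition feeds the cartesian
        .cartesian(runsRdd)
        .cartesian(datesToRunRdd)
        .coalesce(datesToRun.size() * runs.size() * ridsToRun.size())
        .mapToPair(t -> { return result; });   // same processing as before

Unlike coalesce, repartition performs a full shuffle and evens out partition sizes, so the ~3 million-record partition doesn't get multiplied as one unit by the cartesian.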
On Wed, Aug 3, 2016 at 10:04 AM, Utkarsh Sengar <utkarsh2...@gmail.com> wrote:
> After an upgrade from 1.5.1 to 2.0, one of the tasks never completes and
> keeps spilling data to disk over time.
>
> long count = resultRdd.count();
> LOG.info("TOTAL in resultRdd: " + count);
>
> resultRdd has a rather complex structure:
>
> JavaPairRDD<Long, Tuple3<LocalDateTime, Integer, SimResult>> resultRdd = myRdd
>         .cartesian(runsRdd)
>         .cartesian(datesToRunRdd)
>         .coalesce(datesToRun.size() * runs.size() * ridsToRun.size())
>         .mapToPair(t -> { return result; });
>
> "mapToPair" does a bunch of processing over the cartesian product and
> constructs "result".
>
> This works fine in Spark 1.6.1, and the logic inside "mapToPair" is nicely
> unit tested.
>
> This is the thread dump, any suggestions on the possible issue?
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:153)
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:153)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1682)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
> org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1115)
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> --
> Thanks,
> -Utkarsh

--
Thanks,
-Utkarsh