Forcing materialization with count causes the overall performance to drop drastically. In fact, beyond 50 files it starts to hang.

Regards,
Mayur
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Tue, May 13, 2014 at 9:34 PM, Xiangrui Meng <men...@gmail.com> wrote:

> After checkpoint(), call count() directly to materialize it. -Xiangrui
>
> On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
> >
> > We are running into the same issue. After 700 or so files the stack
> > overflows; cache, persist & checkpointing don't help. Basically,
> > checkpointing only saves the RDD when it is materialized, and it only
> > materializes at the end, by which point it has run out of stack.
> >
> > Regards
> > Mayur
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi
> >
> >
> > On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng <men...@gmail.com> wrote:
> >>
> >> You have a long lineage that causes the StackOverflowError. Try
> >> rdd.checkpoint() and rdd.count() every 20~30 iterations; checkpoint()
> >> can cut the lineage. -Xiangrui
> >>
> >> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan <gh...@lanl.gov> wrote:
> >> > Dear Sparkers:
> >> >
> >> > I am using the Python API of Spark 0.9.0 to implement an iterative
> >> > algorithm, and I get the errors shown at the end of this email. They
> >> > seem to be due to a Java StackOverflowError. The same error has been
> >> > reproduced on a Mac desktop and a Linux workstation, both running the
> >> > same version of Spark.
> >> >
> >> > The same line of code works correctly for quite a few iterations. At
> >> > the line of the error, rdd__new.count() could be 0. (In some previous
> >> > rounds, this was also 0 without any problem.)
> >> >
> >> > Any thoughts on this?
> >> >
> >> > Thank you very much,
> >> > - Guanhua
> >> >
> >> > ========================================
> >> > CODE: print "round", round, rdd__new.count()
> >> > ========================================
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 542, in count
> >> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to java.lang.StackOverflowError [duplicate 1]
> >> >     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> >> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times; aborting job
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 533, in sum
> >> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED from TID 1774 because its task set is gone
> >> >     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 499, in reduce
> >> >     vals = self.mapPartitions(func).collect()
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py", line 463, in collect
> >> >     bytesInJava = self._jrdd.collect().iterator()
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
> >> >   File "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> >> > py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
> >> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times
> >> > (most recent failure: Exception failure: java.lang.StackOverflowError)
> >> >   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> >> >   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> >> >   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> >   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> >   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> >> >   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> >> >   at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> >> >   at scala.Option.foreach(Option.scala:236)
> >> >   at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> >> >   at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> >> >   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> >   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> >   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> >   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> >   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> >   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >
> >> > ======================================
> >> > The stack overflow error is shown as follows:
> >> > ======================================
> >> >
> >> > 14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
> >> > java.lang.StackOverflowError
> >> >   at java.util.zip.Inflater.inflate(Inflater.java:259)
> >> >   at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> >> >   at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
> >> >   at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> >> >   at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> >> >   at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
> >> >   at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
> >> >   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >> >   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> >> >   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >> >   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >> >   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> >> >   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >> >   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >> >   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> >> >   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> >> > The above is replicated many times after this …
> >> > ======================================
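
For anyone hitting the same problem: Xiangrui's suggestion above amounts to the pattern sketched below. This is a minimal PySpark sketch, not code from the thread; the checkpoint directory, the input RDD, and the per-iteration map are placeholders standing in for the real iterative algorithm.

========================================
from pyspark import SparkContext

sc = SparkContext("local", "lineage-demo")
# A checkpoint directory must be set before checkpoint() can be used.
sc.setCheckpointDir("/tmp/spark-checkpoints")   # placeholder path

rdd = sc.parallelize(range(10000))      # placeholder input
for i in range(200):                    # placeholder iterative loop
    rdd = rdd.map(lambda x: x + 1)      # each iteration extends the lineage
    if i > 0 and i % 25 == 0:           # every 20~30 iterations, as suggested
        rdd.checkpoint()                # mark the RDD for checkpointing
        rdd.count()                     # an action materializes the RDD, which
                                        # writes the checkpoint and cuts the lineage
print "final count:", rdd.count()
========================================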
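
On Mayur's point that forcing materialization with count() makes performance drop or hang: one variant worth trying, not proposed in this thread but standard Spark advice for checkpointing, is to persist the RDD before checkpointing it, so the job that writes the checkpoint reads the cached partitions instead of recomputing the whole chain. The same loop as above, sketched with cache():

========================================
for i in range(200):
    rdd = rdd.map(lambda x: x + 1)
    if i > 0 and i % 25 == 0:
        rdd.cache()        # keep the computed partitions in memory
        rdd.checkpoint()   # mark for checkpointing
        rdd.count()        # computes once and caches; the checkpoint write
                           # then reads the cache rather than recomputing
========================================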