You can set the Java option "-Dsun.io.serialization.extendedDebugInfo=true" to have more information about the offending object printed. It will help you track down how the SparkContext is getting included in some kind of closure.
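For example, one way to pass that flag to the driver JVM when running through sbt is via sbt's javaOptions key (a sketch, assuming a forked run; with spark-submit, --driver-java-options serves the same purpose):

// build.sbt -- sketch: pass the debug flag to the forked driver JVM under sbt.
// With spark-submit, the equivalent is:
//   --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"
fork in run := true
javaOptions in run += "-Dsun.io.serialization.extendedDebugInfo=true"

With the flag set, the NotSerializableException stack trace includes the chain of object fields that was being serialized, which usually points straight at the reference that pulled the SparkContext in.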
TD

On Thu, Jul 24, 2014 at 9:48 AM, lihu <lihu...@gmail.com> wrote:

> Which code did you use? Is this caused by your own code, or by something in Spark itself?
>
> On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> I have the same problem.
>>
>> On Sat, Jul 19, 2014 at 12:31 AM, lihu <lihu...@gmail.com> wrote:
>>
>>> Hi everyone, I have the following piece of code. When I run it, the error
>>> below occurs. It seems that the SparkContext is not serializable, but I do
>>> not use the SparkContext for anything except the broadcast. [In fact, this
>>> code is in MLlib; I just try to broadcast the centerArrays.]
>>>
>>> It succeeds in the reduceByKey operation but fails at the collect
>>> operation, which confuses me.
>>>
>>> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
>>> not serializable: java.io.NotSerializableException:
>>> org.apache.spark.SparkContext
>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>> java.io.NotSerializableException: org.apache.spark.SparkContext
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>>
>>> private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
>>>
>>>   @transient val sc = data.sparkContext  // I tried to add the transient annotation here, but it doesn't work
>>>
>>>   // Initialize each run's center to a random point
>>>   val seed = new XORShiftRandom().nextInt()
>>>   val sample = data.takeSample(true, runs, seed).toSeq
>>>   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>>>
>>>   // On each step, sample 2 * k points on average for each run with
>>>   // probability proportional to their squared distance from that run's
>>>   // current centers
>>>   for (step <- 0 until initializationSteps) {
>>>     val centerArrays = sc.broadcast(centers.map(_.toArray))
>>>     val sumCosts = data.flatMap { point =>
>>>       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
>>>     }.reduceByKey(_ + _).collectAsMap()  // can pass at this point
>>>     val chosen = data.mapPartitionsWithIndex { (index, points) =>
>>>       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>>>       for {
>>>         p <- points
>>>         r <- 0 until runs
>>>         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
>>>       } yield (r, p)
>>>     }.collect()  // failed at this point
>>>     for ((r, p) <- chosen) {
>>>       centers(r) += p
>>>     }
>>>   }
>>>
>>> --
>>> Best Wishes!
>>> Li Hu (李浒) | Graduate Student
>>> Institute for Interdisciplinary Information Sciences (IIIS), Tsinghua University, China
>>> Email: lihu...@gmail.com
>>> Tel: +86 15120081920
>>> Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
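For reference, a frequent cause of "Task not serializable: ... SparkContext" is that a task closure refers to fields or methods of an enclosing object which, directly or indirectly, holds the SparkContext, so the whole object gets pulled into the serialized closure. Marking a local val @transient does not help, because @transient only affects fields of a class that is itself being serialized. If that is what is happening here, a common workaround is to copy the members the closure needs into local vals before the closure is created. The sketch below rewrites the loop body from the quoted code under the assumption that runs and k are fields of the enclosing class:

// Sketch of the usual workaround: copy class members into local vals so the
// task closures capture only serializable locals and the broadcast handle,
// not the enclosing object (`this`).
for (step <- 0 until initializationSteps) {
  val localRuns = runs   // assumption: `runs` is a field of the enclosing class
  val localK = k         // assumption: `k` is a field of the enclosing class
  val centerArrays = sc.broadcast(centers.map(_.toArray))

  val sumCosts = data.flatMap { point =>
    for (r <- 0 until localRuns) yield (r, KMeans.pointCost(centerArrays.value(r), point))
  }.reduceByKey(_ + _).collectAsMap()

  val chosen = data.mapPartitionsWithIndex { (index, points) =>
    val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
    for {
      p <- points
      r <- 0 until localRuns
      if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * localK / sumCosts(r)
    } yield (r, p)
  }.collect()

  for ((r, p) <- chosen) {
    centers(r) += p
  }
}

If the exception persists after this change, the field chain printed by the extendedDebugInfo flag should show which other reference is dragging the SparkContext into the closure.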