I just tried the code below and it works for me. I'm not sure why the SparkContext is being sent inside the mapPartitions function in your case. Can you try a simple map() instead of mapPartitions()?

    val ac = sc.accumulator(0)
    val or = sc.parallelize(1 to 10000)
    val ps = or.map(x => (x, x + 2)).map(x => ac += 1)
    val test = ps.collect()
    println(ac.value)

Thanks
Best Regards

On Sun, Oct 26, 2014 at 1:44 AM, octavian.ganea <octavian.ga...@inf.ethz.ch> wrote:

> Hi all,
>
> I tried to use accumulators without any success so far.
>
> My code is simple:
>
>     val sc = new SparkContext(conf)
>     val accum = sc.accumulator(0)
>     val partialStats = sc.textFile(f.getAbsolutePath())
>       .map(line => { val key = line.split("\t").head; (key, line) })
>       .groupByKey(128)
>       .mapPartitions{iter => { accum += 1; foo(iter) }}
>       .reduce(_ + _)
>     println(accum.value)
>
> Now, if I remove the 'accum += 1', everything works fine. If I keep it, I
> get this weird error:
>
> Exception in thread "main" 14/10/25 21:58:56 INFO TaskSchedulerImpl: Cancelling stage 0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:890)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:887)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:887)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:886)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:886)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1204)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Can someone please help!
>
> Thank you!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Accumulators-Task-not-serializable-java-io-NotSerializableException-org-apache-spark-SparkContext-tp17262.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
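
P.S. One guess at the cause, since the full program is not shown: if accum and sc are fields of an enclosing class rather than local vals, then 'accum += 1' inside the closure really means 'this.accum += 1', and the closure drags the whole enclosing object (including its SparkContext) into serialization. The sketch below reproduces that effect with plain Java serialization and no Spark at all. FakeContext, FakeAccumulator, Job and ClosureDemo are hypothetical stand-ins made up for illustration, not Spark classes, and I am assuming Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for an accumulator: Serializable.
class FakeAccumulator extends Serializable {
  var value = 0
  def +=(n: Int): Unit = value += n
}

// Hypothetical enclosing class that holds both as fields.
class Job {
  val ctx = new FakeContext
  val accum = new FakeAccumulator

  // `accum` here is really `this.accum`, so the closure captures the
  // whole Job instance, including the non-serializable `ctx`.
  def capturesThis: Int => Unit = x => accum += 1

  // Copying the field into a local val first means the closure
  // captures only that one serializable value.
  def capturesLocal: Int => Unit = {
    val localAccum = accum
    x => localAccum += 1
  }
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object,
  // false if it throws NotSerializableException.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    println(s"closure using the field:  ${serializes(job.capturesThis)}")
    println(s"closure using local copy: ${serializes(job.capturesLocal)}")
  }
}
```

If this is what is happening in your program, copying the accumulator into a local val just before the mapPartitions call may be enough to avoid the exception. The same would apply to foo if it is a method on that enclosing class.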