Hi Sowen,

You're right, that example works, but look what example does not work for
me:

object Main  {  
  def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("name") 
    val sc = new SparkContext(conf) 
    val accum = sc.accumulator(0) 
    for (i <- 1 to 10) { 
      val y = sc.parallelize(Array(1, 2, 3, 4)).mapPartitions(x =>
foo(x,accum)).reduce(_ + _)
    } 
    println("Result : " + accum.value)
    sc.stop 
  } 
  
  def foo(i: Iterator[Int], a: Accumulator[Int]) : Iterator[Int] = {
    while (i.hasNext)  a += i.next
    List().iterator
  }
}  

This gives (I run it on a cluster with 16 nodes) the error:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
        at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
        at EL_LBP_Spark$$anonfun$main$1.apply$mcVI$sp(EL_LBP_Spark.scala:16)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at EL_LBP_Spark$.main(EL_LBP_Spark.scala:15)
        at EL_LBP_Spark.main(EL_LBP_Spark.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 14 more


Seems that there is a problem with mapPartitions ...

Thanks for your suggestion,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-in-Accumulators-tp17263p19576.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to