bq. val result = fDB.mappartitions(testMP).collect

Not sure if you pasted the code exactly as you ran it - there is a typo: the method name should be mapPartitions.
Cheers

On Sat, May 30, 2015 at 9:44 AM, unioah <uni...@gmail.com> wrote:
> Hi,
>
> I try to aggregate the value in each partition internally.
> For example,
>
> Before:
> worker 1:    worker 2:
> 1, 2, 1      2, 1, 2
>
> After:
> worker 1:          worker 2:
> (1->2), (2->1)     (1->1), (2->2)
>
> I try to use mappartitions,
>
> object MyTest {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("This is a test")
>     val sc = new SparkContext(conf)
>
>     val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
>     val result = fDB.mappartitions(testMP).collect
>     println(result.mkString)
>     sc.stop
>   }
>
>   def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
>     var result = new LongMap[Int]()
>     var cur = 0l
>
>     while (iter.hasNext) {
>       cur = iter.next.toLong
>       if (result.contains(cur)) {
>         result(cur) += 1
>       } else {
>         result += (cur, 1)
>       }
>     }
>     result.toList.iterator
>   }
> }
>
> But I got the error message no matter how I tried.
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> 15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove
> non-existent executor 1
>
> Anybody can help me? Thx
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/problem-with-using-mapPartitions-tp12514.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
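For reference, a minimal sketch of the corrected program. The counting logic is kept as in the original (a mutable LongMap per partition, with the missing import added); the Spark-specific call site is shown only in comments, assuming a SparkContext `sc` as in the original code, so the per-partition counter itself can be checked locally on the three example partitions. Note that the order of pairs coming out of a LongMap is unspecified.

```scala
import scala.collection.mutable.LongMap

object MyTest {
  // Count occurrences of each value within a single partition.
  def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
    val result = new LongMap[Int]()
    while (iter.hasNext) {
      val cur = iter.next().toLong
      result(cur) = result.getOrElse(cur, 0) + 1
    }
    result.toList.iterator
  }

  def main(args: Array[String]): Unit = {
    // In Spark the corrected call would be (note the capital P):
    //   val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
    //   val result = fDB.mapPartitions(testMP).collect()
    // Here we simulate the three partitions locally to exercise testMP:
    val partitions = List(List(1, 2, 1), List(2, 1, 2), List(5, 5, 2))
    val result = partitions.flatMap(p => testMP(p.iterator))
    println(result.mkString(", "))
  }
}
```

With the example data this prints one (value, count) pair per distinct value per partition, e.g. (1,2) and (2,1) for the first partition.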