> val result = fDB.mappartitions(testMP).collect

Not sure if the above is exactly the code you ran - there is a typo: the
method name should be mapPartitions (capital P), not mappartitions.
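
For reference, here is a corrected sketch (untested; it assumes the Spark 1.x
Scala API, adds the scala.collection.mutable.LongMap import your snippet was
missing, and replaces the contains/+= branching with a plain update, since
result += (cur, 1) only compiles via auto-tupling):

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.LongMap

object MyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("This is a test")
    val sc = new SparkContext(conf)

    val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
    // mapPartitions (capital P) invokes testMP once per partition
    val result = fDB.mapPartitions(testMP).collect()
    println(result.mkString(" "))
    sc.stop()
  }

  // Count how many times each value occurs within a single partition
  def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
    val counts = new LongMap[Int]()
    while (iter.hasNext) {
      val cur = iter.next().toLong
      counts(cur) = counts.getOrElse(cur, 0) + 1  // insert-or-increment in one step
    }
    counts.toList.iterator
  }
}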

Cheers

On Sat, May 30, 2015 at 9:44 AM, unioah <uni...@gmail.com> wrote:

> Hi,
>
> I am trying to aggregate the values within each partition.
> For example:
>
> Before:
> worker 1:          worker 2:
> 1, 2, 1            2, 1, 2
>
> After:
> worker 1:          worker 2:
> (1->2), (2->1)     (1->1), (2->2)
>
> I tried to use mappartitions:
> object MyTest {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("This is a test")
>     val sc = new SparkContext(conf)
>
>     val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
>     val result = fDB.mappartitions(testMP).collect
>     println(result.mkString)
>     sc.stop
>   }
>
>   def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
>     var result = new LongMap[Int]()
>     var cur = 0l
>
>     while (iter.hasNext) {
>       cur = iter.next.toLong
>       if (result.contains(cur)) {
>         result(cur) += 1
>       } else {
>         result += (cur, 1)
>       }
>     }
>     result.toList.iterator
>   }
> }
>
> But I get the following error no matter what I try.
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
>
> Can anybody help me? Thanks.
>
