Sorry, Bryan. Unfortunately, the explicit return is not the root cause. Any other ideas? This is blocking my scenario. Thanks.
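For reference, here is the example with the explicit return removed, as suggested below (in Scala the value of the last expression is returned, so the two versions are equivalent). This is just a minimal sketch reusing the names from my original example:

    import org.apache.spark.sql.Row

    val csv = spark.read.option("header", "true").csv("my.csv")

    // No explicit 'return'; Scala returns the value of the last expression.
    def queryYahoo(row: Row): Int = 10

    // Same action as before: run queryYahoo for every row of every partition.
    csv.repartition(5).rdd.foreachPartition { p =>
      p.foreach(r => queryYahoo(r))
    }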
On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Lian,
>
> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------------------------------
> *From:* Lian Jiang <jiangok2...@gmail.com>
> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
> *To:* user
> *Subject:* Return statements aren't allowed in Spark closures
>
> I can run the code below in spark-shell using yarn client mode:
>
> val csv = spark.read.option("header", "true").csv("my.csv")
>
> def queryYahoo(row: Row): Int = { return 10; }
>
> csv.repartition(5).rdd.foreachPartition { p => p.foreach(r => queryYahoo(r)) }
>
> However, the same code fails when run with spark-submit in yarn client or
> cluster mode with this error:
>
> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
> org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>
> org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>     at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>     at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>     at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>     at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>     at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>     at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>     at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>
> Any idea? Thanks.