I am not able to reproduce your error. Try adding a step before that last call so
you can get more information out of the exception it raises; for example, just add
a csv.show(1) on the line before the foreachPartition. Also, can you post the
different exception you get when you take out the "return", as Bryan suggested?
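
Concretely, something like this (just a sketch based on the code you posted,
reusing your my.csv and queryYahoo names):

import org.apache.spark.sql.Row

val csv = spark.read.option("header", "true").csv("my.csv")
csv.show(1)  // confirm the DataFrame itself reads fine before the failing action

// the last expression is the result in Scala, so no explicit "return" is needed
def queryYahoo(row: Row): Int = 10

csv.repartition(5).rdd.foreachPartition { p =>
  p.foreach(r => queryYahoo(r))
}

If it still fails after that, the exception should at least point somewhere more
useful than the closure cleaner.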

It's getting to this bit of code

private[spark] class ReturnStatementInClosureException
  extends SparkException("Return statements aren't allowed in Spark closures")

private class ReturnStatementFinder extends ClassVisitor(ASM5) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {
    if (name.contains("apply")) {
      new MethodVisitor(ASM5) {
        override def visitTypeInsn(op: Int, tp: String) {
          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
            throw new ReturnStatementInClosureException
          }
        }
      }
    } else {
      new MethodVisitor(ASM5) {}
    }
  }
}

and it must be seeing a NEW of scala/runtime/NonLocalReturnControl in your
closure's bytecode. My first guess is that the "queryYahoo" function is doing
something that causes an exception, but for some reason (a networking issue,
maybe?) it works fine in spark-shell.
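
For what it's worth, that check isn't Spark-specific magic: in Scala, a "return"
inside a function literal is compiled into a throw of
scala.runtime.NonLocalReturnControl that unwinds back to the enclosing def, and
that NEW instruction is exactly what the visitor above is scanning for. A
standalone sketch of the pattern (hypothetical names, nothing to do with your job):

object ReturnInClosureDemo {
  // The "return" below does not return from the lambda passed to foreach;
  // the compiler implements it by throwing scala.runtime.NonLocalReturnControl
  // so control unwinds out of findFirstNegative. That construction is what
  // Spark's bytecode scan refuses to ship to executors.
  def findFirstNegative(xs: Seq[Int]): Option[Int] = {
    xs.foreach { x =>
      if (x < 0) return Some(x)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    println(findFirstNegative(Seq(3, -1, 2)))  // prints Some(-1)
  }
}

Which is why Bryan's suggestion to drop the "return" is the first thing to try;
if something still fails after that, it should at least be a different exception.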

On Feb 21, 2018 10:47 PM, "Lian Jiang" <jiangok2...@gmail.com> wrote:

> Sorry Bryan. Unfortunately, this is not the root cause.
>
> Any other ideas? This is blocking my scenario. Thanks.
>
> On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Lian,
>>
>> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>>
>>
>> ------------------------------
>> *From:* Lian Jiang <jiangok2...@gmail.com>
>> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
>> *To:* user
>> *Subject:* Return statements aren't allowed in Spark closures
>>
>> I can run below code in spark-shell using yarn client mode.
>>
>> val csv = spark.read.option("header", "true").csv("my.csv")
>>
>> def queryYahoo(row: Row) : Int = { return 10; }
>>
>> csv.repartition(5).rdd.foreachPartition { p => p.foreach(r => { queryYahoo(r) }) }
>>
>> However, the same code failed when run using spark-submit in yarn client
>> or cluster mode due to error:
>>
>> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
>> org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>>
>> org.apache.spark.util.ReturnStatementInClosureException: Return statements aren't allowed in Spark closures
>>   at org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)
>>   at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>>   at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>>   at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>   at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>>   at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>>   at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
>>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>   at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>
>> Any idea? Thanks.
>>
>
>
