I can't tell immediately, but you might be able to get more info with the
hint provided here:
http://stackoverflow.com/questions/27980781/spark-task-not-serializable-with-simple-accumulator
(short version: set -Dsun.io.serialization.extendedDebugInfo=true)
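
For example, something like this when launching the shell (a sketch, not
tested; the property needs to be on the driver JVM, which is where the
task gets serialized and where this exception is thrown):

    spark-shell --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"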

Also, unless you're simplifying your example a lot, you only have 2
regexes, so I'm not quite sure why you want to broadcast them, as opposed
to just having an object that holds them on each executor, or just
creating them at the start of mapPartitions, outside the iter.hasNext
loop (note that your second snippet actually recompiles them inside the
loop, on every line). Broadcasting seems overcomplicated, but maybe you
just showed a simplified example... (see the sketch below)
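
A rough, untested sketch of what I mean, reusing your first snippet's
lines and testFun but with the broadcasts dropped (the regexes are
compiled once per partition, on the executor, so they never have to be
serialized with the closure):

    val cdd = lines.filter(testFun).mapPartitions { iter =>
      // Compiled once per partition, on the executor side.
      val p_date = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r
      val p_ORA = "^ORA-\\d+.+".r
      var res = List[String]()
      var lasttime = ""
      while (iter.hasNext) {
        val line = iter.next
        val currentcode = p_ORA.findFirstIn(line).getOrElse(null)
        if (currentcode != null) {
          res ::= lasttime + " | " + currentcode
        } else {
          val currentdate = p_date.findFirstIn(line).getOrElse(null)
          if (currentdate != null) lasttime = currentdate
        }
      }
      res.iterator
    }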

On Wed, Jun 24, 2015 at 8:41 AM, yuemeng (A) <yueme...@huawei.com> wrote:

>  Hi all,
>
> Here are two examples: one throws "Task not serializable" when executed
> in the spark shell, the other one is OK. I am very puzzled. Can anyone
> explain what's different about these two pieces of code, and why the
> second one is OK?
>
> 1. The one which throws "Task not serializable":
>
> import org.apache.spark._
> import SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.broadcast._
>
> @transient val ssc = new StreamingContext(sc, Seconds(5))
> val lines = ssc.textFileStream("/a.log")
>
> val testFun = (line: String) =>
>     line.contains(" ERROR") || line.startsWith("Spark")
>
> val p_date_bc = sc.broadcast("^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r)
> val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)
> val A = (iter: Iterator[String],
>          data_bc: Broadcast[scala.util.matching.Regex],
>          ORA_bc: Broadcast[scala.util.matching.Regex]) => {
>             val p_date = data_bc.value
>             val p_ORA = ORA_bc.value
>             var res = List[String]()
>             var lasttime = ""
>
>             while (iter.hasNext) {
>                 val line = iter.next.toString
>                 val currentcode = p_ORA findFirstIn line getOrElse null
>                 if (currentcode != null){
>                     res ::= lasttime + " | " + currentcode
>                 }else{
>                     val currentdate = p_date findFirstIn line getOrElse null
>                     if (currentdate != null){
>                         lasttime = currentdate
>                     }
>                 }
>             }
>             res.iterator
>         }
>
> val cdd = lines.filter(testFun).mapPartitions(x => A(x, p_date_bc, p_ORA_bc))
> // throws org.apache.spark.SparkException: Task not serializable
>
> 2. The other one is OK:
>
> import org.apache.spark._
> import SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.broadcast._
>
> val ssc = new StreamingContext(sc, Seconds(5))
> val lines = ssc.textFileStream("/a.log")
>
> val testFun = (line: String) =>
>     line.contains(" ERROR") || line.startsWith("Spark")
>
> val A = (iter: Iterator[String]) => {
>
>             var res = List[String]()
>             var lasttime = ""
>             while (iter.hasNext) {
>                 val line = iter.next.toString
>                 val currentcode = "^ORA-\\d+.+".r.findFirstIn(line).getOrElse(null)
>                 if (currentcode != null){
>                     res ::= lasttime + " | " + currentcode
>                 }else{
>                     val currentdate = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r.findFirstIn(line).getOrElse(null)
>                     if (currentdate != null){
>                         lasttime = currentdate
>                     }
>                 }
>             }
>             res.iterator
>         }
>
> val cdd = lines.filter(testFun).mapPartitions(A)