I can't tell immediately, but you might be able to get more info about what exactly isn't serializable with the hint provided here: http://stackoverflow.com/questions/27980781/spark-task-not-serializable-with-simple-accumulator (short version: set -Dsun.io.serialization.extendedDebugInfo=true).
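If you're running this in spark-shell, one way to pass that system property to the driver JVM is --driver-java-options (just a sketch; depending on where the serialization happens you may also want spark.executor.extraJavaOptions for the executors):

```shell
# Launch spark-shell with extended serialization debug info enabled
# on the driver, so the "Task not serializable" stack trace names
# the offending field/object chain.
spark-shell --driver-java-options "-Dsun.io.serialization.extendedDebugInfo=true"
```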
Also, unless you're simplifying your example a lot, you only have 2 regexes, so I'm not quite sure why you want to broadcast them, as opposed to just having an object that holds them on each executor, or just creating them at the start of mapPartitions (outside the iter.hasNext loop, as shown in your second snippet). Broadcasting seems overcomplicated, but maybe you just showed a simplified example...

On Wed, Jun 24, 2015 at 8:41 AM, yuemeng (A) <yueme...@huawei.com> wrote:

> hi, all
>
> There are two examples: one throws Task not serializable when executed in
> the spark shell, the other one is OK. I am very puzzled. Can anyone explain
> what's different about these two pieces of code and why the other one is OK?
>
> 1. The one which throws Task not serializable:
>
> import org.apache.spark._
> import SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.broadcast._
>
> @transient val ssc = new StreamingContext(sc, Seconds(5))
> val lines = ssc.textFileStream("/a.log")
>
> val testFun = (line: String) => {
>   if ((line.contains(" ERROR")) || (line.startsWith("Spark"))) {
>     true
>   } else {
>     false
>   }
> }
>
> val p_date_bc = sc.broadcast("^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r)
> val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)
> val A = (iter: Iterator[String],
>          data_bc: Broadcast[scala.util.matching.Regex],
>          ORA_bc: Broadcast[scala.util.matching.Regex]) => {
>   val p_date = data_bc.value
>   val p_ORA = ORA_bc.value
>   var res = List[String]()
>   var lasttime = ""
>
>   while (iter.hasNext) {
>     val line = iter.next.toString
>     val currentcode = p_ORA findFirstIn line getOrElse null
>     if (currentcode != null) {
>       res ::= lasttime + " | " + currentcode
>     } else {
>       val currentdate = p_date findFirstIn line getOrElse null
>       if (currentdate != null) {
>         lasttime = currentdate
>       }
>     }
>   }
>   res.iterator
> }
>
> val cdd = lines.filter(testFun).mapPartitions(x => A(x, p_date_bc, p_ORA_bc))
> // org.apache.spark.SparkException: Task not serializable
>
> 2. The other one is OK:
>
> import org.apache.spark._
> import SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.broadcast._
>
> val ssc = new StreamingContext(sc, Seconds(5))
> val lines = ssc.textFileStream("/a.log")
>
> val testFun = (line: String) => {
>   if ((line.contains(" ERROR")) || (line.startsWith("Spark"))) {
>     true
>   } else {
>     false
>   }
> }
>
> val A = (iter: Iterator[String]) => {
>   var res = List[String]()
>   var lasttime = ""
>   while (iter.hasNext) {
>     val line = iter.next.toString
>     val currentcode = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r.findFirstIn(line).getOrElse(null)
>     if (currentcode != null) {
>       res ::= lasttime + " | " + currentcode
>     } else {
>       val currentdate = "^ORA-\\d+.+".r.findFirstIn(line).getOrElse(null)
>       if (currentdate != null) {
>         lasttime = currentdate
>       }
>     }
>   }
>   res.iterator
> }
>
> val cdd = lines.filter(testFun).mapPartitions(A)
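For what it's worth, here's a minimal sketch of the "object that holds them on each executor" alternative I mentioned (the names LogPatterns, pDate, and pOra are just illustrative):

```scala
import scala.util.matching.Regex

// A singleton object is initialized lazily, once per JVM (i.e. once per
// executor), so the Regex instances are built locally and never have to
// be serialized into the task closure at all.
object LogPatterns {
  val pDate: Regex = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r
  val pOra: Regex = "^ORA-\\d+.+".r
}
```

The mapPartitions function can then refer to LogPatterns.pDate and LogPatterns.pOra directly; only the object's name is captured, not the regexes themselves. The other option, building the regexes at the top of the mapPartitions function body (before the iter.hasNext loop), costs one compilation per partition instead of one per line, which is usually cheap enough.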