Hi all,
There are two examples below. The first one throws "Task not serializable" when executed in the
Spark shell; the second one works. I am very puzzled: can anyone explain what the difference
between these two pieces of code is, and why the second one works?
1. The one that throws "Task not serializable":
import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._
// Streaming context on the shell's SparkContext `sc`, with a 5-second batch interval.
// @transient marks the REPL-generated field as non-serialized, in case a closure ever drags
// the line wrapper that holds it into a task.
@transient val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
// NOTE(review): per the Spark API docs, textFileStream watches a *directory* for newly created
// files; confirm "/a.log" is a directory and not a single log file.
val lines = ssc.textFileStream("/a.log")
// Keeps a line when it contains " ERROR" (note the leading space) or starts with "Spark".
// The condition is already a Boolean, so it is returned directly rather than through
// an if/else that yields true/false.
val testFun: String => Boolean = line =>
  line.contains(" ERROR") || line.startsWith("Spark")
// Patterns shared with the partition parser via sc.broadcast.
// Date pattern: word, word, number, hh:mm:ss, 4-digit year, e.g. "Mon Jan 12 10:20:30 2015".
// (The literal was reconstructed from mail-client link mangling that had split it across lines.)
// NOTE(review): on Scala 2.10, scala.util.matching.Regex may not be java.io.Serializable; if
// broadcasting fails on a real cluster, broadcast the pattern String and call .r on executors.
val p_date_bc = sc.broadcast("^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r)
// Lines starting with "ORA-" followed by digits (presumably Oracle error codes).
val p_ORA_bc = sc.broadcast("^ORA-\\d+.+".r)
// Scans one partition of log lines and pairs every "ORA-" line with the most recent date line
// seen before it in the same partition, producing "<date> | <ORA line>".
//   iter    - lines of one partition
//   data_bc - broadcast date pattern
//   ORA_bc  - broadcast ORA-code pattern
// Returns the pairs in reverse encounter order (results are prepended), matching the original.
// An ORA line seen before any date line is paired with "" (i.e. " | ORA-...").
val A = (iter: Iterator[String],
         data_bc: Broadcast[scala.util.matching.Regex],
         ORA_bc: Broadcast[scala.util.matching.Regex]) => {
  val p_date = data_bc.value
  val p_ORA = ORA_bc.value
  // State threaded through the fold: (last date seen, results so far).
  val (_, res) = iter.foldLeft(("", List.empty[String])) {
    case ((lasttime, acc), line) =>
      p_ORA.findFirstIn(line) match {
        case Some(code) => (lasttime, (lasttime + " | " + code) :: acc)
        // Not an ORA line: remember it as the new date if it is one, otherwise keep the old date.
        case None => (p_date.findFirstIn(line).getOrElse(lasttime), acc)
      }
  }
  res.iterator
}
// The original `mapPartitions(x => A(x, p_date_bc, p_ORA_bc))` failed with
// "org.apache.spark.SparkException: Task not serializable".
// NOTE(review): presumed cause - the lambda references shell-level vals, so it captures the
// REPL line-wrapper objects that define them, and those wrappers chain to the ones holding
// `ssc`/`lines`; a DStream refuses to be serialized as part of a task closure.
// Copying every value the closure needs into block-local vals means the closure captures only
// these values (the pattern recommended in the Spark programming guide).
val cdd = {
  val parse = A
  val dateBc = p_date_bc
  val oraBc = p_ORA_bc
  lines.filter(testFun).mapPartitions(iter => parse(iter, dateBc, oraBc))
}
2. The one that works:
import org.apache.spark._
import SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.broadcast._
// Streaming context on the shell's SparkContext `sc`, with a 5-second batch interval
// (no @transient in this variant).
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
// NOTE(review): per the Spark API docs, textFileStream watches a *directory* for newly created
// files; confirm "/a.log" is a directory and not a single log file.
val lines = ssc.textFileStream("/a.log")
// Keeps a line when it contains " ERROR" (note the leading space) or starts with "Spark".
// The condition is already a Boolean, so it is returned directly rather than through
// an if/else that yields true/false.
val testFun: String => Boolean = line =>
  line.contains(" ERROR") || line.startsWith("Spark")
// Scans one partition of log lines and pairs every "ORA-" line with the most recent date line
// seen before it in the same partition, producing "<date> | <ORA line>".
// Returns the pairs in reverse encounter order (results are prepended).
// An ORA line seen before any date line is paired with "" (i.e. " | ORA-...").
val A = (iter: Iterator[String]) => {
  // Compiled once per call instead of once per line, and kept local to the function so the
  // function value passed to mapPartitions does not refer to any shell-level vals.
  // Date pattern, e.g. "Mon Jan 12 10:20:30 2015" (literal reconstructed from mail-client
  // link mangling that had split it across lines).
  val p_date = "^\\w+ \\w+ \\d+ \\d{2}:\\d{2}:\\d{2} \\d{4}".r
  val p_ORA = "^ORA-\\d+.+".r
  // State threaded through the fold: (last date seen, results so far).
  // Fixes the original's swap: the ORA pattern selects lines to report and the date
  // pattern updates the remembered time, consistent with the broadcast version.
  val (_, res) = iter.foldLeft(("", List.empty[String])) {
    case ((lasttime, acc), line) =>
      p_ORA.findFirstIn(line) match {
        case Some(code) => (lasttime, (lasttime + " | " + code) :: acc)
        case None => (p_date.findFirstIn(line).getOrElse(lasttime), acc)
      }
  }
  res.iterator
}
// Filter the stream, then parse each partition with A. A is passed as a function value
// rather than wrapped in a new lambda that mentions shell-level vals, which is presumably
// why this variant serializes.
val cdd = {
  val errorLines = lines.filter(testFun)
  errorLines.mapPartitions(A)
}