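You're getting confused about which code runs on the driver and which code runs on the executors. The assignment to mto inside lines.map only ever changes a copy of mto that is serialized with the closure and sent to the executors, so the driver's mto stays "0" when rule_apply is called. Read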
http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

On Mon, Feb 29, 2016 at 8:00 AM, franco barrientos <[email protected]> wrote:

> Hi all,
>
> I want to read some filtering rules from MySQL (JDBC MySQL driver).
> Specifically, it's a char column containing a field and a value to apply to
> a Kafka streaming input.
>
> The main idea is to run this from a web UI (Livy server).
>
> Any suggestions or guidelines?
>
> e.g., I have this:
>
> object Streaming {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>     val Array(zkQuorum, group, topics, numThreads) = args
>     var spc = SparkContext.getOrCreate()
>     val ssc = new StreamingContext(spc, Seconds(3))
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topics -> 5)).map(_._2)
>     /* TEST MYSQL */
>     val sqlContext = new SQLContext(spc)
>     val prop = new java.util.Properties
>     val url = "jdbc:mysql://52.22.38.81:3306/tmp"
>     val tbl_users = "santander_demo_users"
>     val tbl_rules = "santander_demo_filters"
>     val tbl_campaigns = "santander_demo_campaigns"
>     prop.setProperty("user", "root")
>     prop.setProperty("password", "Exalitica2014")
>     val users = sqlContext.read.jdbc(url, tbl_users, prop)
>     val rules = sqlContext.read.jdbc(url, tbl_rules, prop)
>     val campaigns = sqlContext.read.jdbc(url, tbl_campaigns, prop)
>     val toolbox = currentMirror.mkToolBox()
>     val toRemove = "\"".toSet
>     var mto = "0"
>
>     def rule_apply(n: Int, t: String, rules: DataFrame): String = {
>       // reading rules from mysql
>       var r = (rules.filter(rules("CID") === n).select("FILTER_DSC").first())(0).toString()
>
>       // using mkToolbox for pre-processing rules
>       return toolbox.eval(toolbox.parse("""
>         val mto = """ + t + """
>         if(""" + r + """) {
>           return "true"
>         } else {
>           return "false"
>         }
>       """)).toString()
>     }
>     /* TEST MYSQL */
>
>     lines.map { x =>
>       if (x.split(",").length > 1) {
>         // reading from kafka input
>         mto = spc.broadcast(x.split(",")(5).filterNot(toRemove))
>       }
>     }
>     var msg = rule_apply(1, mto, rules)
>     var word = lines.map(x => msg)
>     word.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> The problem is that the mto variable always goes back to "0" after mapping
> the lines DStream. I tried to call rule_apply inside map, but then I get a
> "not serializable" error for the mkToolbox class.
>
> Thanks in advance.
>
> Franco Barrientos
> Data Scientist

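One way to keep per-record values out of driver-side vars is to make the extraction a pure function of the record and return its result from the DStream transformation. The helper below is a hypothetical sketch: MtoExtraction and extractMto are illustrative names. It assumes, as the quoted code does, that the value is the sixth comma-separated field with double quotes stripped. It guards on fields.length > 5 because the quoted length > 1 check still lets x.split(",")(5) throw for records with two to five fields.

```scala
// Hypothetical helper: a pure, serializable per-record function.
object MtoExtraction {
  // Characters stripped from the field, mirroring toRemove in the question.
  private val toRemove: Set[Char] = "\"".toSet

  // Returns the sixth comma-separated field without double quotes,
  // or None when the record has fewer than six fields.
  def extractMto(line: String): Option[String] = {
    val fields = line.split(",")
    if (fields.length > 5) Some(fields(5).filterNot(toRemove)) else None
  }
}
```

In the stream this could be used as lines.flatMap(line => MtoExtraction.extractMto(line).toList). That produces one value per well-formed record on the executors instead of mutating mto on the driver. Applying the MySQL rules per record would still require the rule strings to be collected on the driver. Any non-serializable helper, such as the toolbox, would also need to be created on the executor side rather than captured from main.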