Hi all,
I want to read some filtering rules from mysql (jdbc mysql driver) specifically
its a char type containing a field and value to process in a kafka streaming
input.
The main idea is to process this from a web UI (livy server).
Any suggestion or guidelines?
e.g., I have this:
object Streaming {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics>
<numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
var spc = SparkContext.getOrCreate()
val ssc = new StreamingContext(spc, Seconds(3))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topics ->
5)).map(_._2)
/* TEST MYSQL */
val sqlContext = new SQLContext(spc)
val prop = new java.util.Properties
val url = "jdbc:mysql://52.22.38.81:3306/tmp"
val tbl_users = "santander_demo_users"
val tbl_rules = "santander_demo_filters"
val tbl_campaigns = "santander_demo_campaigns"
prop.setProperty("user", "root")
prop.setProperty("password", "Exalitica2014")
val users = sqlContext.read.jdbc(url, tbl_users, prop)
val rules = sqlContext.read.jdbc(url, tbl_rules, prop)
val campaigns = sqlContext.read.jdbc(url, tbl_campaigns, prop)
val toolbox = currentMirror.mkToolBox()
val toRemove = "\"”.toSet
var mto = “0"
def rule_apply (n:Int, t:String, rules:DataFrame) : String = {
// reading rules from mysql
var r = (rules.filter(rules("CID") ===
n).select("FILTER_DSC").first())(0).toString()
// using mkToolbox for pre-processing rules
return toolbox.eval(toolbox.parse("""
val mto = """ + t + """
if(""" + r + """) {
return “true"
} else {
return “false"
}
""")).toString()
}
/* TEST MYSQL */
lines.map{x =>
if(x.split(",").length > 1) {
// reading from kafka input
mto = spc.broadcast(x.split(",")(5).filterNot(toRemove))
}
}
var msg = rule_apply(1, mto, rules)
var word = lines.map(x => msg)
word.print()
ssc.start()
ssc.awaitTermination()
}
}
The problem is that mto variable always returns to “0” value after mapping
lines DStream. I tried to process rule_apply into map but I get not
serializable mkToolbox class error.
Thanks in advance.
Franco Barrientos
Data Scientist
Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649
(+569)-76347893
[email protected]
www.exalitica.com