I am currently writing an application that uses Spark Streaming. What I am
trying to do is read in a few files (using the SparkContext's textFile
method) and then process those files inside an action that I apply to a
streaming RDD. Here is the main code:

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("EmailIngestion")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = new SparkContext(sparkConf)
    val badWords = sc.textFile("/filters/badwords.txt")
    val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
    val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
    val emailBlacklist = sc.textFile("/filters/blacklist.txt")
    val lines = FlumeUtils.createStream(ssc, "localhost", 4545,
      StorageLevel.MEMORY_ONLY_SER_2)
    lines.foreachRDD(rdd => rdd.foreachPartition(json =>
      Processor.ProcessRecord(json, badWords, urlBlacklist, domainBlacklist,
        emailBlacklist)))
    ssc.start()
    ssc.awaitTermination()
  }

Here is the code inside the ProcessRecord method that processes the files:

  val emailBlacklistCnt = emailBlacklist.filter(black =>
    black.contains(email)).count

It looks like this throws an exception. Is it possible to do this?
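
For context, Spark does not support invoking RDD transformations or actions from inside another RDD's action, because those closures run on the executors rather than on the driver. A possible workaround, sketched here under the assumption that the filter files are small enough to fit in memory, is to turn each blacklist into a plain Scala Set and run the check against that instead of an RDD. The object name BlacklistCheck, the method countMatches, and the sample addresses are hypothetical and only illustrate the idea:

```scala
// Hypothetical helper: the blacklist is a plain in-memory Set, not an RDD,
// so the check can run inside foreachPartition on the executors.
object BlacklistCheck {
  // Counts blacklist entries that contain the given email, mirroring
  // emailBlacklist.filter(black => black.contains(email)).count
  def countMatches(email: String, blacklist: Set[String]): Long =
    blacklist.count(black => black.contains(email)).toLong

  def main(args: Array[String]): Unit = {
    // Sample data, made up for illustration only.
    val blacklist = Set("spam@example.com", "bad@example.org")
    println(countMatches("spam@example.com", blacklist))
    println(countMatches("ok@example.com", blacklist))
  }
}
```

In the streaming job, the Set could be built once on the driver with emailBlacklist.collect().toSet, wrapped with sc.broadcast(...), and read through .value inside foreachPartition. Whether that fits depends on the size of the files.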
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-call-a-transform-action-inside-an-action-tp17568.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.