Hi,
The aim:
- collect syslogs,
- filter them (discard really unneeded events),
- save the rest to a Cassandra table,
- later, integrate a search engine (Solr-based).
Environment:
spark 1.5.2/scala 2.10.4
spark-cassandra-connector_2.11-1.5.0-M2.jar
flume 1.6.0
I have reviewed a lot of examples and written some small Scala code (I'm a
Scala beginner).
Now I have a problem with "extracting" and saving to C*.
Here is the code, with comments describing what I mean:
import java.io.File
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

import com.datastax.spark.connector.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object myPollingEvents {

  // Return false for events we want to discard, true for everything else.
  def containsERR(x: String): Boolean = x match {
    case x if x contains "error: Could not load host key" => false
    case x if x contains "Did not receive identification string from 10." => false
    case _ => true
  }

  def createContext(outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()

    val batchInterval = Milliseconds(2000)
    val nms = new InetSocketAddress("nms.rsm.crp", 12345)

    val sparkConf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("myPollingEvents")
      .set("spark.cleaner.ttl", "3600")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .setJars(Array("/data/spark/lib/spark-cassandra-connector_2.11-1.5.0-M2.jar"))

    val ssc = new StreamingContext(sparkConf, batchInterval)
    ssc.checkpoint(checkpointDirectory)

    // I can "extract" the body, filter and save to C*, with
    // CREATE TABLE CassandraTableRaw(message text PRIMARY KEY);
    val stream = FlumeUtils.createPollingStream(ssc, Seq(nms), StorageLevel.MEMORY_AND_DISK_SER)
      .map(e => new String(e.event.getBody.array(), UTF_8))
      .filter(containsERR)
      .map(Tuple1(_))
    stream.print()
    stream.saveToCassandra("cassandrakeyspace", "cassandratableraw")

    // Just a control
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // I can also print the headers. On the console this gives something like:
    // "Map(Severity -> 6, Facility -> 3, host -> vpn10, priority -> 30,
    //  timestamp -> 1448702438000)"
    // val headers = stream.map(_.event.getHeaders.asScala.map {
    //   case (key, value) => (key.toString, value.toString)
    // }).print

    ssc
  }
}
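For completeness, this is roughly how I drive it from main, following the standard StreamingContext.getOrCreate recovery pattern (the /tmp paths are just placeholders I use for local testing; this sits inside the same myPollingEvents object):

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = "/tmp/myPollingEvents-checkpoint"
    val outputPath = "/tmp/myPollingEvents.out"
    // Recreate the context from the checkpoint if one exists,
    // otherwise build a fresh one with createContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }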
But I can't find Scala examples for Flume + Spark Streaming that show how
to combine the filtered getBody with the needed getHeaders and save both
to C*, into something like:

CREATE TABLE CassandraTableRaw(hostname text PRIMARY KEY, priority int,
timestamp timestamp, msg text, program text, pid int);
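Here is a rough sketch of what I have in mind, using the same ssc, nms and containsERR as above (untested; parseSyslogBody is a hypothetical helper I still have to write, and I'm assuming the host/priority/timestamp header keys from the Map printed above):

import com.datastax.spark.connector.SomeColumns

case class RawEvent(hostname: String, priority: Int,
                    timestamp: java.util.Date, msg: String,
                    program: String, pid: Int)

// Hypothetical helper: split a syslog body into (program, pid, msg).
def parseSyslogBody(body: String): (String, Int, String) = ???

val rows = FlumeUtils.createPollingStream(ssc, Seq(nms), StorageLevel.MEMORY_AND_DISK_SER)
  .map { e =>
    val body = new String(e.event.getBody.array(), UTF_8)
    // Avro gives Utf8 keys/values, so convert them to plain Strings first
    val headers = e.event.getHeaders.asScala
      .map { case (k, v) => (k.toString, v.toString) }
      .toMap
    (body, headers)
  }
  .filter { case (body, _) => containsERR(body) }
  .map { case (body, headers) =>
    val (program, pid, msg) = parseSyslogBody(body)
    RawEvent(
      hostname  = headers("host"),
      priority  = headers("priority").toInt,
      // the timestamp header looks like epoch millis as a string
      timestamp = new java.util.Date(headers("timestamp").toLong),
      msg       = msg,
      program   = program,
      pid       = pid)
  }

rows.saveToCassandra("cassandrakeyspace", "cassandratableraw",
  SomeColumns("hostname", "priority", "timestamp", "msg", "program", "pid"))

Is this roughly the right way to combine the body and the headers in one pass?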
I would be very thankful if someone could share a working example or
correct the sketch above.
Thanks a lot for reading, and for your help.
Oleksandr Yermolenko