Hi,

The aim:
- collect syslogs
- filter them (discard events I really don't need)
- save the rest to a Cassandra table
- later, integrate a search engine (Solr based)

Environment:
spark 1.5.2/scala 2.10.4
spark-cassandra-connector_2.11-1.5.0-M2.jar
flume 1.6.0

I have reviewed a lot of examples and written a small piece of Scala code (I'm a Scala beginner).
Now I have a problem with "extracting" events and saving them to C*.
Here is the small piece of code, with my comments, to describe what I mean:

import java.io.File
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

import com.datastax.spark.connector.streaming._

object myPollingEvents {

  // Return false for events that should be discarded.
  def containsERR(x: String): Boolean = x match {
    case x if x contains "error: Could not load host key" => false
    case x if x contains "Did not receive identification string from 10." => false
    case _ => true
  }

  def createContext(outputPath: String, checkpointDirectory: String)
    : StreamingContext = {

    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()

    val batchInterval = Milliseconds(2000)
    val nms = new InetSocketAddress("nms.rsm.crp", 12345)

    val sparkConf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("myPollingEvents")
      .set("spark.cleaner.ttl", "3600")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .setJars(Array("/data/spark/lib/spark-cassandra-connector_2.11-1.5.0-M2.jar"))

    val ssc = new StreamingContext(sparkConf, batchInterval)
    ssc.checkpoint(checkpointDirectory)

// I can "extract" body, filter and save to C* /CREATE TABLE CassandraTableRaw(message text PRIMARY KEY); val stream = FlumeUtils.createPollingStream(ssc, Seq(nms), StorageLevel.MEMORY_AND_DISK_SER) .map(e => new String(e.event.getBody.array(), UTF_8)).filter(containsERR)
        .map(Tuple1(_))

        stream.print
        stream.saveToCassandra("cassandrakeyspace", "cassandratableraw")

    // Just control
    stream.count().map(cnt => "Received " + cnt + " flume events.").print

// I can print "headers". On console something like "Map(Severity -> 6, Facility -> 3, host -> vpn10, priority -> 30, timestamp -> 1448702438000)" // val headers = stream.map(_.event.getHeaders.asScala.map { case (key, value) => (key.toString, value.toString)})
           .print

But I can't find a Scala example for Flume/Spark Streaming that shows how to combine the filtered getBody with the getHeaders fields I need, and save the result to C*, into something like:

CREATE TABLE CassandraTableRaw(hostname text PRIMARY KEY, priority int, timestamp timestamp, msg text, program text, pid int);
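For reference, here is a rough, untested sketch of what I imagine, using the same imports and flumeStream as above. The RawEvent case class and the parseBody helper are just my assumptions about how the mapping could look; as far as I understand, the connector matches case class fields to table columns by name:

// Top-level case class matching the target table above (my assumption).
case class RawEvent(hostname: String, priority: Int,
                    timestamp: java.util.Date, msg: String,
                    program: String, pid: Int)

// Hypothetical parser: split a body like "sshd[1234]: some text"
// into (program, pid, msg); fall back to the raw body otherwise.
def parseBody(body: String): (String, Int, String) = {
  val Pattern = """.*?(\S+)\[(\d+)\]:\s*(.*)""".r
  body match {
    case Pattern(program, pid, msg) => (program, pid.toInt, msg)
    case _                          => ("unknown", 0, body)
  }
}

// Keep the events whose body passes containsERR, then build one
// RawEvent per Flume event from both the headers and the parsed body.
val rows = flumeStream
  .filter(e => containsERR(new String(e.event.getBody.array(), UTF_8)))
  .map { e =>
    val headers = e.event.getHeaders.asScala.map {
      case (key, value) => (key.toString, value.toString)
    }
    val body = new String(e.event.getBody.array(), UTF_8)
    val (program, pid, msg) = parseBody(body)
    RawEvent(
      hostname  = headers.getOrElse("host", "unknown"),
      priority  = headers.get("priority").map(_.toInt).getOrElse(0),
      timestamp = new java.util.Date(
        headers.get("timestamp").map(_.toLong).getOrElse(0L)),
      msg       = msg,
      program   = program,
      pid       = pid)
  }

rows.saveToCassandra("cassandrakeyspace", "cassandratableraw")

Is something along these lines the right approach, or is there a better pattern for this?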

I would be very, very thankful if someone could share a code example that resolves this issue.

Thanks a lot for reading, and for your help.

Oleksandr Yermolenko
