I'm running into a different problem with this Spark app now, and I think it
may be because I am publishing to RabbitMQ inside a foreachPartition loop.
I would like to publish once per window, but the app publishes far more than
that (it varies: sometimes 5 messages, sometimes 15).  I'm running this
locally with 2 cores (local[2]), so I wouldn't expect there to be many
partitions.  Does anyone have ideas on how to resolve this?  Thanks for any
help!  The full code is at the end of this message.
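
To confirm where the extra publishes are coming from, I'm planning to log the
partition count of each windowed RDD, since foreachPartition runs its body
once per partition.  A quick, untested sketch I'd add alongside the existing
foreachRDD call (windowStream is defined in the code below):

    windowStream.foreachRDD(rdd => {
      // foreachPartition executes once per partition, so the number of
      // partitions here should match the number of messages published.
      println("window RDD partitions: " + rdd.partitions.length)
    })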


import java.util.Calendar
import java.text.SimpleDateFormat


import kafka.producer._

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

import com.mongodb.casbah.Imports._

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Channel
import com.rabbitmq.client.QueueingConsumer

import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{ read, write, writePretty }

case class PageHit(SQLCalls: Double, URL: String, PageLoadTime: Double)

case class RabbitPayload2(URL: String, SQLCalls: Double, PageLoadTime: Double,
  NumberPageHits: Int, CurrentSecond: String)

object myData {
  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[2]", "Data", Seconds(30))
    ssc.checkpoint("checkpoint")
    val topicMap = Map("data" -> 1)

    val pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_2,Zookeeper_3",
      "Cons1", topicMap).map(_._2)

    val pageHitStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val SQLCalls = (json \ "SQLCalls").extractOpt[Double]
      val URL = (json \ "URL").extractOpt[String]
      val PageLoadTime = (json \ "PageLoadTime").extractOpt[Double]

      PageHit(SQLCalls.getOrElse(0.0), URL.getOrElse("Empty"), PageLoadTime.getOrElse(0.0))
    })

    def truncate(value: Double): Double = (value * 1000).round / 1000.0
    
    val windowStream = pageHitStream.filter(pageHit => pageHit.URL != "Empty")
      .window(Minutes(1), Seconds(30))

    windowStream.foreachRDD(pageHit => {

      if (pageHit.count > 0) {
        // Collect the window's contents once on the driver and derive the
        // aggregates from the collected array.
        val hits = pageHit.collect
        val SQLCallArray = hits.map(pageHit => pageHit.SQLCalls)
        val avgNumberSQlCalls = truncate(SQLCallArray.sum / SQLCallArray.length)

        val PageLoadTimeArray = hits.map(pageHit => pageHit.PageLoadTime)
        val avgPageLoadTime = truncate(PageLoadTimeArray.sum / PageLoadTimeArray.length)

        val URL = hits.head.URL

        pageHit.foreachPartition(partition => {
          // NOTE: this body runs once per partition of the RDD, so everything
          // below happens once per partition, not once per window.
          val factory = new ConnectionFactory()
          factory.setUsername("myUserName")
          factory.setPassword("myPassword")
          factory.setVirtualHost("/")
          factory.setHost("rabbits")
          factory.setPort(5672)
          val connection = factory.newConnection()

          val pageHitChannel = connection.createChannel()
          pageHitChannel.exchangeDeclare("PageHits", "fanout")

          val today = Calendar.getInstance().getTime()
          val currentSecondAsString = new SimpleDateFormat("ss")
          val secString = currentSecondAsString.format(today)

          implicit val formats = DefaultFormats

          val payload2 = RabbitPayload2(URL, avgNumberSQlCalls, avgPageLoadTime,
            SQLCallArray.length / 2, secString)

          pageHitChannel.basicPublish("PageHits", "", null,
            writePretty(payload2).getBytes)

          val mongoClient = MongoClient("mongodb")
          val db = mongoClient("Dev")
          val SQLCollection = db("SQLCalls")

          SQLCollection += MongoDBObject("Event" -> "Page Hit",
            "URL" -> URL,
            "Avg number of SQL Calls" -> avgNumberSQlCalls,
            "Avg Page Load Time" -> avgPageLoadTime,
            "Number of Page Hits in window" -> SQLCallArray.length / 2)

          // Close the clients so a fresh set doesn't leak on every partition.
          mongoClient.close()
          pageHitChannel.close()
          connection.close()
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }
}
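
One fix I'm considering: since the aggregates are already collected on the
driver inside foreachRDD, I could drop foreachPartition entirely and publish
once per window from the driver.  Would something like this be the right
approach?  (Rough, untested sketch, reusing the same names and connection
settings as above.)

    windowStream.foreachRDD(rdd => {
      val hits = rdd.collect()
      if (hits.nonEmpty) {
        val avgSQLCalls = truncate(hits.map(_.SQLCalls).sum / hits.length)
        val avgPageLoad = truncate(hits.map(_.PageLoadTime).sum / hits.length)
        val secString = new SimpleDateFormat("ss").format(Calendar.getInstance().getTime())

        // Everything below now runs exactly once per window, on the driver.
        val factory = new ConnectionFactory()
        factory.setUsername("myUserName")
        factory.setPassword("myPassword")
        factory.setVirtualHost("/")
        factory.setHost("rabbits")
        factory.setPort(5672)
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.exchangeDeclare("PageHits", "fanout")

        implicit val formats = DefaultFormats
        val payload = RabbitPayload2(hits.head.URL, avgSQLCalls, avgPageLoad,
          hits.length / 2, secString)  // kept the "/ 2" from my original code
        channel.basicPublish("PageHits", "", null, writePretty(payload).getBytes)

        channel.close()
        connection.close()
      }
    })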




