I am running into a different problem with this Spark app, and I suspect it is because I am publishing to RabbitMQ inside a foreachPartition loop. I would like to publish exactly once per window, but the app is publishing far more than that (it varies: sometimes 5 messages, sometimes 15). I am running this locally with local[2], so I wouldn't expect there to be many partitions. Does anyone have any ideas on how to resolve this? The code is below; thanks for any help!
import java.util.Calendar
import java.text.SimpleDateFormat
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import com.mongodb.casbah.Imports._
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Channel
import com.rabbitmq.client.QueueingConsumer
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write, writePretty}

case class PageHit(SQLCalls: Double, URL: String, PageLoadTime: Double)
case class RabbitPayload2(URL: String, SQLCalls: Double, PageLoadTime: Double, NumberPageHits: Int, CurrentSecond: String)

object myData {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "Data", Seconds(30))
    ssc.checkpoint("checkpoint")

    // Read the "data" topic from Kafka and keep only the message payloads.
    val topicMap = Map("data" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "Zookeeper_1,Zookeeper_2,Zookeeper_3", "Cons1", topicMap).map(_._2)

    // Parse each JSON message into a PageHit, defaulting missing fields.
    val pageHitStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val SQLCalls = (json \ "SQLCalls").extractOpt[Double]
      val URL = (json \ "URL").extractOpt[String]
      val PageLoadTime = (json \ "PageLoadTime").extractOpt[Double]
      PageHit(SQLCalls.getOrElse(0.0), URL.getOrElse("Empty"), PageLoadTime.getOrElse(0.0))
    })

    def truncate(value: Double): Double = {
      (value * 1000).round / 1000.toDouble
    }

    // 1-minute window, sliding every 30 seconds.
    val windowStream = pageHitStream
      .filter(pageHit => pageHit.URL != "Empty")
      .window(Minutes(1), Seconds(30))

    windowStream.foreachRDD(pageHit => {
      if (pageHit.count > 0) {
        // Aggregates for the window, computed on the driver via collect.
        val SQLCallArray = pageHit.collect.map(pageHit => pageHit.SQLCalls)
        val avgNumberSQlCalls = truncate(SQLCallArray.reduceLeft[Double](_ + _) / SQLCallArray.length)

        val PageLoadTimeArray = pageHit.collect.map(pageHit => pageHit.PageLoadTime)
        val avgPageLoadTime = truncate(PageLoadTimeArray.reduceLeft[Double](_ + _) / PageLoadTimeArray.length)

        val URLArray = pageHit.collect.take(1).map(pageHit => pageHit.URL)
        val URL = URLArray(0)

        // This block runs once per partition, so the publish below happens
        // once per partition rather than once per window.
        pageHit.foreachPartition(partition => {
          val factory = new ConnectionFactory()
          factory.setUsername("myUserName")
          factory.setPassword("myPassword")
          factory.setVirtualHost("/")
          factory.setHost("rabbits")
          factory.setPort(5672)
          val connection = factory.newConnection()
          val pageHitChannel = connection.createChannel()
          pageHitChannel.exchangeDeclare("PageHits", "fanout")

          val today = Calendar.getInstance().getTime()
          val currentSecondAsString = new SimpleDateFormat("ss")
          val secString = currentSecondAsString.format(today)

          implicit val formats = DefaultFormats
          val payload2 = RabbitPayload2(URL, avgNumberSQlCalls, avgPageLoadTime, SQLCallArray.length / 2, secString)
          pageHitChannel.basicPublish("PageHits", "", null, writePretty(payload2).getBytes)

          val mongoClient = MongoClient("mongodb")
          val db = mongoClient("Dev")
          val SQLCollection = db("SQLCalls")
          SQLCollection += MongoDBObject(
            "Event" -> "Page Hit",
            "URL" -> URL,
            "Avg number of SQL Calls" -> avgNumberSQlCalls,
            "Avg Page Load Time" -> avgPageLoadTime,
            "Number of Page Hits in window" -> SQLCallArray.length / 2)
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
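To be clear about what I'm aiming for, this is the direction I'm considering (an untested sketch on my part; the aggregate and variable names are just placeholders, and the RabbitMQ setup is the same as above): computing the window aggregates on the driver as I already do with collect, then opening the connection and publishing once per window directly in foreachRDD instead of inside foreachPartition.

    // Untested sketch: publish exactly once per window, from the driver.
    windowStream.foreachRDD(rdd => {
      if (rdd.count > 0) {
        // Window aggregates computed on the driver (same approach as the collect calls above).
        val hits = rdd.collect()
        val avgSQLCalls = truncate(hits.map(_.SQLCalls).sum / hits.length)
        val avgLoadTime = truncate(hits.map(_.PageLoadTime).sum / hits.length)
        val url = hits.head.URL

        // Open the RabbitMQ connection on the driver and publish a single message per window.
        val factory = new ConnectionFactory()
        factory.setUsername("myUserName")
        factory.setPassword("myPassword")
        factory.setVirtualHost("/")
        factory.setHost("rabbits")
        factory.setPort(5672)
        val connection = factory.newConnection()
        val channel = connection.createChannel()
        channel.exchangeDeclare("PageHits", "fanout")

        implicit val formats = DefaultFormats
        val secString = new SimpleDateFormat("ss").format(Calendar.getInstance().getTime())
        val payload = RabbitPayload2(url, avgSQLCalls, avgLoadTime, hits.length, secString)
        channel.basicPublish("PageHits", "", null, writePretty(payload).getBytes)

        channel.close()
        connection.close()
      }
    })

Is that the right approach, or is there a better way to get one publish per window?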