You are correct that I am trying to publish inside a foreachRDD loop.
I am currently refactoring and will try publishing inside the
foreachPartition loop. Below is the code showing the way it is currently
written, thanks!
// NOTE(review): snippet pasted from a mailing-list message — it will not compile
// as-is. The closing braces for main() and the object are missing (truncated by
// the email), `windowStream`, `baseMsg`, and `closingBrace` come from the elided
// "//PARSE SOME JSON ETC" section, and the "Avg Page Load Time" string literal
// below has been split across two lines by email wrapping.
object myData {
def main(args: Array[String]) {
// Local streaming context: 8 worker threads, 10-second batch interval.
val ssc = new StreamingContext("local[8]", "Data", Seconds(10))
ssc.checkpoint("checkpoint")
// Kafka topic -> number of consumer threads for that topic.
val topicMap = Map("pagehit.data" -> 1)
// RabbitMQ connection/channel built once on the driver. foreachRDD's closure
// also runs on the driver, so the (non-serializable) channel is never shipped
// to executors — publishing here is legal, just driver-side only.
val factory = new ConnectionFactory()
factory.setUsername("officialUsername")
factory.setPassword("crypticPassword")
factory.setVirtualHost("/")
// presumably redacted/placeholder host and port for the mailing list — verify
factory.setHost("rabbit-env")
factory.setPort(0000)
val connection = factory.newConnection()
val SQLChannel = connection.createChannel()
// queueDeclare(queue, durable=true, exclusive=false, autoDelete=false, args=null)
SQLChannel.queueDeclare("SQLQueue", true, false, false, null)
// Kafka receiver stream; map(_._2) drops the key, keeping only message payloads.
// NOTE(review): "Zookeeper_1" appears twice — likely a typo for "Zookeeper_2".
val Pipe = KafkaUtils.createStream(ssc,
"Zookeeper_1,Zookeeper_1,Zookeeper_3", "Cons1",
topicMap).map(_._2)
//PARSE SOME JSON ETC
// `windowStream` is produced by the elided parsing step above.
windowStream.foreachRDD(pagehit => {
// NOTE(review): a new MongoClient is created every batch and never closed —
// this leaks connections; hoist it out of foreachRDD or close it per batch.
val mongoClient = MongoClient("my-mongodb")
val db = mongoClient("myClient")
val SQLCollection = db("SQLCalls")
// collect() pulls the entire RDD to the driver. NOTE(review): an empty batch
// makes reduceLeft throw and URL(0) below fail — guard with isEmpty first.
val callArray = pagehit.map(_._1).collect
val avg = (callArray.reduceLeft[Long](_+_))/callArray.length
// First record's payload; substring(7, len-1) presumably strips a fixed
// prefix (e.g. a scheme) and a trailing character — TODO confirm format.
val URL = pagehit.take(1).map(_._2)
SQLCollection += MongoDBObject("URL" -> URL(0).substring(7,
URL(0).length - 1),
"Avg Page Load
Time" -> avg)
// Assemble the JSON message and publish on the driver-side channel
// (default "" exchange, routing key = queue name).
val toBuildJSON = Seq(baseMsg, avg.toString, closingBrace)
val byteArray = toBuildJSON.mkString.getBytes()
SQLChannel.basicPublish("", "SQLQueue", null, byteArray)
})
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Writing-to-RabbitMQ-tp11283p11445.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]