Here's what we've tried so far as a first example of a custom Mongo receiver:
    // Assumed imports for Spark 0.9 / ReactiveMongo 0.10 / Play Iteratees:
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.NetworkReceiver
    import play.api.libs.iteratee.Iteratee
    import reactivemongo.api.{MongoDriver, QueryOpts}
    import reactivemongo.api.collections.default.BSONCollection
    import reactivemongo.bson.BSONDocument

    class MongoStreamReceiver(host: String)
        extends NetworkReceiver[String] {

      protected lazy val blocksGenerator: BlockGenerator =
        new BlockGenerator(StorageLevel.MEMORY_AND_DISK_SER_2)

      protected def onStart() = {
        blocksGenerator.start()
        val driver = new MongoDriver
        val connection = driver.connection(List("m01-pdp2"))
        val db = connection.db("local")
        val collection = db.collection[BSONCollection]("oplog.rs")
        // Tail the replica-set oplog, inserts only
        val query = BSONDocument("op" -> "i")
        val enumerator =
          collection
            .find(query)
            .options(QueryOpts().tailable.awaitData)
            .cursor[BSONDocument]
            .enumerate()
        val processor: Iteratee[BSONDocument, Unit] =
          Iteratee.foreach { doc =>
            blocksGenerator += BSONDocument.pretty(doc)
          }
        // Returns a Future; onStart() itself does not block here
        enumerator |>>> processor
      }

      protected def onStop() {
        blocksGenerator.stop()
      }
    }
However, this code doesn't produce anything; we suspect serialization issues, but there are no logs to confirm this, just no data in the stream.
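One way to test the serialization hypothesis directly is to round-trip the receiver through plain Java serialization, which is what Spark does before shipping a receiver to the workers. A sketch with stdlib only (GoodReceiver is a simplified hypothetical stand-in, not our actual receiver class):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the receiver: connections are created on
// start and marked @transient, so they are skipped by serialization.
class GoodReceiver(host: String) extends Serializable {
  @transient lazy val connection: AnyRef = new Object
}

object SerializationCheck {
  // Returns true iff obj survives Java serialization, i.e. what Spark
  // requires of a receiver instance before sending it to a worker.
  def roundTrip(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj) // throws NotSerializableException on failure
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(roundTrip(new GoodReceiver("m01-pdp2"))) // true: transient field skipped
    println(roundTrip(new Object))                   // false: not Serializable
  }
}
```

If the real receiver fails this check, any non-transient field holding a MongoDriver or connection would be the likely culprit.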
Note that if we comment out the ReactiveMongo-related code and put something like this instead, the code runs fine:
    for (i <- 0 until 1000) {
      blocksGenerator += "hello world"
      Thread.sleep(1000)
    }
The Java socket example (found here:
<http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html>)
works fine as well.
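One more thing we could check: `enumerator |>>> processor` returns a Future, and if nothing inspects it, an exception thrown inside (e.g. a failed Mongo connection) disappears without a log line, which would match what we see. A self-contained sketch of that failure mode (no ReactiveMongo involved; the RuntimeException stands in for a connection error):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SwallowedFailure {
  // Simulates the Future returned by `enumerator |>>> processor`:
  // an exception thrown inside it lives only in the Future and is
  // never logged unless something inspects the result.
  def streamResult(): Try[Unit] = {
    val stream: Future[Unit] = Future { throw new RuntimeException("connect failed") }
    Try(Await.result(stream, 2.seconds))
  }

  def main(args: Array[String]): Unit =
    streamResult() match {
      case Failure(e) => println(s"stream failed: ${e.getMessage}")
      case Success(_) => println("stream completed")
    }
}
```

Attaching an onComplete callback (or logging the Failure case) to the Future in onStart() would at least surface the error.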
Any hints?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-ReactiveMongo-tp14568p14661.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.