Hi Tathagata,

Thanks for your response; that is just the advice I was looking for. I will try this out with Spark 1.0 when it comes out.
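For reference, here is roughly what I am planning to try with the new Receiver API. This is only a sketch of my understanding: the javax.jms wiring (ConnectionFactory, queue name, TextMessage handling) is my own assumption and not something confirmed in this thread.

```scala
import javax.jms._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a JMS receiver using the Spark 1.0 Receiver API.
// The ConnectionFactory would come from the JMS provider (e.g. SonicMQ, ActiveMQ).
class JmsReceiver(connectionFactory: ConnectionFactory, queueName: String)
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  @volatile private var connection: Connection = _

  def onStart(): Unit = {
    // Receive on a separate thread; onStart() must not block.
    new Thread("JMS Receiver") {
      override def run(): Unit = receiveLoop()
    }.start()
  }

  def onStop(): Unit = {
    // Closing the connection unblocks the consumer's receive() call.
    if (connection != null) connection.close()
  }

  private def receiveLoop(): Unit = {
    try {
      connection = connectionFactory.createConnection()
      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
      val consumer = session.createConsumer(session.createQueue(queueName))
      connection.start()
      while (!isStopped()) {
        consumer.receive() match {
          case text: TextMessage => store(text.getText) // hand data to Spark
          case _                 => // ignore null / non-text messages
        }
      }
    } catch {
      // Let Spark's new lifecycle management handle the error and restart us.
      case e: Exception => restart("Error receiving from JMS", e)
    }
  }
}
```

The stream itself would then, if I understand correctly, be created with something like `ssc.receiverStream(new JmsReceiver(factory, "SampleQ1"))`.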
Best regards,
Patrick

On 5 May 2014 22:42, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> A few high-level suggestions.
>
> 1. I recommend using the new Receiver API in the almost-released Spark 1.0
> (see branch-1.0 / the master branch on GitHub). It is a slightly better
> version of the earlier NetworkReceiver, as it hides away the BlockGenerator
> (which previously had to be manually started and stopped) and adds other
> lifecycle-management methods like stop, restart and reportError to deal
> with errors in receiving data. It also adds the ability to write a custom
> receiver from Java. Take a look at this example
> <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala>
> of writing a custom receiver in the new API. I am updating the custom
> receiver guide right now (https://github.com/apache/spark/pull/652).
>
> 2. Once you create a JMSReceiver class by extending
> NetworkReceiver/Receiver, you can create a DStream out of the receiver with
>
> val jmsStream = ssc.networkStream(new JMSReceiver("...."))
>
> 3. As far as I understand from the docs of akka.camel.Consumer
> <http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer>,
> it is essentially a specialized Akka actor. For Akka actors there is
> ssc.actorStream, where you can specify your own actor class. You get actor
> supervision (and therefore error handling, etc.) with that. See the example
> ActorWordCount (old style using NetworkReceiver
> <https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala>,
> or new style using Receiver
> <https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala>).
>
> I haven't personally played around with JMS before, so I can't comment
> much on JMS-specific intricacies.
>
> TD
>
>
> On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin <mcgloin.patr...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> Is there a "best practice" for subscribing to JMS with Spark Streaming?
>> I have searched but not found anything conclusive.
>>
>> In the absence of a standard practice, the solution I was thinking of was
>> to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
>> Spark Streaming custom receiver. So the actor would look something like
>> this:
>>
>> class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
>>   // e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
>>   def endpointUri = jmsURI
>>   lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
>>
>>   protected override def onStart() {
>>     blockGenerator.start
>>   }
>>
>>   def receive = {
>>     case msg: CamelMessage => { blockGenerator += msg.body }
>>     case _ => { /* ... */ }
>>   }
>>
>>   protected override def onStop() {
>>     blockGenerator.stop
>>   }
>> }
>>
>> And then in the main application create receivers like this:
>>
>> val ssc = new StreamingContext(...)
>> object tascQueue extends JmsReceiver[String](ssc) {
>>   override def getReceiver(): JmsReceiver[String] = {
>>     new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
>>   }
>> }
>> ssc.registerInputStream(tascQueue)
>>
>> Is this the best way to go?
>>
>> Best regards,
>> Patrick