Hi Tathagata,

Thanks for your response, just the advice I was looking for.  I will try
this out with Spark 1.0 when it comes out.

Best regards,
Patrick


On 5 May 2014 22:42, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> A few high-level suggestions.
>
> 1. I recommend using the new Receiver API in the almost-released Spark 1.0
> (see branch-1.0 / master branch on github). It's a slightly better version
> of the earlier NetworkReceiver, as it hides away the BlockGenerator (which
> previously had to be started and stopped manually) and adds lifecycle
> management methods like stop, restart, and reportError to deal with errors
> in receiving data. It also adds the ability to write a custom receiver from
> Java. Take a look at this example of writing a custom receiver in the new
> API:
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
> I am updating the custom receiver guide right now
> (https://github.com/apache/spark/pull/652).
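>
> For what it's worth, a JMS receiver against the new API might look roughly
> like the sketch below. This is only an outline under the assumption that
> the branch-1.0 Receiver API (onStart/onStop/store/isStopped) is unchanged
> at release; the JMS connection logic itself is a placeholder.
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
>
> // Sketch only: the actual JMS consume loop is left as a placeholder.
> class JmsReceiver(jmsURI: String)
>   extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {
>
>   def onStart(): Unit = {
>     // Spawn a thread so onStart() returns quickly; the thread connects
>     // to the broker and pushes each message body into Spark via store().
>     new Thread("JMS Receiver") {
>       override def run(): Unit = receiveMessages()
>     }.start()
>   }
>
>   def onStop(): Unit = {
>     // Close the JMS connection here; the receiving thread checks
>     // isStopped() and exits on its own.
>   }
>
>   private def receiveMessages(): Unit = {
>     while (!isStopped()) {
>       // val body: String = ... receive next message from jmsURI ...
>       // store(body)
>     }
>   }
> }
>
> You would then hook it up with ssc.receiverStream(new JmsReceiver(...))
> in 1.0 (the 1.0 counterpart of ssc.networkStream).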
>
> 2. Once you create a JMSReceiver class by extending
> NetworkReceiver/Receiver, you can create a DStream from the receiver with
>
> val jmsStream = ssc.networkStream(new JMSReceiver("...."))
>
> 3. As far as I understand from the docs of akka.camel.Consumer
> (http://doc.akka.io/api/akka/2.3.2/index.html#akka.camel.Consumer),
> it is essentially a specialized Akka actor. For Akka actors there is
> ssc.actorStream, where you can specify your own actor class. You get actor
> supervision (and therefore error handling, etc.) with that. See the
> ActorWordCount example, old style using NetworkReceiver:
> https://github.com/apache/spark/blob/branch-0.9/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
> or new style using Receiver:
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
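>
> Combining that with your Camel consumer, the actor route could be sketched
> roughly as below. This assumes akka-camel is on the classpath and that the
> Spark 1.0 ActorHelper trait (which provides store()) lands as in master;
> the endpoint URI and actor name are illustrative placeholders.
>
> import akka.actor.Props
> import akka.camel.{CamelMessage, Consumer}
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.ActorHelper
>
> // A Camel consumer actor that forwards JMS message bodies into Spark.
> class JmsConsumerActor(jmsURI: String) extends Consumer with ActorHelper {
>   def endpointUri = jmsURI
>
>   def receive = {
>     case msg: CamelMessage => store(msg.body) // push into Spark Streaming
>     case _                 => // ignore anything else
>   }
> }
>
> // In the driver, with actor supervision handled by Spark:
> // val jmsStream = ssc.actorStream[String](
> //   Props(new JmsConsumerActor(
> //     "jms:sonicmq://localhost:2506/queue?destination=SampleQ1")),
> //   "JmsReceiverActor",
> //   StorageLevel.MEMORY_ONLY_SER)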
>
> I haven't personally played around with JMS before, so I can't comment much
> on JMS-specific intricacies.
>
> TD
>
>
>
> On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin <mcgloin.patr...@gmail.com
> > wrote:
>
>> Hi all,
>>
>> Is there a "best practice" for subscribing to JMS with Spark Streaming?
>> I have searched but have not found anything conclusive.
>>
>> In the absence of a standard practice the solution I was thinking of was
>> to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
>> Spark Streaming Custom Receiver.  So the actor would look something like
>> this:
>>
>> class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
>> Consumer {
>>   // e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
>>   def endpointUri = jmsURI
>>
>>   lazy val blockGenerator = new
>> BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
>>
>>   protected override def onStart() {
>>     blockGenerator.start()
>>   }
>>
>>   def receive = {
>>     case msg: CamelMessage => blockGenerator += msg.body
>>     case _                 => // ignore other messages
>>   }
>>
>>   protected override def onStop() {
>>     blockGenerator.stop()
>>   }
>> }
>>
>> And then in the main application create the input streams like this:
>>
>> val ssc = new StreamingContext(...)
>> object tascQueue extends NetworkInputDStream[String](ssc) {
>>   override def getReceiver(): NetworkReceiver[String] = {
>>     new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
>>   }
>> }
>> ssc.registerInputStream(tascQueue)
>>
>> Is this the best way to go?
>>
>> Best regards,
>> Patrick
>>
>
>
