> I'm doing the same thing for using Cassandra,

For Cassandra, use the Spark-Cassandra connector [1], which does the session management described by TD for you.
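As a sketch of what that looks like (the keyspace and table names here are made up, and this assumes `spark.cassandra.connection.host` is set on the SparkConf and a Cassandra cluster is reachable):

```scala
import com.datastax.spark.connector._  // adds saveToCassandra to RDDs

// Hypothetical keyspace/table; the connector caches sessions per executor
// JVM, so there is no per-partition or per-record session setup to write.
dstream.foreachRDD { rdd =>
  rdd.saveToCassandra("my_keyspace", "events")
}
```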
[1] https://github.com/datastax/spark-cassandra-connector

-kr, Gerard.

On Thu, Dec 11, 2014 at 1:55 PM, Ashic Mahtab <as...@live.com> wrote:
> That makes sense. I'll try that.
>
> Thanks :)
>
> > From: tathagata.das1...@gmail.com
> > Date: Thu, 11 Dec 2014 04:53:01 -0800
> > Subject: Re: "Session" for connections?
> > To: as...@live.com
> > CC: user@spark.apache.org
> >
> > You could create a lazily initialized singleton factory and connection
> > pool. Whenever an executor starts running the first task that needs to
> > push out data, it will create the connection pool as a singleton, and
> > subsequent tasks running on that executor will use the connection
> > pool. You will also have to shut the connections down intelligently,
> > because there is no obvious point at which to shut them down. You
> > could use a usage timeout: shut a connection down after it has been
> > idle for 10 x the batch interval.
> >
> > TD
> >
> > On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab <as...@live.com> wrote:
> > > Hi,
> > > I was wondering if there's any way of having long-running, session-type
> > > behaviour in Spark. For example, let's say we're using Spark Streaming
> > > to listen to a stream of events. Upon receiving an event, we process
> > > it, and if certain conditions are met, we wish to send a message to
> > > RabbitMQ. Now, Rabbit clients have the concept of a connection factory,
> > > from which you create a connection, from which you create a channel.
> > > You use the channel to get a queue, and finally the queue is what you
> > > publish messages on.
> > >
> > > Currently, what I'm doing can be summarised as:
> > >
> > > dstream.foreachRDD(x => x.foreachPartition(y => {
> > >   val factory = ...
> > >   val connection = ...
> > >   val channel = ...
> > >   val queue = channel.declareQueue(...)
> > >
> > >   y.foreach(z => Processor.Process(z, queue))
> > >
> > >   // clean up the queue stuff
> > > }))
> > >
> > > I'm doing the same thing for Cassandra, etc. Now in these cases, the
> > > session initiation is expensive, so doing it per message is not a good
> > > idea. However, I can't find a way to say "hey... do this per worker
> > > once and only once".
> > >
> > > Is there a better pattern to do this?
> > >
> > > Regards,
> > > Ashic.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
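TD's lazily initialized singleton with an idle-timeout shutdown can be sketched in plain Scala (no Spark or RabbitMQ dependencies; `Connection`, the method names, and the timeout value are all illustrative stand-ins, not from any real client library):

```scala
// Hypothetical stand-in for a RabbitMQ/Cassandra connection.
class Connection {
  @volatile var closed = false
  def close(): Unit = { closed = true }
}

// One instance per executor JVM: the first task that needs to push data
// creates the connection; later tasks on the same executor reuse it.
object ConnectionPool {
  private var conn: Option[Connection] = None
  private var lastUsedMillis = 0L
  private val idleTimeoutMillis = 10 * 1000L  // e.g. 10 x the batch interval

  // Called from each task (e.g. inside foreachPartition).
  def get(): Connection = synchronized {
    lastUsedMillis = System.currentTimeMillis()
    conn match {
      case Some(c) if !c.closed => c
      case _ =>
        val c = new Connection
        conn = Some(c)
        c
    }
  }

  // Invoked periodically (e.g. from a scheduled background thread) to
  // close connections that have been idle past the timeout.
  def closeIfIdle(nowMillis: Long): Unit = synchronized {
    conn.foreach { c =>
      if (nowMillis - lastUsedMillis > idleTimeoutMillis) {
        c.close()
        conn = None
      }
    }
  }
}
```

A partition-processing task would then call `ConnectionPool.get()` instead of building a factory/connection/channel per partition, paying the session cost once per executor rather than once per batch.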