Yeah, using that currently. Doing:
dstream.foreachRDD(x => x.foreachPartition(y =>
  cassandraConnector.withSessionDo(session => {
    val myHelper = MyHelper(session)
    y.foreach(m => {
      processMessage(m, myHelper)
    })
  })))
Is there a better approach?
From: [email protected]
Date: Thu, 11 Dec 2014 15:35:31 +0100
Subject: Re: "Session" for connections?
To: [email protected]
CC: [email protected]; [email protected]
> I'm doing the same thing for using Cassandra,
For Cassandra, use the Spark-Cassandra connector [1], which handles the
session management described by TD for you.
[1] https://github.com/datastax/spark-cassandra-connector
-kr, Gerard.
On Thu, Dec 11, 2014 at 1:55 PM, Ashic Mahtab <[email protected]> wrote:
That makes sense. I'll try that.
Thanks :)
> From: [email protected]
> Date: Thu, 11 Dec 2014 04:53:01 -0800
> Subject: Re: "Session" for connections?
> To: [email protected]
> CC: [email protected]
>
> You could create a lazily initialized singleton factory and connection
> pool. When an executor starts running the first task that needs to
> push out data, it creates the connection pool as a singleton, and
> subsequent tasks running on that executor reuse the same
> connection pool. You will also have to shut down the connections
> intelligently, because there is no obvious point at which to shut them
> down. You could use a usage timeout: shut a connection down after it has
> not been used for 10 x the batch interval.
>
> TD
>
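TD's suggestion can be sketched in plain Scala. This is a hypothetical sketch, not Spark, RabbitMQ, or Cassandra API: `ExpensiveConnection`, `ConnectionHolder`, the 1-second reaper period, and the 10-second idle timeout (10 x an assumed 1 s batch interval) are all made up for illustration. It relies on a Scala `object` being initialized once per JVM, on first reference, so every task on an executor shares one instance.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Hypothetical stand-in for an expensive client (e.g. a RabbitMQ connection
// or a Cassandra session); not a real library type.
final class ExpensiveConnection(val id: Int) {
  @volatile var closed: Boolean = false
  def publish(msg: String): Unit = {
    require(!closed, s"connection $id is closed")
    // A real client would send `msg` over the network here.
  }
  def close(): Unit = closed = true
}

// A Scala `object` is created once per JVM (so once per Spark executor), and
// only when a task first references it: a lazily initialized singleton.
object ConnectionHolder {
  // Assumed idle timeout: 10 x a hypothetical 1-second batch interval.
  val idleTimeoutMs: Long = 10 * 1000L

  private var conn: Option[ExpensiveConnection] = None
  private var lastUsedMs: Long = 0L
  private var created: Int = 0

  // Daemon thread, started when the object is initialized, that checks once a
  // second whether the connection has gone idle and closes it if so.
  private val reaper = {
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, "idle-connection-reaper")
      t.setDaemon(true) // don't keep the executor JVM alive
      t
    }
    val ses = Executors.newSingleThreadScheduledExecutor(factory)
    val task: Runnable = () => { closeIfIdle(System.currentTimeMillis()); () }
    ses.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS)
    ses
  }

  // Returns the shared connection, opening it on first use or after an idle close.
  def get(): ExpensiveConnection = synchronized {
    lastUsedMs = System.currentTimeMillis()
    conn.getOrElse {
      created += 1
      val c = new ExpensiveConnection(created)
      conn = Some(c)
      c
    }
  }

  // Closes the connection if get() has not been called for idleTimeoutMs.
  // A production pool would also track borrowers that still hold it.
  def closeIfIdle(nowMs: Long): Boolean = synchronized {
    conn match {
      case Some(c) if nowMs - lastUsedMs > idleTimeoutMs =>
        c.close()
        conn = None
        true
      case _ => false
    }
  }

  def createdCount: Int = synchronized(created)
}
```

Inside a job, each task would then call `ConnectionHolder.get()` instead of building its own factory, connection, and channel, e.g. `dstream.foreachRDD(rdd => rdd.foreachPartition(msgs => msgs.foreach(m => ConnectionHolder.get().publish(m.toString))))`. The trade-off is that cleanup is no longer tied to a task, which is why the idle timeout is needed.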
> On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab <[email protected]> wrote:
> > Hi,
> > I was wondering if there's any way of having long-running, session-type
> > behaviour in Spark. For example, let's say we're using Spark Streaming to
> > listen to a stream of events. Upon receiving an event, we process it, and if
> > certain conditions are met, we wish to send a message to RabbitMQ. Now,
> > RabbitMQ clients have the concept of a connection factory, from which you
> > create a connection, from which you create a channel. You use the channel to
> > get a queue, and finally the queue is what you publish messages on.
> >
> > Currently, what I'm doing can be summarised as:
> >
> > dstream.foreachRDD(x => x.foreachPartition(y => {
> >   val factory = ...
> >   val connection = ...
> >   val channel = ...
> >   val queue = channel.declareQueue(...)
> >
> >   y.foreach(z => Processor.Process(z, queue))
> >
> >   // clean up the queue, channel, and connection
> > }))
> >
> > I'm doing the same thing for using Cassandra, etc. In these cases the
> > session initialization is expensive, so doing it per message is not a good
> > idea. However, I can't find a way to say "hey... do this once per worker
> > and only once".
> >
> > Is there a better pattern to do this?
> >
> > Regards,
> > Ashic.
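Ashic's per-partition version already amortizes setup over a whole partition rather than a single message. A minimal sketch of the same shape with guaranteed cleanup (a loan pattern built on try/finally) follows; `QueueHandle`, `openQueue`, `withQueue`, and `processPartition` are hypothetical stand-ins for the real RabbitMQ factory, connection, channel, and queue, and upper-casing stands in for `Processor.Process`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the factory -> connection -> channel -> queue chain.
final class QueueHandle {
  val published: ArrayBuffer[String] = ArrayBuffer.empty[String]
  var closed: Boolean = false
  def publish(msg: String): Unit = published += msg
  def close(): Unit = closed = true
}

object PerPartition {
  // Counts how many handles were opened (for illustration; not thread-safe).
  var opened: Int = 0

  def openQueue(): QueueHandle = { opened += 1; new QueueHandle }

  // Loan pattern: open once, lend the handle to `body`, and always close it,
  // even if processing one of the messages throws.
  def withQueue[T](body: QueueHandle => T): T = {
    val q = openQueue()
    try body(q) finally q.close()
  }

  // The shape of the function passed to rdd.foreachPartition(...):
  // one handle per partition, shared by every message in that partition.
  // Returns the (already closed) handle only so it can be inspected.
  def processPartition(msgs: Iterator[String]): QueueHandle =
    withQueue { q =>
      msgs.foreach(m => q.publish(m.toUpperCase)) // stand-in for Processor.Process
      q
    }
}
```

Assuming an `RDD[String]`, the Spark call would be `rdd.foreachPartition(msgs => PerPartition.processPartition(msgs))`. This still opens one connection per partition per batch; the executor-wide singleton TD describes is what removes that remaining cost.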
>