On 12 Jun 2015, at 23:19, Cody Koeninger <[email protected]> wrote:

> A. No, it's called once per partition.  Usually you have more partitions than 
> executors, so it will end up getting called multiple times per executor.  But 
> you can use a lazy val, singleton, etc to make sure the setup only takes 
> place once per JVM.
> 
> B.  I can't speak to the specifics there ... but as long as you're making sure 
> the setup gets called at most once per executor, before the work that needs 
> it ... should be ok.
> 
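To make point A concrete, here is a minimal sketch in plain Scala (no Spark dependency) of the "lazy val in a singleton" idea. The names `Connection`, `setupCount`, `touchAndPass` and `LazySetupDemo` are made up for illustration, and the three iterators only stand in for partitions that happen to be processed in the same JVM; a real job would pass the function to mapPartitions instead.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a connection pool, ActorSystem holder, etc.
final class Connection {
  def init(): Unit = () // nothing to do; touching `conn` already triggered the setup
}

object SomeDb {
  // Counts how often the expensive setup actually runs in this JVM.
  val setupCount = new AtomicInteger(0)

  // A lazy val inside an object is initialized at most once per JVM.
  lazy val conn: Connection = {
    setupCount.incrementAndGet()
    new Connection
  }
}

object LazySetupDemo {
  // Mimics the function handed to mapPartitions:
  // touch the singleton, then return the iterator unchanged.
  def touchAndPass[A](iter: Iterator[A]): Iterator[A] = {
    SomeDb.conn.init()
    iter
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" handled in one JVM, as on a single executor.
    val partitions = Seq(Iterator(1, 2), Iterator(3), Iterator(4, 5, 6))
    val results = partitions.map(p => touchAndPass(p).toList)
    println(results)                 // List(List(1, 2), List(3), List(4, 5, 6))
    println(SomeDb.setupCount.get()) // 1
  }
}
```

The function runs once per partition (three times here), but the setup body runs only once, which is the behaviour described above.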

Great, thanks so much.

(I guess I am not yet clear about the relationship of partition / executor / 
stage, but I get the idea.)

Jan


> On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 <[email protected]> 
> wrote:
> 
> On 12 Jun 2015, at 22:59, Cody Koeninger <[email protected]> wrote:
> 
> > Close.  The mapPartitions call doesn't need to do anything at all to the 
> > iter.
> >
> > mapPartitions { iter =>
> >   SomeDb.conn.init
> >   iter
> > }
> 
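A small sketch of the difference between calling init inside iter.map and calling it once before returning iter, using plain Scala iterators rather than Spark. `InitCallDemo`, `initCalls`, `perElement` and `perPartition` are hypothetical names, and the counter merely stands in for calls to `SomeDb.conn.init`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object InitCallDemo {
  // Hypothetical counter standing in for calls to SomeDb.conn.init.
  val initCalls = new AtomicInteger(0)
  def init(): Unit = { initCalls.incrementAndGet(); () }

  // init is invoked for every element of the partition.
  def perElement[A](iter: Iterator[A]): Iterator[A] =
    iter.map { x => init(); x }

  // init is invoked once; the iterator is returned untouched.
  def perPartition[A](iter: Iterator[A]): Iterator[A] = {
    init()
    iter
  }

  def main(args: Array[String]): Unit = {
    perElement(Iterator(1, 2, 3)).toList
    val elementCalls = initCalls.getAndSet(0)
    perPartition(Iterator(1, 2, 3)).toList
    val partitionCalls = initCalls.get()
    println(s"per element: $elementCalls, per partition: $partitionCalls")
    // prints "per element: 3, per partition: 1"
  }
}
```

With a lazy val behind it, both variants set up the connection only once; the second simply avoids a redundant call per record.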
> Yes, thanks!
> 
> Maybe you can confirm two more things, and then you will have helped me make 
> a giant leap today:
> 
> a) When using spark streaming, will this happen exactly once per executor? I 
> mean: is mapPartitions called once per executor for the lifetime of the 
> stream?
> 
> Or should I rather think once per stage?
> 
> 
> b) I actually need an ActorSystem and FlowMaterializer (for making an 
> Akka-HTTP request to store the data), not a DB connection - I presume this 
> does not change the concept?
> 
> 
> Jan
> 
> 
> 
> >
> > On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 
> > <[email protected]> wrote:
> > Cody,
> >
> > On 12 Jun 2015, at 17:26, Cody Koeninger <[email protected]> wrote:
> >
> > > There are several database apis that use a thread local or singleton 
> > > reference to a connection pool (we use ScalikeJDBC currently, but there 
> > > are others).
> > >
> > > You can use mapPartitions earlier in the chain to make sure the 
> > > connection pool is set up on that executor, then use it inside 
> > > updateStateByKey
> > >
> >
> > Thanks. You are saying I should just make an arbitrary use of the 
> > ‘connection’ to invoke the ‘lazy’. E.g. like this:
> >
> > object SomeDb {
> >
> >   lazy val conn = new SomeDb("some serializable config")
> >
> > }
> >
> >
> > Then somewhere else:
> >
> > theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map { pair =>
> >   SomeDb.conn.init
> >   pair
> > }).updateStateByKey[Session](myUpdateFunction _)
> >
> >
> > And in myUpdateFunction:
> >
> > def myUpdateFunction( …) {
> >
> >     SomeDb.conn.store(  … )
> >
> > }
> >
> >
> > Correct?
> >
> > Jan
> >
> >
> >
> >
> > > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 
> > > <[email protected]> wrote:
> > > Hi,
> > >
> > > I have a scenario with spark streaming, where I need to write to a 
> > > database from within updateStateByKey[1].
> > >
> > > That means that inside my update function I need a connection.
> > >
> > > I have so far understood that I should create a new (lazy) connection for 
> > > every partition. But since I am not working in foreachRDD I wonder where 
> > > I can iterate over the partitions.
> > >
> > > Should I use mapPartitions() somewhere up the chain?
> > >
> > > Jan
> > >
> > >
> > >
> > > [1] The use case being saving ‘done’ sessions during web tracking.
> > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
> > >
> > >
> >
> >
> 
> 

