On 12 Jun 2015, at 23:19, Cody Koeninger <[email protected]> wrote:
> A. No, it's called once per partition. Usually you have more partitions than
> executors, so it will end up getting called multiple times per executor. But
> you can use a lazy val, singleton, etc. to make sure the setup only takes
> place once per JVM.
>
> B. I can't speak to the specifics there ... but as long as you're making sure
> the setup gets called at most once per executor, before the work that needs
> it ... should be ok.
>

Great, thanks so much - (I guess I am not yet clear about the relationship of
partition / executor / stage, but I get the idea.)

Jan

> On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 <[email protected]>
> wrote:
>
> On 12 Jun 2015, at 22:59, Cody Koeninger <[email protected]> wrote:
>
> > Close. The mapPartitions call doesn't need to do anything at all to the
> > iter.
> >
> > mapPartitions { iter =>
> >   SomeDb.conn.init
> >   iter
> > }
>
> Yes, thanks!
>
> Maybe you can confirm two more things, and then you will have helped me make
> a giant leap today:
>
> a) When using Spark Streaming, will this happen exactly once per executor? I
> mean: is mapPartitions called once per executor for the lifetime of the
> stream?
>
> Or should I rather think once per stage?
>
> b) I actually need an ActorSystem and FlowMaterializer (for making an
> Akka-HTTP request to store the data), not a DB connection - I presume this
> does not change the concept?
>
> Jan
>
> > On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971
> > <[email protected]> wrote:
> > Cody,
> >
> > On 12 Jun 2015, at 17:26, Cody Koeninger <[email protected]> wrote:
> >
> > > There are several database APIs that use a thread local or singleton
> > > reference to a connection pool (we use ScalikeJDBC currently, but there
> > > are others).
> > >
> > > You can use mapPartitions earlier in the chain to make sure the
> > > connection pool is set up on that executor, then use it inside
> > > updateStateByKey
> > >
> >
> > Thanks. You are saying I should just make an arbitrary use of the
> > 'connection' to invoke the 'lazy'. E.g. like this:
> >
> > object SomeDb {
> >
> >   lazy val conn = new SomeDb("some serializable config")
> >
> > }
> >
> > Then somewhere else:
> >
> > theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
> >   SomeDb.conn.init
> >   pair
> > }
> > )).updateStateByKey[Session](myUpdateFunction _)
> >
> > And in myUpdateFunction:
> >
> > def myUpdateFunction( …) {
> >
> >   SomeDb.conn.store( … )
> >
> > }
> >
> > Correct?
> >
> > Jan
> >
> > > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971
> > > <[email protected]> wrote:
> > > Hi,
> > >
> > > I have a scenario with Spark Streaming, where I need to write to a
> > > database from within updateStateByKey[1].
> > >
> > > That means that inside my update function I need a connection.
> > >
> > > I have so far understood that I should create a new (lazy) connection for
> > > every partition. But since I am not working in foreachRDD I wonder where
> > > I can iterate over the partitions.
> > >
> > > Should I use mapPartitions() somewhere up the chain?
> > >
> > > Jan
> > >
> > > [1] The use case being saving 'done' sessions during web tracking.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
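
To make the "once per partition" versus "once per JVM" distinction from the thread concrete, here is a minimal sketch in plain Scala with no Spark dependency. Plain iterators stand in for partitions, and the names SomeDbClient, initCount, warmUp and LazyInitDemo are hypothetical, introduced only for illustration. It assumes the setup lives in a lazy val on a singleton object, as suggested above; it is not the ScalikeJDBC or Akka setup itself.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real client (connection pool, ActorSystem, ...).
final class SomeDbClient(val config: String) {
  def store(record: String): Unit = println(s"stored $record")
}

object SomeDb {
  // Counts how often the expensive setup actually ran in this JVM.
  val initCount = new AtomicInteger(0)

  // A lazy val on an object is initialised at most once per JVM,
  // on first access, no matter how many callers touch it.
  lazy val conn: SomeDbClient = {
    initCount.incrementAndGet()
    new SomeDbClient("some serializable config")
  }
}

object LazyInitDemo {
  // Same shape as the function passed to mapPartitions in the thread:
  // touch the lazy val, then hand the iterator back untouched.
  def warmUp[A](iter: Iterator[A]): Iterator[A] = {
    val _ = SomeDb.conn
    iter
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" handled in one JVM, like one executor running several tasks.
    val partitions = List(Iterator("a", "b"), Iterator("c"), Iterator("d", "e", "f"))
    val records = partitions.flatMap(p => warmUp(p).toList)
    records.foreach(r => SomeDb.conn.store(r))
    println(s"partitions=${partitions.size} setups=${SomeDb.initCount.get}")
  }
}
```

In this sketch warmUp runs once for each of the three iterators, yet the body of the lazy val runs only on the first access. In a real job the function passed to mapPartitions would play the role of warmUp, and each executor JVM would get its own copy of the singleton.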
