You may also be able to initialize the client only in the parallel execution by declaring it as a "lazy val" in Scala.
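To illustrate the idea, here is a minimal, Flink-free sketch. The `Client` and `MyMapper` names are hypothetical stand-ins for the non-thread-safe client discussed in the thread; the point is that a `@transient lazy val` is constructed on first use, i.e. on the worker after the closure is deserialized, not on the driver when it is serialized:

```scala
// Hypothetical stand-in for the real non-thread-safe client.
class Client {
  def transform(s: String): String = s.toUpperCase
}

class MyMapper extends Serializable {
  // @transient: excluded from serialization; lazy: constructed on first
  // access, which happens on the worker, not when the closure is shipped.
  @transient lazy val client: Client = new Client

  def map(in: String): String = client.transform(in)
}
```

The same shape would apply inside a Flink map function: the only thing serialized is the (empty) wrapper, and each parallel instance builds its own client on first element.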
On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> Outstanding! Thanks, Aljoscha.
>
> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> you could use a RichMapFunction, which has an open() method:
>>
>> data.map(new RichMapFunction[...]() {
>>   def open(): Unit = {
>>     // initialize client
>>   }
>>
>>   def map(input: IN): OUT = {
>>     // use client
>>   }
>> })
>>
>> The open() method is called before any elements are passed to the
>> function. The counterpart of open() is close(), which is called after all
>> elements are through or when the job is cancelled.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm writing a Scala Flink application. I have a standalone process that
>>> exists on every Flink node that I need to call to transform my data. To
>>> access this process I need to initialize a non-thread-safe client first. I
>>> would like to avoid initializing a client for each element being
>>> transformed. A straightforward implementation would be something like this:
>>> ```
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>> val pool = new ArrayBlockingQueue[Client](5)
>>> // pool is filled here
>>> data.map(e => {
>>>   val client = pool.take()
>>>   val res = client.transform(e)
>>>   pool.put(client)
>>>   res
>>> })
>>> ```
>>> However, this causes a runtime exception with the message "Task not
>>> serializable", which makes sense.
>>>
>>> Function parameters and broadcast variables won't work either, as far as
>>> I understand. Is there a way to make this happen?
>>>
>>> Thanks,
>>> Timur
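For what it's worth, here is a Flink-free sketch of how the open()/close() lifecycle fixes the "Task not serializable" problem for the pooled-client version: the pool is created inside open(), on the worker, so no non-serializable state is captured when the closure is shipped. `Client`, `PooledMapper`, and the pool size are hypothetical stand-ins, not the real API:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for the real non-thread-safe client.
class Client {
  def transform(s: String): String = s.reverse
}

class PooledMapper extends Serializable {
  // @transient var: never serialized; populated in open() on the worker.
  @transient private var pool: ArrayBlockingQueue[Client] = _

  // Analogous to RichMapFunction.open: called once per parallel instance
  // before any elements are processed.
  def open(): Unit = {
    pool = new ArrayBlockingQueue[Client](5)
    (1 to 5).foreach(_ => pool.put(new Client))
  }

  def map(in: String): String = {
    val client = pool.take()      // borrow a client
    try client.transform(in)
    finally pool.put(client)      // return it even if transform throws
  }

  // Analogous to RichMapFunction.close: called after all elements are
  // through or when the job is cancelled.
  def close(): Unit = pool.clear()
}
```

The take()/put() pair wrapped in try/finally guarantees the client goes back into the pool even on failure, which the straightforward version in the question would not.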