You may also be able to initialize the client only during parallel
execution by making it a "@transient lazy val" in Scala, so the field is
skipped at serialization time and created on first use on the worker.
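
A minimal sketch of that pattern (the Client class and its transform
method are hypothetical stand-ins for your real client; plain Java
serialization is used below only to mimic how the function gets shipped
to a worker):

```scala
import java.io._

// Hypothetical stand-in for the real non-thread-safe client.
class Client {
  def transform(s: String): String = s.toUpperCase
}

// The lazy val is @transient, so the (possibly non-serializable) client
// is not written out with the function; it is created on first use
// after the function has been deserialized on the worker.
class MyMapper extends (String => String) with Serializable {
  @transient lazy val client: Client = new Client
  def apply(in: String): String = client.transform(in)
}

object LazyDemo extends App {
  val mapper = new MyMapper
  // Round-trip through Java serialization, as happens when the task is shipped.
  val bytes = new ByteArrayOutputStream
  new ObjectOutputStream(bytes).writeObject(mapper)
  val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[MyMapper]
  // The client is only instantiated here, on the deserialized copy.
  println(copy("flink"))
}
```

Note this gives you one client per deserialized function instance, i.e.
per parallel task, which is usually what you want for a non-thread-safe
client.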

On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Outstanding! Thanks, Aljoscha.
>
> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> you could use a RichMapFunction that has an open method:
>>
>> data.map(new RichMapFunction[IN, OUT]() {
>>   override def open(parameters: Configuration): Unit = {
>>     // initialize client
>>   }
>>
>>   override def map(input: IN): OUT = {
>>     // use client
>>   }
>> })
>>
>> the open() method is called before any elements are passed to the
>> function. The counterpart of open() is close(), which is called after all
>> elements are through or if the job cancels.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm writing a Scala Flink application. I have a standalone process that
>>> exists on every Flink node that I need to call to transform my data. To
>>> access this process I need to initialize non thread-safe client first. I
>>> would like to avoid initializing a client for each element being
>>> transformed. A straightforward implementation would be something like this:
>>> ```
>>>
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>> val pool = new ArrayBlockingQueue[Client](5)
>>> // pool is filled here
>>> data.map(e => {
>>>   val client = pool.take()
>>>   val res = client.transform(e)
>>>   pool.put(client)
>>>   res
>>> })
>>>
>>> ```
>>> However, this causes a runtime exception with message "Task not
>>> serializable", which makes sense.
>>>
>>> Function parameters and broadcast variables won't work either as far as
>>> I understand. Is there a way to make this happen?
>>>
>>> Thanks,
>>> Timur
>>>
>>
>
