Ah, very good. I've closed my issue as a duplicate. Thanks for the
reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nick,
>
> thanks for pushing this and opening the JIRA issue.
>
> The issue came up a couple of times and a known limitation (see
> FLINK-1256).
> So far the workaround of marking member variables as transient and
> initializing them in the open() method of a RichFunction has been good
> enough for all cases I am aware of. That's probably why the issue hasn't
> been addressed yet.
>
> Of course this is not a satisfying solution, if you would like to use Java
> 8 lambda functions.
>
> Best, Fabian
>
> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimi...@apache.org>:
>
>> That's what I feared. IMO this is very limiting when mixing in other
>> projects where a user does not have control over those projects' APIs. At
>> least falling back to an extensible serialization mechanism (like Kryo)
>> allows users to register serializers external to the types they're
>> consuming.
>>
>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>>
>> -n
>>
>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Nick,
>>>
>>> at the moment Flink uses Java serialization to ship the UDFs to the
>>> cluster. Therefore, the closures must only contain Serializable
>>> objects. The serializer registration only applies to the data which is
>>> processed by the Flink job. Thus, for the moment I would try to get rid of
>>> the ColumnInfo object in your closure.
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I've implemented a (streaming) flow using the Java API and Java8
>>>> Lambdas for various map functions. When I try to run the flow, job
>>>> submission fails because of an unserializable type. This is not a type of
>>>> data used within the flow, but rather a small collection of objects
>>>> captured in the closure context over one of my Lambdas. I've implemented
>>>> and registered a Kryo Serializer for this type with this environment,
>>>> however, it's apparently not used when serializing the lambdas. Seems like
>>>> the same serialization configuration and tools of the environment should be
>>>> used when preparing the job for submission. Am I missing something?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error.
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>         at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
>>>> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>         at
>>>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>         at
>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>         at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>         at
>>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>         at ImportFlow.main(ImportFlow.java:178)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>         ... 6 more
>>>> Caused by: java.io.NotSerializableException:
>>>> org.apache.phoenix.util.ColumnInfo
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>         at
>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>         at
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>         at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>         at
>>>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>         at
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>         at
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>         at
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>         at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>         at
>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>         at
>>>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>         ... 17 more
>>>>
>>>
>>>
>>
>

Reply via email to