Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous
classes.
I have been using Java 8 lambdas quite a bit with Flink.

The important thing is that no non-serializable objects are in the closure.
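
To make that concrete without needing Flink on the classpath, here is a minimal plain-Java sketch. `MapFn` and `ColumnMeta` are hypothetical stand-ins (for a serializable function interface and for a third-party type that is not Serializable); the check simply attempts Java serialization, which is what the stack trace further down shows happening at job submission. A lambda and an anonymous class that capture the same list should behave the same way.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class ClosureSerializationSketch {

    // Hypothetical stand-in for a third-party type that does not implement Serializable.
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Hypothetical stand-in for a function interface that extends Serializable.
    interface MapFn<I, O> extends Serializable {
        O map(I value);
    }

    // Attempts plain Java serialization and reports whether it succeeded.
    static boolean isJavaSerializable(Object candidate) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static List<ColumnMeta> sampleColumns() {
        List<ColumnMeta> columns = new ArrayList<>();
        columns.add(new ColumnMeta("id"));
        return columns;
    }

    // Java 8 style: the lambda captures the list of non-serializable objects.
    static MapFn<String, String> capturingLambda() {
        List<ColumnMeta> columns = sampleColumns();
        return value -> value + ":" + columns.size();
    }

    // Java 7 style: the anonymous class captures the same kind of list.
    static MapFn<String, String> capturingAnonymousClass() {
        List<ColumnMeta> columns = sampleColumns();
        return new MapFn<String, String>() {
            @Override
            public String map(String value) {
                return value + ":" + columns.size();
            }
        };
    }

    // Nothing captured, so there is nothing in the closure that could fail.
    static MapFn<String, String> nonCapturingLambda() {
        return value -> value.toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println("capturing lambda:    " + isJavaSerializable(capturingLambda()));
        System.out.println("capturing anon:      " + isJavaSerializable(capturingAnonymousClass()));
        System.out.println("non-capturing lambda: " + isJavaSerializable(nonCapturingLambda()));
    }
}
```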

As Fabian mentioned, lazy initialization helps. Serializability is also
discussed here:
http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
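
A rough sketch of the lazy-initialization idea in plain Java, again with hypothetical names (`ColumnMeta`, `ColumnCounter`): only serializable configuration is shipped, the non-serializable objects sit in a transient field and are rebuilt on first use. In a Flink job the rebuild would go into the open() method of a RichFunction, as Fabian describes below; the lazy getter here only stands in for that so the example runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class LazyInitSketch {

    // Hypothetical stand-in for a third-party type that does not implement Serializable.
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Hypothetical function object: ships only the column names.
    static final class ColumnCounter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String[] columnNames;          // serializable configuration
        private transient List<ColumnMeta> columns;  // skipped by Java serialization

        ColumnCounter(String... columnNames) {
            this.columnNames = columnNames.clone();
        }

        // Rebuilds the non-serializable objects on first use (null after deserialization).
        private List<ColumnMeta> columns() {
            if (columns == null) {
                List<ColumnMeta> built = new ArrayList<>();
                for (String name : columnNames) {
                    built.add(new ColumnMeta(name));
                }
                columns = built;
            }
            return columns;
        }

        String map(String value) {
            return value + ":" + columns().size();
        }
    }

    // Serializes and deserializes with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T javaRoundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ColumnCounter fn = new ColumnCounter("id", "name");
        ColumnCounter shipped = javaRoundTrip(fn);
        System.out.println(shipped.map("row"));
    }
}
```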

Adding another serialization framework may help in cases where an object is
simply missing the java.io.Serializable interface. However, not everything
is magically serializable with Kryo.
There are classes that you can serialize with Java serialization, but not
out of the box with Kryo (especially when immutable collections are
involved). Also, classes that have no default constructor but have checks
on invariants, etc. can fail with Kryo in unpredictable ways.
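
One mechanism behind the "no default constructor" case can be sketched in plain Java, without Kryo. Java serialization does not call the constructor of a Serializable class when reading an object back, so a class like the hypothetical `Range` below round-trips fine. A serializer that creates instances through a no-argument constructor has nothing to call for such a class. As far as I know that is how Kryo instantiates objects by default unless a different instantiator strategy is configured, but treat that as an assumption; the code only shows the Java side and the missing constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class NoDefaultConstructorSketch {

    // Hypothetical value class: no default constructor, invariant checked on construction.
    static final class Range implements Serializable {
        private static final long serialVersionUID = 1L;

        final int lo;
        final int hi;

        Range(int lo, int hi) {
            if (lo > hi) {
                throw new IllegalArgumentException("lo must not exceed hi");
            }
            this.lo = lo;
            this.hi = hi;
        }
    }

    // True if the type declares a no-argument constructor that reflection could call.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Plain Java serialization round trip; Range's constructor is not invoked on read.
    static Range javaRoundTrip(Range original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Range) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Range copy = javaRoundTrip(new Range(1, 5));
        System.out.println("round trip: " + copy.lo + ".." + copy.hi);
        System.out.println("no-arg constructor: " + hasNoArgConstructor(Range.class));
    }
}
```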



On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Ah, very good. I've closed my issue as a duplicate. Thanks for the
> reference.
>
> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Nick,
>>
>> thanks for pushing this and opening the JIRA issue.
>>
>> The issue came up a couple of times and is a known limitation (see
>> FLINK-1256).
>> So far the workaround of marking member variables as transient and
>> initializing them in the open() method of a RichFunction has been good
>> enough for all cases I am aware of. That's probably why the issue hasn't
>> been addressed yet.
>>
>> Of course, this is not a satisfying solution if you would like to use
>> Java 8 lambda functions.
>>
>> Best, Fabian
>>
>> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimi...@apache.org>:
>>
>>> That's what I feared. IMO this is very limiting when mixing in other
>>> projects where a user does not have control over those projects' APIs. At
>>> least falling back to an extensible serialization mechanism (like Kryo)
>>> allows users to register serializers external to the types they're
>>> consuming.
>>>
>>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this
>>> issue.
>>>
>>> -n
>>>
>>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> at the moment Flink uses Java serialization to ship the UDFs to the
>>>> cluster. Therefore, the closures must only contain Serializable
>>>> objects. The serializer registration only applies to the data which is
>>>> processed by the Flink job. Thus, for the moment I would try to get rid of
>>>> the ColumnInfo object in your closure.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I've implemented a (streaming) flow using the Java API and Java8
>>>>> Lambdas for various map functions. When I try to run the flow, job
>>>>> submission fails because of an unserializable type. This is not a type of
>>>>> data used within the flow, but rather a small collection of objects
>>>>> captured in the closure context over one of my Lambdas. I've implemented
>>>>> and registered a Kryo Serializer for this type with this environment;
>>>>> however, it's apparently not used when serializing the lambdas. It seems like
>>>>> the same serialization configuration and tools of the environment should be
>>>>> used when preparing the job for submission. Am I missing something?
>>>>>
>>>>> Thanks,
>>>>> Nick
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>>         at
>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>>         at
>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
>>>>> ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>>         at
>>>>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>>         at
>>>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>>         at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>>         at ImportFlow.main(ImportFlow.java:178)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>         at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>>         ... 6 more
>>>>> Caused by: java.io.NotSerializableException:
>>>>> org.apache.phoenix.util.ColumnInfo
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>         at
>>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>>         at
>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>         at
>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>         at
>>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>>         at
>>>>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>>         ... 17 more
>>>>>
>>>>
>>>>
>>>
>>
>
