Hi Nick, thanks for pushing this and opening the JIRA issue.
The issue has come up a couple of times and is a known limitation (see FLINK-1256). So far, the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet. Of course, this is not a satisfying solution if you would like to use Java 8 lambda functions.

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimi...@apache.org>:

> That's what I feared. IMO this is very limiting when mixing in other
> projects where a user does not have control over those projects' APIs. At
> least falling back to an extensible serialization mechanism (like Kryo)
> allows users to register serializers external to the types they're
> consuming.
>
> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>
> -n
>
> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Nick,
>>
>> at the moment Flink uses Java serialization to ship the UDFs to the
>> cluster. Therefore, the closures must only contain Serializable objects.
>> The serializer registration only applies to the data which is processed by
>> the Flink job. Thus, for the moment I would try to get rid of the
>> ColumnInfo object in your closure.
>>
>> Cheers,
>> Till
>>
>>
>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I've implemented a (streaming) flow using the Java API and Java8 Lambdas
>>> for various map functions. When I try to run the flow, job submission fails
>>> because of an unserializable type. This is not a type of data used within
>>> the flow, but rather a small collection of objects captured in the closure
>>> context over one of my Lambdas. I've implemented and registered a Kryo
>>> Serializer for this type with this environment, however, it's apparently
>>> not used when serializing the lambdas. Seems like the same serialization
>>> configuration and tools of the environment should be used when preparing
>>> the job for submission. Am I missing something?
>>>
>>> Thanks,
>>> Nick
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>     at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>     at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>     at ImportFlow.main(ImportFlow.java:178)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>     ... 6 more
>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>     ... 17 more
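
For reference, the transient-plus-open() workaround mentioned at the top of this message can be sketched roughly as follows. This is only an illustration using plain Java serialization, not Flink code: ColumnMeta, ColumnMapper and roundTrip are hypothetical names, ColumnMeta stands in for a non-serializable third-party type such as ColumnInfo, and the explicit open() call imitates what the runtime would do for a RichFunction after shipping it. In an actual job, the class would presumably extend one of Flink's rich function classes (e.g. RichMapFunction) and override its open() method instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransientWorkaroundSketch {

    /** Hypothetical stand-in for a third-party type that is NOT Serializable. */
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a rich function: Serializable, with an open() hook. */
    static final class ColumnMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable description from which the real objects can be rebuilt.
        private final ArrayList<String> columnNames;

        // transient: skipped by Java serialization, so it never causes
        // a NotSerializableException when the function is shipped.
        private transient List<ColumnMeta> columns;

        ColumnMapper(List<String> columnNames) {
            this.columnNames = new ArrayList<>(columnNames);
        }

        /** Rebuilds the non-serializable state; called after deserialization. */
        void open() {
            columns = new ArrayList<>();
            for (String n : columnNames) {
                columns.add(new ColumnMeta(n));
            }
        }

        String map(String value) {
            StringBuilder sb = new StringBuilder(value);
            for (ColumnMeta c : columns) {
                sb.append('|').append(c.name);
            }
            return sb.toString();
        }
    }

    /** Serializes and deserializes an object, imitating shipping it to a worker. */
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ColumnMapper mapper = new ColumnMapper(Arrays.asList("id", "name"));
        ColumnMapper shipped = roundTrip(mapper); // succeeds: the transient field is skipped
        shipped.open();                           // rebuild state on the "worker" side
        System.out.println(shipped.map("row"));   // prints "row|id|name"
    }
}
```

The point of the sketch is that only the serializable description (here, the column names) travels with the function, while the problematic objects are recreated on the receiving side. This pattern needs a class with a lifecycle hook, which is why it does not carry over to Java 8 lambdas.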