Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nick,
>
> thanks for pushing this and opening the JIRA issue.
>
> The issue has come up a couple of times and is a known limitation (see
> FLINK-1256). So far, the workaround of marking member variables as
> transient and initializing them in the open() method of a RichFunction
> has been good enough for all the cases I am aware of. That's probably why
> the issue hasn't been addressed yet.
>
> Of course this is not a satisfying solution if you would like to use
> Java 8 lambda functions.
>
> Best, Fabian
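
A minimal sketch of the workaround Fabian describes, applied to the ColumnInfo case from the original message quoted further below. The class name, the String-to-String signature, and the VARCHAR column type are illustrative assumptions, and it assumes Phoenix's two-argument ColumnInfo(columnName, sqlType) constructor; the point is that only serializable configuration is stored in the function object, while the non-serializable Phoenix metadata is rebuilt on the task manager in open():

    import java.sql.Types;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.phoenix.util.ColumnInfo;

    // Only serializable state (the column names) travels with the function;
    // the non-serializable ColumnInfo list is rebuilt on the task manager.
    public class PhoenixColumnMapper extends RichMapFunction<String, String> {

        private final String[] columnNames;          // serializable configuration
        private transient List<ColumnInfo> columns;  // skipped by Java serialization

        public PhoenixColumnMapper(String... columnNames) {
            this.columnNames = columnNames;
        }

        @Override
        public void open(Configuration parameters) {
            // Recreate the metadata after deserialization instead of shipping it.
            columns = Arrays.stream(columnNames)
                    .map(name -> new ColumnInfo(name, Types.VARCHAR))
                    .collect(Collectors.toList());
        }

        @Override
        public String map(String value) {
            // Placeholder transformation; real code would use 'columns' to build
            // the Phoenix upsert.
            return value + " (" + columns.size() + " columns)";
        }
    }

A lambda that references the List<ColumnInfo> directly pulls it into the closure, which Flink serializes with plain Java serialization at submission time; with the RichFunction the list never needs to be serialized at all.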

> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimi...@apache.org>:
>
>> That's what I feared. IMO this is very limiting when mixing in other
>> projects where a user does not have control over those projects' APIs.
>> At least falling back to an extensible serialization mechanism (like
>> Kryo) allows users to register serializers external to the types they're
>> consuming.
>>
>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>>
>> -n
>>
>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Nick,
>>>
>>> at the moment Flink uses Java serialization to ship the UDFs to the
>>> cluster. Therefore, the closures must only contain Serializable objects.
>>> The serializer registration only applies to the data which is processed
>>> by the Flink job. Thus, for the moment I would try to get rid of the
>>> ColumnInfo object in your closure.
>>>
>>> Cheers,
>>> Till
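
To make Till's point concrete: everything a lambda references becomes part of its closure, and that closure goes through plain Java serialization regardless of any Kryo registration on the environment. A hedged sketch (the method names and the String-typed streams are illustrative, not taken from the original flow):

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.phoenix.util.ColumnInfo;

    public class ClosureSketch {

        // Fails at submission: the lambda captures the non-serializable
        // List<ColumnInfo>, and the closure is shipped with Java serialization.
        static DataStream<String> broken(DataStream<String> input, List<ColumnInfo> columns) {
            return input.map(value -> value + "/" + columns.size());
        }

        // Works: capture only Serializable values (here, plain Strings); anything
        // non-serializable has to be rebuilt inside a RichFunction's open().
        static DataStream<String> workaround(DataStream<String> input, List<ColumnInfo> columns) {
            List<String> names = columns.stream()
                    .map(ColumnInfo::getColumnName)
                    .collect(Collectors.toList());
            return input.map(value -> value + "/" + names);
        }
    }

This matches the stack trace below: the captured value is a java.util.ArrayList of ColumnInfo, so the NotSerializableException surfaces through ArrayList.writeObject.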

>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I've implemented a (streaming) flow using the Java API and Java 8
>>>> lambdas for various map functions. When I try to run the flow, job
>>>> submission fails because of an unserializable type. This is not a type
>>>> of data used within the flow, but rather a small collection of objects
>>>> captured in the closure context of one of my lambdas. I've implemented
>>>> and registered a Kryo serializer for this type with this environment;
>>>> however, it's apparently not used when serializing the lambdas. It
>>>> seems like the same serialization configuration and tools of the
>>>> environment should be used when preparing the job for submission. Am I
>>>> missing something?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>     at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>     at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>     at ImportFlow.main(ImportFlow.java:178)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>     ... 6 more
>>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>     at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>     ... 17 more
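
For reference, the registration Nick mentions is the ExecutionConfig-level Kryo hook. As Till notes, it governs how records are serialized as they flow between operators, not how the UDF objects themselves are shipped at submission time. A minimal sketch, assuming a hand-written serializer for ColumnInfo (the serializer class and the fields it writes are illustrative):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.phoenix.util.ColumnInfo;

    public class KryoRegistrationSketch {

        // Illustrative custom serializer for the Phoenix type.
        public static class ColumnInfoSerializer extends Serializer<ColumnInfo> {
            @Override
            public void write(Kryo kryo, Output output, ColumnInfo info) {
                output.writeString(info.getColumnName());
                output.writeInt(info.getSqlType());
            }

            @Override
            public ColumnInfo read(Kryo kryo, Input input, Class<ColumnInfo> type) {
                return new ColumnInfo(input.readString(), input.readInt());
            }
        }

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Applies to ColumnInfo values flowing through the job's data streams...
            env.getConfig().registerTypeWithKryoSerializer(ColumnInfo.class, ColumnInfoSerializer.class);

            // ...but not to the function objects themselves, which are shipped with
            // plain Java serialization when the job is submitted (hence FLINK-3148).

            // ... define and execute the streaming job here ...
        }
    }

The registration remains useful if ColumnInfo values ever appear in the records themselves; for closures, the transient/open() pattern sketched above is the workaround until FLINK-3148 is addressed.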