The point is to provide a means for users to work around nonconforming APIs. Kryo, at least, is extensible in that you can register additional serializers.
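Plain Java serialization has no serializer registry like Kryo's. The closest user-side workaround is a wrapper class that serializes the foreign object by hand. The sketch below is a minimal, hypothetical example: `ThirdPartyColumn` and `ColumnHolder` are made-up names, and `ThirdPartyColumn` stands in for a type such as Phoenix's `ColumnInfo`. The third-party object lives in a `transient` field, and the holder's private `writeObject`/`readObject` hooks save and rebuild its state, which is roughly the role a registered Kryo serializer plays. It assumes the object can be rebuilt from a few getters and a constructor, which will not hold for every class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ExternalStateHolderDemo {

    // Hypothetical third-party type the user cannot modify: it does not
    // implement Serializable.
    static final class ThirdPartyColumn {
        private final String name;
        private final int sqlType;

        ThirdPartyColumn(String name, int sqlType) {
            this.name = name;
            this.sqlType = sqlType;
        }

        String getName() { return name; }
        int getSqlType() { return sqlType; }
    }

    // User-written holder: the wrapped object is transient, and the private
    // writeObject/readObject hooks save and rebuild its state by hand.
    static final class ColumnHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ThirdPartyColumn column;

        ColumnHolder(ThirdPartyColumn column) { this.column = column; }

        ThirdPartyColumn get() { return column; }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();          // conventional first step (writes non-transient fields)
            out.writeUTF(column.getName());
            out.writeInt(column.getSqlType());
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            String name = in.readUTF();
            int sqlType = in.readInt();
            column = new ThirdPartyColumn(name, sqlType);
        }
    }

    // Serialize and deserialize in memory, roughly what happens when a
    // function is shipped to the cluster with Java serialization.
    static Object roundTrip(Object obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ColumnHolder copy = (ColumnHolder) roundTrip(new ColumnHolder(new ThirdPartyColumn("ID", 4)));
        System.out.println(copy.get().getName() + " " + copy.get().getSqlType()); // ID 4
    }
}
```

This only helps for objects your own function captures through such a holder. It does not make Flink consult registered Kryo serializers when it serializes the closure itself.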
On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <se...@apache.org> wrote:

> Actually, this should be independent of Java 8 lambdas vs. Java 7 anonymous classes.
> I have been using Java 8 lambdas quite a bit with Flink.
>
> The important thing is that no non-serializable objects are in the closure.
>
> As Fabian mentioned, lazy initialization helps. Serializability is also discussed here:
> http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>
> Adding another serialization framework may help in cases where the java.io.Serializable interface is simply missing from an object. However, not everything is magically serializable with Kryo.
> There are classes that you can serialize with Java serialization, but not out of the box with Kryo (especially when immutable collections are involved). Also, classes that have no default constructor but check invariants, etc. can fail arbitrarily with Kryo.
>
> On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.
>>
>> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Nick,
>>>
>>> thanks for pushing this and opening the JIRA issue.
>>>
>>> The issue has come up a couple of times and is a known limitation (see FLINK-1256).
>>> So far, the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet.
>>>
>>> Of course, this is not a satisfying solution if you would like to use Java 8 lambda functions.
>>>
>>> Best, Fabian
>>>
>>> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimi...@apache.org>:
>>>
>>>> That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.
>>>>
>>>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>>>>
>>>> -n
>>>>
>>>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Nick,
>>>>>
>>>>> at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo object in your closure.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I've implemented a (streaming) flow using the Java API and Java 8 lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure context of one of my lambdas. I've implemented and registered a Kryo serializer for this type with this environment; however, it's apparently not used when serializing the lambdas. It seems like the same serialization configuration and tools of the environment should be used when preparing the job for submission. Am I missing something?
>>>>>>
>>>>>> Thanks,
>>>>>> Nick
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>>> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>>> 	at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>>> 	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>>> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>>> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>>> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>>> 	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>>> 	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>>> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>>> 	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>>> 	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>>> 	at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>>> 	at ImportFlow.main(ImportFlow.java:178)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> 	at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>>> 	... 6 more
>>>>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>>>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>> 	at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> 	at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>> 	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> 	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>>> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>> 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>>> 	at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>>> 	... 17 more
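The trace above shows the mechanism Till describes. `ClosureCleaner.ensureSerializable` pushes the lambda through an `ObjectOutputStream` (via `InstantiationUtil.serializeObject`), and the `ArrayList` captured by the lambda fails on a `ColumnInfo` element. The plain-JDK sketch below reproduces that failure outside Flink and shows the two workarounds from the thread: capturing only serializable data (here, column names), and keeping the non-serializable objects in a `transient` field that is rebuilt lazily. `ColumnInfoStandIn`, `SerFunction`, and the other names are hypothetical. In a real Flink job, the function would implement `MapFunction` (which is `Serializable`) or extend `RichMapFunction`, with the rebuild done in `open()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for a third-party class such as Phoenix's ColumnInfo:
    // it deliberately does NOT implement java.io.Serializable.
    static final class ColumnInfoStandIn {
        final String columnName;

        ColumnInfoStandIn(String columnName) { this.columnName = columnName; }
    }

    // A function type that is also Serializable, so lambdas targeting it are
    // compiled as serializable lambdas.
    interface SerFunction<I, O> extends Function<I, O>, Serializable {}

    // Writes obj through an ObjectOutputStream, as the trace shows Flink doing.
    // Returns null on success, otherwise the NotSerializableException message.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Fails: the lambda captures the whole list, whose elements are not Serializable.
    static SerFunction<String, String> capturingColumnInfo(List<ColumnInfoStandIn> columns) {
        return row -> row + " has " + columns.size() + " columns";
    }

    // Works: capture only Serializable data (an ArrayList of column names).
    static SerFunction<String, String> capturingColumnNames(ArrayList<String> columnNames) {
        return row -> row + " has " + columnNames.size() + " columns";
    }

    // Also works: keep the non-serializable objects in a transient field and
    // rebuild them lazily on first use (e.g. after deserialization on a worker).
    static final class LazyColumnsFunction implements SerFunction<String, String> {
        private static final long serialVersionUID = 1L;
        private final ArrayList<String> columnNames;        // serialized
        private transient List<ColumnInfoStandIn> columns;  // skipped by serialization

        LazyColumnsFunction(List<String> columnNames) {
            this.columnNames = new ArrayList<>(columnNames);
        }

        @Override
        public String apply(String row) {
            if (columns == null) {
                columns = new ArrayList<>();
                for (String name : columnNames) {
                    columns.add(new ColumnInfoStandIn(name));
                }
            }
            return row + " has " + columns.size() + " columns";
        }
    }

    public static void main(String[] args) {
        List<ColumnInfoStandIn> infos = new ArrayList<>();
        infos.add(new ColumnInfoStandIn("ID"));
        ArrayList<String> names = new ArrayList<>();
        names.add("ID");

        System.out.println(trySerialize(capturingColumnInfo(infos)));   // the stand-in's class name
        System.out.println(trySerialize(capturingColumnNames(names)));  // null
        LazyColumnsFunction lazy = new LazyColumnsFunction(names);
        System.out.println(lazy.apply("row-1"));                        // row-1 has 1 columns
        System.out.println(trySerialize(lazy));                         // null
    }
}
```

The lambda in `capturingColumnInfo` only calls `columns.size()`, yet the entire list is still captured and serialized. Deriving the small serializable value you actually need before building the lambda keeps the closure clean.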