Hi Timo,

Thanks a lot! A quick second look at the code helped me spot the lambdas I was using. There are two cases, shown below.

DataSet<GenericRecord> newMainDataSet = mainDataSet
    .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
    .where(keySelector).equalTo(keySelector)
    .with((first, second) -> first != null && second != null ? second : (first != null ? first : second))
    .filter(filterFunction)
    .returns(GenericRecord.class);

DataSet<GenericRecord> mainDataSet = mergeTableSecond.readParquet(mainPath, avroSchema, env)
    .withParameters(parameters)
    .map(t -> t.f1)
    .returns(GenericRecord.class);

On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <twal...@apache.org> wrote:

> Hi Amit,
>
> are you using lambdas as parameters of a Flink Function or in a member
> variable? If yes, can you share a lambda example that fails?
>
> Regards,
> Timo
>
>
> On 1/3/18 at 11:41 AM, Amit Jain wrote:
>
>> Hi,
>>
>> I'm writing a job to merge old data with changelogs using DataSet API where
>> I'm reading changelog using TextInputFormat and old data using
>> HadoopInputFormat.
>>
>> I can see, job manager has successfully deployed the program flow to worker
>> nodes.
>> However, workers are immediately going to failed state because of
>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization*
>>
>>
>> Complete stack trace
>> java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: unexpected exception type
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:94)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: unexpected exception type
>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>>     at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1432)
>>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.setup(ChainedMapDriver.java:39)
>>     at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
>>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:287)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:91)
>>     ... 2 more
>> Caused by: java.io.IOException: unexpected exception type
>>     at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
>>     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>     ... 8 more
>> Caused by: java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
>>     ... 18 more
>> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
>>     at org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(MergeTableSecond.java:41)
>>     ... 28 more
>>
>>
>> Running Environment
>> Flink: 1.3.2
>> Java: openjdk version "1.8.0_151"
>>
>> Please help us resolve this issue.
>>
>>
>> --
>> Thanks,
>> Amit
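
One thing that might be worth trying, sketched here under assumptions: replace the join lambda with a named class, so that deserialization no longer goes through SerializedLambda.readResolve (the frame that fails in the stack trace). To keep the sketch compilable without Flink, Joiner is a hypothetical stand-in for Flink's JoinFunction, and PreferSecond and roundTrip are hypothetical names as well. The round trip uses plain Java serialization, which is what the ObjectInputStream frames in the trace suggest is in use. This is an illustration of the idea, not a verified fix for the job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MergeSketch {

    // Hypothetical stand-in for Flink's JoinFunction, so this compiles without Flink.
    public interface Joiner<A, B, R> extends Serializable {
        R join(A first, B second);
    }

    // Named class replacing the lambda passed to .with(...).
    // Returns the same value as:
    //   first != null && second != null ? second : (first != null ? first : second)
    public static class PreferSecond<T> implements Joiner<T, T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public T join(T first, T second) {
            return second != null ? second : first;
        }
    }

    // Serializes an object to bytes and reads it back with plain Java serialization.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PreferSecond<String> fn = roundTrip(new PreferSecond<String>());
        System.out.println(fn.join("old", "new")); // row in both inputs: prints "new"
        System.out.println(fn.join("old", null));  // row only in main data: prints "old"
        System.out.println(fn.join(null, "new"));  // row only in delta: prints "new"
    }
}
```

In the real job the class would implement Flink's JoinFunction with GenericRecord type arguments instead of the stand-in interface, and the map lambda could be handled the same way with a named MapFunction.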