Hi Amit, could this be related [1]? How do you build your job?
[1] https://bugs.eclipse.org/bugs/show_bug.cgi?id=439889

Cheers,
Till

On Wed, Jan 3, 2018 at 2:55 PM, Timo Walther <twal...@apache.org> wrote:

> Hi Amit,
>
> which of the two lambdas caused the error? I guess it was the mapper after
> the parquet input, right? In both cases this should not happen. Maybe you
> can open an issue with a small reproducible code example?
>
> Thanks.
>
> Regards,
> Timo
>
>
> On 1/3/18 at 12:15 PM, Amit Jain wrote:
>
>> Hi Timo,
>>
>> Thanks a lot! A quick re-look over the code helped me detect the lambdas
>> used. I was using lambdas in the following two cases.
>>
>> DataSet<GenericRecord> newMainDataSet = mainDataSet
>>     .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
>>     .where(keySelector).equalTo(keySelector)
>>     .with((first, second) -> first != null && second != null ? second
>>         : (first != null ? first : second))
>>     .filter(filterFunction)
>>     .returns(GenericRecord.class);
>>
>> DataSet<GenericRecord> mainDataSet =
>>     mergeTableSecond.readParquet(mainPath, avroSchema, env)
>>     .withParameters(parameters)
>>     .map(t -> t.f1)
>>     .returns(GenericRecord.class);
>>
>>
>> On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Amit,
>>>
>>> are you using lambdas as parameters of a Flink Function or in a member
>>> variable? If yes, can you share a lambda example that fails?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 1/3/18 at 11:41 AM, Amit Jain wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm writing a job to merge old data with changelogs using the DataSet
>>>> API, where I'm reading the changelog using TextInputFormat and the old
>>>> data using HadoopInputFormat.
>>>>
>>>> I can see that the job manager has successfully deployed the program
>>>> flow to the worker nodes.
>>>> However, workers are immediately going to failed state because of
>>>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>>> deserialization*
>>>>
>>>>
>>>> Complete stack trace
>>>> java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: unexpected exception type
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:94)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: unexpected exception type
>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>>>>     at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1432)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.setup(ChainedMapDriver.java:39)
>>>>     at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
>>>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:287)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:91)
>>>>     ... 2 more
>>>> Caused by: java.io.IOException: unexpected exception type
>>>>     at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
>>>>     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>     ... 8 more
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
>>>>     ... 18 more
>>>> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
>>>>     at org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(MergeTableSecond.java:41)
>>>>     ... 28 more
>>>>
>>>>
>>>> Running Environment
>>>> Flink: 1.3.2
>>>> Java: openjdk version "1.8.0_151"
>>>>
>>>> Please help us resolve this issue.
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Amit
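
In case it helps to narrow this down: the innermost frame of the trace is MergeTableSecond.$deserializeLambda$, the compiler-generated method that SerializedLambda.readResolve calls to turn a serialized lambda back into an object. Below is a minimal sketch, independent of Flink, that round-trips a lambda and an equivalent named class through plain Java serialization. The class LambdaRoundTripDemo, the interface SerializableMapper, and the class SecondField are hypothetical names made up for this illustration; SerializableMapper only stands in for a serializable function interface such as Flink's MapFunction. Whether this reproduces the failure will depend on how the classes were compiled and packaged, so treat it as a diagnostic aid rather than a fix.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaRoundTripDemo {

    /** Hypothetical stand-in for a serializable function interface. */
    public interface SerializableMapper<T, R> extends Serializable {
        R map(T value);
    }

    /** Named class: serialized as an ordinary object, so $deserializeLambda$ is not involved. */
    public static class SecondField implements SerializableMapper<String[], String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String[] value) {
            return value[1];
        }
    }

    /** Writes obj with Java serialization and reads it back in the same JVM. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped so callers do not have to declare checked exceptions.
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        String[] row = {"key", "value"};

        // Lambda: written as a SerializedLambda and resolved on read through
        // the compiler-generated LambdaRoundTripDemo.$deserializeLambda$.
        SerializableMapper<String[], String> lambda = t -> t[1];
        System.out.println(roundTrip(lambda).map(row));

        // Named class: same logic, but deserialized like any other object.
        SerializableMapper<String[], String> named = new SecondField();
        System.out.println(roundTrip(named).map(row));
    }
}
```

If the lambda variant fails when built with your toolchain while the named class works, one workaround to try in the job would be replacing `.map(t -> t.f1)` with a named (or anonymous) class implementing org.apache.flink.api.common.functions.MapFunction, and likewise a JoinFunction for the `.with(...)` lambda.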