Hi Timo,

Thanks a lot! Quick re-look over the code helped me to detect used lambdas.
I was using lambdas in two cases which are following.

DataSet<GenericRecord> newMainDataSet = mainDataSet

    .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
    .where(keySelector).equalTo(keySelector)

*    .with((first, second) -> first != null && second != null ? second
: (first != null ? first : second))*    .filter(filterFunction)
    .returns(GenericRecord.class);

DataSet<GenericRecord> mainDataSet =

    mergeTableSecond.readParquet(mainPath, avroSchema, env)
        .withParameters(parameters)
*        .map(**t -> t.f1*
*)*        .returns(GenericRecord.class);



On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <twal...@apache.org> wrote:

> Hi Amit,
>
> are you using lambdas as parameters of a Flink Function or in a member
> variable? If yes, can you share an lambda example that fails?
>
> Regards,
> Timo
>
>
> Am 1/3/18 um 11:41 AM schrieb Amit Jain:
>
>> Hi,
>>
>> I'm writing a job to merge old data with changelogs using DataSet API
>> where
>> I'm reading changelog using TextInputFormat and old data using
>> HadoopInputFormat.
>>
>> I can see, job manager has successfully deployed the program flow to
>> worker
>> nodes. However, workers are immediately going to failed state because of
>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>> deserialization*
>>
>>
>> Complete stack trace
>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>> caused an error: Could not read the user code wrapper: unexpected
>> exception
>> type
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:94)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> Could not read the user code wrapper: unexpected exception type
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
>> apper(TaskConfig.java:290)
>> at
>> org.apache.flink.runtime.operators.BatchTask.instantiateUser
>> Code(BatchTask.java:1432)
>> at
>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver
>> .setup(ChainedMapDriver.java:39)
>> at
>> org.apache.flink.runtime.operators.chaining.ChainedDriver.
>> setup(ChainedDriver.java:90)
>> at
>> org.apache.flink.runtime.operators.BatchTask.initOutputs(
>> BatchTask.java:1299)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.initOutput
>> s(DataSourceTask.java:287)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:91)
>> ... 2 more
>> Caused by: java.io.IOException: unexpected exception type
>> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamCla
>> ss.java:1682)
>> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
>> s.java:1254)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>> am.java:2078)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>> m.java:2287)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>> am.java:2069)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(In
>> stantiationUtil.java:290)
>> at
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig
>> (InstantiationUtil.java:248)
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
>> apper(TaskConfig.java:288)
>> ... 8 more
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.lang.invoke.SerializedLambda.readResolve(SerializedLamb
>> da.java:230)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
>> s.java:1248)
>> ... 18 more
>> Caused by: java.lang.IllegalArgumentException: Invalid lambda
>> deserialization
>> at
>> org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(Me
>> rgeTableSecond.java:41)
>> ... 28 more
>>
>>
>> Running Environment
>> Flink: 1.3.2
>> Java: openjdk version "1.8.0_151"
>>
>> Please help us resolve this issue.
>>
>>
>> --
>> Thanks,
>> Amit
>>
>>
>

Reply via email to