Youjun Yuan created FLINK-37684:
-----------------------------------
Summary: ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
Key: FLINK-37684
URL: https://issues.apache.org/jira/browse/FLINK-37684
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.14.4
Reporter: Youjun Yuan
Flink job failed to run (in k8s) with below error.
The job just has 4 operators, see below. One thing interesting, is that if I
remove the second one (id=74, which is just a map operator), then it's fine.
Topo:
{code:java}
{
"nodes" : [ {
"id" : 73,
"type" : "Source: click-tracking-record-kafka_source",
"pact" : "Data Source",
"contents" : "Source: click-tracking-record-kafka_source",
"parallelism" : 1
}, {
"id" : 74,
"type" : "Proto-Deserialization",
"pact" : "Operator",
"contents" : "Proto-Deserialization",
"parallelism" : 1,
"predecessors" : [ {
"id" : 73,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 76,
"type" : "KeyedProcess",
"pact" : "Operator",
"contents" : "KeyedProcess",
"parallelism" : 1,
"predecessors" : [ {
"id" : 74,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 79,
"type" : "Sink
uf-aggregate_-_to_kafka_-_click-tracking-record_-_as_-_int-dj-live-user-features-dev03",
"pact" : "Operator",
"contents" : "Sink
uf-aggregate_-_to_kafka_-_click-tracking-record_-_as_-_int-dj-live-user-features-dev03",
"parallelism" : 1,
"predecessors" : [ {
"id" : 76,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}{code}
Full call stack:
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
of type org.apache.flink.api.java.functions.KeySelector in instance of
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
~[?:1.8.0_322]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
~[?:1.8.0_322]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
~[?:1.8.0_322]
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
~[?:1.8.0_322]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
~[?:1.8.0_322]
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
~[?:1.8.0_322]
at java.util.ArrayList.readObject(ArrayList.java:799) ~[?:1.8.0_322]
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_322]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322)
~[?:1.8.0_322]
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
~[?:1.8.0_322]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
~[?:1.8.0_322]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamConfig.getNonChainedOutputs(StreamConfig.java:387)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamConfig.toString(StreamConfig.java:714)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_322]
at java.lang.StringBuilder.append(StringBuilder.java:136) ~[?:1.8.0_322]
at java.util.AbstractMap.toString(AbstractMap.java:559) ~[?:1.8.0_322]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1421)
~[?:1.8.0_322]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[?:1.8.0_322]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_322]
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:548)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamConfig.setTransitiveChainedTaskConfigs(StreamConfig.java:495)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:479)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:377)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:178)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:116)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:985)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:122)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2042)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1916)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.thetradedesk.streaming.services.databahnjunction.DatabahnJunction.main(DatabahnJunction.java:56)
~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_322]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_322]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_322]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_322]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)