[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703005#comment-17703005 ]
Dong Lin commented on FLINK-31486: ---------------------------------- Merged to apache/flink-ml master branch fa5f47ea2a09360143aab5f39b85b373675636ad > Using KeySelector in IterationBody causes ClassCastException > ------------------------------------------------------------ > > Key: FLINK-31486 > URL: https://issues.apache.org/jira/browse/FLINK-31486 > Project: Flink > Issue Type: Bug > Components: Library / Machine Learning > Reporter: Jiang Xin > Assignee: Jiang Xin > Priority: Major > Labels: pull-request-available > Fix For: ml-2.2.0 > > > We have the following code which uses CoGroup along with KeySelector in an > IterationBody. When we submit it to a Flink Session cluster, an exception is raised. > {code:java} > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(config); > env.setStateBackend(new EmbeddedRocksDBStateBackend()); > env.getConfig().enableObjectReuse(); > env.setRestartStrategy(RestartStrategies.noRestart()); > env.setParallelism(1); > env.getCheckpointConfig().disableCheckpointing(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > int num = 400; > int types = num / 10; > Random rand = new Random(0); > long[] randoms = new long[types]; > for (int i = 0; i < types; i++) { > randoms[i] = rand.nextInt(types); > } > SourceFunction<Row> rowGenerator = > new SourceFunction<Row>() { > @Override > public final void run(SourceContext<Row> ctx) throws > Exception { > int cnt = 0; > while (cnt < num) { > ctx.collect( > Row.of( > randoms[cnt % (types)], > randoms[cnt % (types)], > new DenseVector(10))); > cnt++; > } > } > @Override > public void cancel() {} > }; > Table trainDataTable = > tEnv.fromDataStream( > env.addSource(rowGenerator, "sourceOp-" + 1) > .returns( > Types.ROW( > Types.LONG, > Types.LONG, > 
DenseVectorTypeInfo.INSTANCE))); > testCoGroupWithIteration(tEnv, trainDataTable); > } > public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, > Table trainDataTable) > throws Exception { > DataStream<Row> data1 = tEnv.toDataStream(trainDataTable); > DataStream<Row> data2 = tEnv.toDataStream(trainDataTable); > DataStreamList coResult = > Iterations.iterateBoundedStreamsUntilTermination( > DataStreamList.of(data1), > ReplayableDataStreamList.notReplay(data2), > IterationConfig.newBuilder().build(), > new TrainIterationBody()); > List<Integer> counts = > IteratorUtils.toList(coResult.get(0).executeAndCollect()); > System.out.println(counts.size()); > } > private static class TrainIterationBody implements IterationBody { > @Override > public IterationBodyResult process( > DataStreamList variableStreams, DataStreamList dataStreams) { > DataStreamList feedbackVariableStream = > IterationBody.forEachRound( > dataStreams, > input -> { > DataStream<Row> dataStream1 = > variableStreams.get(0); > DataStream<Row> dataStream2 = dataStreams.get(0); > DataStream<Row> coResult = > dataStream1 > .coGroup(dataStream2) > .where( > (KeySelector<Row, Long>) > t2 -> > t2.getFieldAs(0)) > .equalTo( > (KeySelector<Row, Long>) > t2 -> > t2.getFieldAs(1)) > .window(EndOfStreamWindows.get()) > .apply( > new > RichCoGroupFunction<Row, Row, Row>() { > @Override > public void coGroup( > Iterable<Row> > iterable, > Iterable<Row> > iterable1, > > Collector<Row> collector) { > for (Row row : > iterable1) { > > collector.collect(row); > } > } > }); > return DataStreamList.of(coResult); > }); > DataStream<Integer> terminationCriteria = > feedbackVariableStream > .get(0) > .flatMap(new TerminateOnMaxIter(2)) > .returns(Types.INT); > return new IterationBodyResult( > feedbackVariableStream, feedbackVariableStream, > terminationCriteria); > } > } {code} > The exception is as below. 
Note that the exception cannot be reproduced in > a unit test with MiniCluster since all classes are on the Java classpath. > {code:java} > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Could not instantiate state partitioner. at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662) > at > org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168) > at > org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146) > at > org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at > java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > java.lang.invoke.SerializedLambda to field > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 > of type org.apache.flink.api.java.functions.KeySelector in instance of > org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302) > at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403) at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327) at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185) at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665) at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:501) at > java.io.ObjectInputStream.readObject(ObjectInputStream.java:459) at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659) > ... 
17 more {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)