Zhipeng Zhang created FLINK-31255:
-------------------------------------

             Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
                 Key: FLINK-31255
                 URL: https://issues.apache.org/jira/browse/FLINK-31255
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
            Reporter: Zhipeng Zhang

Currently we use an operator wrapper so that normal operators can be used in iterations. However, the operatorConfig is not correctly unwrapped. For example, the following code fails because the wrong type serializer is used.

{code:java}
@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
            env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
    DataStreamList result =
            Iterations.iterateBoundedStreamsUntilTermination(
                    DataStreamList.of(input),
                    ReplayableDataStreamList.notReplay(input),
                    IterationConfig.newBuilder()
                            .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                            .build(),
                    new IterationBodyWithMapPartition());

    List<Integer> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(counts.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {

    @Override
    public IterationBodyResult process(
            DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);

        DataStream<Long> mapPartitionResult =
                DataStreamUtils.mapPartition(
                        input,
                        new MapPartitionFunction<Long, Long>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Long> iterable, Collector<Long> collector)
                                    throws Exception {
                                for (Long iter : iterable) {
                                    collector.collect(iter);
                                }
                            }
                        });

        DataStream<Integer> terminationCriteria =
                mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);

        return new IterationBodyResult(
                DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
}
{code}

The error stack is:

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
    at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
    at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
    at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
    at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
    at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)