Hi Komal,

Thanks for your example code! I found that Flink ML has a bug affecting keyed two-input operators. I have submitted a PR to fix it; once the PR is approved, you can build the Flink ML library for your program by following the project's build documentation.
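Reading the stack trace in the original report, the ClassCastException seems to come from `AbstractAllRoundWrapperOperator.setKeyContextElement1` handing the iteration's wrapper record, rather than the user's `Tuple`, to the two-input operator's key selector. The plain-Java sketch below only models that failure mode. `Envelope`, `KeyedPair` and `KeyContextModel` are hypothetical stand-ins for `IterationRecord`, `Tuple2` and the wrapper operator, and the code is not the change made in the PR.

```java
import java.util.function.Function;

// Plain-Java model of the failure mode suggested by the stack trace; this is NOT Flink ML code.
// Envelope, KeyedPair and KeyContextModel are hypothetical stand-ins for IterationRecord,
// Tuple2<String, Double> and the iteration's wrapper operator.
public class KeyContextModel {

    // Hypothetical wrapper: carries the user record plus the iteration epoch.
    static final class Envelope<T> {
        final T value;
        final int epoch;

        Envelope(T value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    // Hypothetical user record, keyed by its first field (like keyBy(0) on a Tuple2).
    static final class KeyedPair {
        final String key;
        final double weight;

        KeyedPair(String key, double weight) {
            this.key = key;
            this.weight = weight;
        }
    }

    // The user's key selector only understands the user type, so it casts its input.
    static final Function<Object, String> USER_KEY_SELECTOR = input -> ((KeyedPair) input).key;

    // Failing path: the wrapper forwards the envelope itself to the user's key selector.
    static String keyWithoutUnwrapping(Envelope<KeyedPair> wrapped) {
        return USER_KEY_SELECTOR.apply(wrapped);
    }

    // Working path: unwrap to the user record first, then extract the key.
    static String keyAfterUnwrapping(Envelope<KeyedPair> wrapped) {
        return USER_KEY_SELECTOR.apply(wrapped.value);
    }

    public static void main(String[] args) {
        Envelope<KeyedPair> wrapped = new Envelope<>(new KeyedPair("A", 2.0), 0);
        try {
            keyWithoutUnwrapping(wrapped);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException when the envelope is not unwrapped");
        }
        System.out.println("key after unwrapping: " + keyAfterUnwrapping(wrapped));
    }
}
```

The point the sketch illustrates is that key extraction must see the same type the user's `keyBy` was declared on; where Flink ML should actually unwrap the record is decided in the PR, not here.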
The bugfix PR: https://github.com/apache/flink-ml/pull/260
The document to build Flink ML: https://github.com/apache/flink-ml?tab=readme-ov-file#building-the-project

Best,
Yunfeng

On Mon, Apr 8, 2024 at 11:02 AM Komal M <komal.mar...@gmail.com> wrote:
>
> Hi Yunfeng,
>
> Thank you so much for getting back to me!
>
> For the first bug, here is a sample program that should reproduce it. All it
> does is subtract 1 from the feedback stream until the tuples reach 0.0. For
> each subtraction it outputs a corresponding message in the ‘finalOutput’ stream.
> These messages are stored in the keyed state of the KeyedCoProcessFunction and
> are populated from a data stream called initialStates. Each key has its own set
> of messages, hence the need for MapState.
>
> For the second bug, let me compare my implementation to the references you
> have provided and get back to you on that.
>
> import java.util.*;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         // Sets up the execution environment, which is the main entry point
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>
>         // Sample data streams (assumed to be unbounded streams outside of this test environment)
>         List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
>                 new Tuple2<>("A", 2.0),
>                 new Tuple2<>("B", 3.0),
>                 new Tuple2<>("C", 1.0),
>                 new Tuple2<>("D", 1.0)
>         );
>
>         List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
>                 new Tuple3<>("A", 0.0, "Final Output A"),
>                 new Tuple3<>("A", 1.0, "Test 1A"),
>                 new Tuple3<>("B", 2.0, "Test 2B"),
>                 new Tuple3<>("B", 1.0, "Test 1B"),
>                 new Tuple3<>("B", 0.0, "Final Output B"),
>                 new Tuple3<>("C", 0.0, "No Change C"),
>                 new Tuple3<>("D", 0.0, "Final Output D")
>         );
>
>         DataStream<Tuple2<String, Double>> feedbackStream = env.fromCollection(feedbackinitializer);
>         DataStream<Tuple3<String, Double, String>> initialStateStream = env.fromCollection(initialStates);
>
>         // Parallelize
>         DataStream<Tuple2<String, Double>> feedbackParallel = feedbackStream.keyBy(x -> x.f0)
>                 .map(i -> Tuple2.of(i.f0, i.f1))
>                 .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
>         DataStream<Tuple3<String, Double, String>> initialStateParallel = initialStateStream.keyBy(x -> x.f0)
>                 .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
>                 .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));
>
>         // Iterate
>         DataStreamList result = Iterations.iterateUnboundedStreams(
>                 DataStreamList.of(feedbackParallel),
>                 DataStreamList.of(initialStateParallel),
>                 (variableStreams, dataStreams) -> {
>                     DataStream<Tuple2<String, Double>> modelUpdate = variableStreams.get(0);
>                     DataStream<Tuple3<String, Double, String>> stateStream = dataStreams.get(0);
>
>                     OutputTag<String> finalOutputTag = new OutputTag<String>("msgs") {};
>
>                     SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate = stateStream
>                             .connect(modelUpdate)
>                             .keyBy(0, 0)
>                             .process(new KeyedCoProcessFunction<String, Tuple3<String, Double, String>,
>                                     Tuple2<String, Double>, Tuple2<String, Double>>() {
>                         private transient MapState<Double, String> state;
>
>                         @Override
>                         public void processElement1(Tuple3<String, Double, String> stateUpdate, Context context,
>                                 Collector<Tuple2<String, Double>> collector) throws Exception {
>                             state.put(stateUpdate.f1, stateUpdate.f2); // load stateStream into mapState
>                         }
>
>                         @Override
>                         public void processElement2(Tuple2<String, Double> modelUpdate, Context context,
>                                 Collector<Tuple2<String, Double>> collector) throws Exception {
>                             double weight = modelUpdate.f1;
>                             weight = weight - 1;
>                             // subtract 1 until 0.0
>                             if (weight > -1.0) {
>                                 collector.collect(Tuple2.of(modelUpdate.f0, weight));
>                                 context.output(finalOutputTag, state.get(weight));
>                             }
>                         }
>
>                         @Override
>                         public void open(Configuration config) {
>                             MapStateDescriptor<Double, String> stateDescriptor =
>                                     new MapStateDescriptor<>(
>                                             "statedescriptor", // the state name
>                                             BasicTypeInfo.DOUBLE_TYPE_INFO,
>                                             BasicTypeInfo.STRING_TYPE_INFO);
>                             this.state = getRuntimeContext().getMapState(stateDescriptor);
>                         }
>                     });
>
>                     DataStream<String> finalOutput = newModelUpdate.getSideOutput(finalOutputTag);
>
>                     return new IterationBodyResult(
>                             DataStreamList.of(newModelUpdate),
>                             DataStreamList.of(finalOutput));
>                 });
>
>         result.get(0).print();
>
>         // Execute program
>         env.execute("Flink Java API Skeleton");
>     }
> }
>
> Best,
> Komal
>
> From: Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> Date: Sunday, April 7, 2024 11:36
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: Two potential bugs in Flink ML
>
> Hi Komal,
>
> For the first question, could you please provide a simple program that
> reproduces this exception? That would help us find the bugs (if any)
> in Flink ML.
>
> For the second question, Flink ML already has functions that implement the
> IterationListener interface [1], and I just manually verified that their
> onEpochWatermarkIncremented method is invoked in the test cases. You may
> check whether there is any difference between your implementation and the
> one in the Flink ML repo, and please also feel free to provide a program
> that we can check together.
>
> [1] https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/iteration/ForwardInputsOfLastRound.java#L45
>
> Best,
> Yunfeng
>
> On Fri, Apr 5, 2024 at 2:01 PM Komal M <komal.mar...@gmail.com> wrote:
> >
> > Hi Flink Dev Team,
> >
> > I have two possible bugs to report for Flink ML Iteration.
> >
> > Flink v1.17.2
> > Flink ML v2.3.0
> > Java 11
> >
> > Bug #1
> >
> > Using a KeyedCoProcessFunction or CoFlatMapFunction UDF inside the
> > IterationBody throws a “java.lang.ClassCastException:
> > org.apache.flink.iteration.IterationRecord cannot be cast to class
> > org.apache.flink.api.java.tuple.Tuple” error. For reference, I do not get
> > any error when applying .keyBy().flatMap() to the streams individually
> > inside the iteration body.
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> >     ….
> >     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> >     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> >     …
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> >     ... 5 more
> > Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
> >     at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
> >     at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
> >     at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
> >     at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
> >     at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
> >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
> >     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> >     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> >     at java.base/java.lang.Thread.run(Thread.java:829)
> >
> > Potential Bug #2
> >
> > The onEpochWatermarkIncremented method is never invoked when the
> > IterationListener<T> interface is implemented by a UDF inside the
> > iteration body.
> >
> > // method is invoked from within IterationBody
> > public class ComputeML2 extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
> >         implements IterationListener<Tuple2<Integer, String>> {
> >
> >     // this method is never invoked, getting no output
> >     @Override
> >     public void onEpochWatermarkIncremented(int epochWaterMark, IterationListener.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >         collector.collect(Tuple2.of(epochWaterMark, "epoch")); // Bug: no output
> >     }
> >
> >     @Override
> >     public void onIterationTerminated(IterationListener.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >     }
> >
> >     @Override
> >     public void processElement(Tuple2<Integer, String> integerStringTuple2,
> >             KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >         // some processing here
> >     }
> > }
> >
> > Let me know if I should submit these issues on JIRA.
> >
> > Thank you so much,
> > Komal
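For checking a build that includes the fix, the side output the April 8 sample should produce can be worked out by hand. The sketch below is a sequential plain-Java model of the sample's `processElement1`/`processElement2` logic; `CountdownModel`, `messagesForKey` and `load` are hypothetical names, not Flink API. It assumes every `initialStates` record for a key is already in the `MapState` before that key's first feedback record arrives; in the real job the two inputs are not ordered relative to each other, so `state.get(weight)` may return null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sequential plain-Java model of the sample job's logic (no Flink involved).
// Assumption: all initialStates records for a key are loaded before its first feedback record.
public class CountdownModel {

    // Each loop iteration stands for one feedback round through processElement2:
    // subtract 1, and while the result is > -1.0, emit the message stored under that weight.
    static List<String> messagesForKey(double start, Map<Double, String> stateForKey) {
        List<String> out = new ArrayList<>();
        double weight = start - 1;
        while (weight > -1.0) {
            out.add(stateForKey.get(weight)); // mirrors state.get(weight) in the sample
            weight = weight - 1;
        }
        return out;
    }

    // Mirrors processElement1: store the message under its weight in a per-key map.
    static void load(Map<String, Map<Double, String>> state, String key, double weight, String msg) {
        state.computeIfAbsent(key, k -> new HashMap<>()).put(weight, msg);
    }

    public static void main(String[] args) {
        // Per-key MapState contents after loading initialStates.
        Map<String, Map<Double, String>> state = new HashMap<>();
        load(state, "A", 0.0, "Final Output A");
        load(state, "A", 1.0, "Test 1A");
        load(state, "B", 2.0, "Test 2B");
        load(state, "B", 1.0, "Test 1B");
        load(state, "B", 0.0, "Final Output B");
        load(state, "C", 0.0, "No Change C");
        load(state, "D", 0.0, "Final Output D");

        // Initial feedback values (feedbackinitializer in the sample), in insertion order.
        Map<String, Double> feedback = new LinkedHashMap<>();
        feedback.put("A", 2.0);
        feedback.put("B", 3.0);
        feedback.put("C", 1.0);
        feedback.put("D", 1.0);

        for (Map.Entry<String, Double> e : feedback.entrySet()) {
            System.out.println(e.getKey() + " -> " + messagesForKey(e.getValue(), state.get(e.getKey())));
        }
        // Prints:
        // A -> [Test 1A, Final Output A]
        // B -> [Test 2B, Test 1B, Final Output B]
        // C -> [No Change C]
        // D -> [Final Output D]
    }
}
```

In the Flink job the same messages would come out interleaved across keys, since there is no ordering guarantee between keys; only the per-key order is fixed by the feedback loop.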