Hi Komal,

Thanks for your example code! I found that Flink ML has a bug in how it
handles keyed two-input operators. I have submitted a PR to fix it; once
the PR is approved, you can build the Flink ML library for your program
by following its build instructions.
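
To make the failure mode in your stack trace concrete, here is a small
plain-Java sketch with no Flink dependencies. The names Pair, Wrapped and
KeyContextSketch are made up for illustration; this is not the Flink ML
code and not necessarily how the PR fixes it. It only shows, under that
assumption, why a key selector written for the user's record type fails
with a ClassCastException when it is handed the wrapper record instead of
the value inside it.

```java
import java.util.function.Function;

class KeyContextSketch {
    // Hypothetical stand-in for a user record such as Tuple2<String, Double>.
    static final class Pair {
        final String key;
        final double value;

        Pair(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the wrapper an iteration runtime puts around
    // each user record (the role IterationRecord plays in the stack trace).
    static final class Wrapped {
        final Object value;
        final int epoch;

        Wrapped(Object value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    // The user's key selector only knows about Pair.
    static final Function<Object, String> USER_KEY_SELECTOR =
            record -> ((Pair) record).key;

    // Passing the wrapper straight to the selector throws ClassCastException.
    static String keyWithoutUnwrapping(Wrapped record) {
        return USER_KEY_SELECTOR.apply(record);
    }

    // Unwrapping first gives the selector the type it expects.
    static String keyAfterUnwrapping(Wrapped record) {
        return USER_KEY_SELECTOR.apply(record.value);
    }

    public static void main(String[] args) {
        Wrapped record = new Wrapped(new Pair("A", 2.0), 0);
        System.out.println("key after unwrapping: " + keyAfterUnwrapping(record));
        try {
            keyWithoutUnwrapping(record);
        } catch (ClassCastException e) {
            System.out.println("without unwrapping: ClassCastException");
        }
    }
}
```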

The bugfix PR: https://github.com/apache/flink-ml/pull/260
Build instructions for Flink ML:
https://github.com/apache/flink-ml?tab=readme-ov-file#building-the-project
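
For checking results after rebuilding, here is a plain-Java simulation of
the loop your example describes (subtract 1 per round, look up the message
stored for the new value). It is only a sketch: FeedbackLoopSketch and its
helpers are hypothetical names, it uses no Flink classes, and it assumes
that every entry of initialStates has been stored before any feedback for
that key is processed. A real job gives no such ordering guarantee between
the two inputs, so the actual output order (and any nulls) may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FeedbackLoopSketch {
    // One element of the feedback stream: (key, weight).
    static final class Feedback {
        final String key;
        final double weight;

        Feedback(String key, double weight) {
            this.key = key;
            this.weight = weight;
        }
    }

    // Stores one (key, weight) -> message entry, like state.put(...) per key.
    static void put(Map<String, Map<Double, String>> state,
                    String key, double weight, String message) {
        state.computeIfAbsent(key, k -> new HashMap<>()).put(weight, message);
    }

    static List<String> run() {
        // Per-key map state, loaded from the initialStates data set.
        Map<String, Map<Double, String>> state = new HashMap<>();
        put(state, "A", 0.0, "Final Output A");
        put(state, "A", 1.0, "Test 1A");
        put(state, "B", 2.0, "Test 2B");
        put(state, "B", 1.0, "Test 1B");
        put(state, "B", 0.0, "Final Output B");
        put(state, "C", 0.0, "No Change C");
        put(state, "D", 0.0, "Final Output D");

        // Initial feedback records; results are fed back into the same queue.
        Deque<Feedback> feedback = new ArrayDeque<>();
        feedback.add(new Feedback("A", 2.0));
        feedback.add(new Feedback("B", 3.0));
        feedback.add(new Feedback("C", 1.0));
        feedback.add(new Feedback("D", 1.0));

        List<String> messages = new ArrayList<>();
        while (!feedback.isEmpty()) {
            Feedback f = feedback.poll();
            double weight = f.weight - 1; // subtract 1 until 0.0
            if (weight > -1.0) {
                feedback.add(new Feedback(f.key, weight));
                messages.add(state.get(f.key).get(weight));
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Under these assumptions the simulation emits seven messages, one per
subtraction: two for A, three for B, and one each for C and D.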

Best,
Yunfeng

On Mon, Apr 8, 2024 at 11:02 AM Komal M <komal.mar...@gmail.com> wrote:
>
> Hi Yunfeng,
>
>
> Thank you so much for getting back!
>
> For the first bug, here is sample code that should reproduce it. All it
> does is subtract 1 from each tuple in the feedback stream until its value
> reaches 0.0. For each subtraction it emits the corresponding message to the
> ‘finalOutput’ stream. These messages are held in the keyed state of the
> KeyedCoProcessFunction, which is populated from a data stream called
> initialStates. Each key has several messages associated with it, hence the
> need for MapState.
>
> For the second bug, let me compare my implementation to the references you
> have provided and get back to you on that.
>
>
> import java.util.*;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         // Set up the local execution environment, the main entry point.
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironment();
>
>         // Sample data streams (assumed to be unbounded streams outside of
>         // this test environment).
>         List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
>                 new Tuple2<>("A", 2.0),
>                 new Tuple2<>("B", 3.0),
>                 new Tuple2<>("C", 1.0),
>                 new Tuple2<>("D", 1.0)
>         );
>
>         List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
>                 new Tuple3<>("A", 0.0, "Final Output A"),
>                 new Tuple3<>("A", 1.0, "Test 1A"),
>                 new Tuple3<>("B", 2.0, "Test 2B"),
>                 new Tuple3<>("B", 1.0, "Test 1B"),
>                 new Tuple3<>("B", 0.0, "Final Output B"),
>                 new Tuple3<>("C", 0.0, "No Change C"),
>                 new Tuple3<>("D", 0.0, "Final Output D")
>         );
>
>
>         DataStream<Tuple2<String, Double>> feedbackStream =
>                 env.fromCollection(feedbackinitializer);
>         DataStream<Tuple3<String, Double, String>> initialStateStream =
>                 env.fromCollection(initialStates);
>
>         // Parallelize
>         DataStream<Tuple2<String, Double>> feedbackParallel =
>                 feedbackStream.keyBy(x -> x.f0)
>                         .map(i -> Tuple2.of(i.f0, i.f1))
>                         .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
>         DataStream<Tuple3<String, Double, String>> initialStateParallel =
>                 initialStateStream.keyBy(x -> x.f0)
>                         .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
>                         .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));
>
>
>
>         // Iterate
>         DataStreamList result = Iterations.iterateUnboundedStreams(
>                 DataStreamList.of(feedbackParallel),
>                 DataStreamList.of(initialStateParallel),
>                 (variableStreams, dataStreams) -> {
>                     DataStream<Tuple2<String, Double>> modelUpdate =
>                             variableStreams.get(0);
>                     DataStream<Tuple3<String, Double, String>> stateStream =
>                             dataStreams.get(0);
>
>                     OutputTag<String> finalOutputTag =
>                             new OutputTag<String>("msgs") {};
>
>                     SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate =
>                             stateStream.connect(modelUpdate).keyBy(0, 0).process(
>                                     new KeyedCoProcessFunction<String,
>                                             Tuple3<String, Double, String>,
>                                             Tuple2<String, Double>,
>                                             Tuple2<String, Double>>() {
>                         private transient MapState<Double, String> state;
>
>                         @Override
>                         public void processElement1(
>                                 Tuple3<String, Double, String> stateUpdate,
>                                 Context context,
>                                 Collector<Tuple2<String, Double>> collector) throws Exception {
>                             // Load stateStream into the MapState.
>                             state.put(stateUpdate.f1, stateUpdate.f2);
>                         }
>
>                         @Override
>                         public void processElement2(
>                                 Tuple2<String, Double> modelUpdate,
>                                 Context context,
>                                 Collector<Tuple2<String, Double>> collector) throws Exception {
>                             // Subtract 1 until 0.0.
>                             double weight = modelUpdate.f1;
>                             weight = weight - 1;
>                             if (weight > -1.0) {
>                                 collector.collect(Tuple2.of(modelUpdate.f0, weight));
>                                 context.output(finalOutputTag, state.get(weight));
>                             }
>                         }
>
>                         @Override
>                         public void open(Configuration config) {
>                             MapStateDescriptor<Double, String> stateDescriptor =
>                                     new MapStateDescriptor<>(
>                                             "statedescriptor", // the state name
>                                             BasicTypeInfo.DOUBLE_TYPE_INFO,
>                                             BasicTypeInfo.STRING_TYPE_INFO);
>                             this.state = getRuntimeContext().getMapState(stateDescriptor);
>                         }
>                     });
>
>                     DataStream<String> finalOutput =
>                             newModelUpdate.getSideOutput(finalOutputTag);
>
>                     return new IterationBodyResult(
>                             DataStreamList.of(newModelUpdate),
>                             DataStreamList.of(finalOutput));
>                 });
>
>         result.get(0).print();
>
>         // Execute the program.
>         env.execute("Flink Java API Skeleton");
>     }
> }
>
> Best,
> Komal
>
> From: Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> Date: Sunday, April 7, 2024 11:36
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: Two potential bugs in Flink ML
> Hi Komal,
>
> For the first question, could you please provide a simple program that
> could help reproduce this exception? That could help us better find
> out the bugs (if any) in Flink ML.
>
> For the second question, there have been Functions implementing the
> IterationListener interface in Flink ML[1] and I just manually
> verified that their onEpochWatermarkIncremented method can be invoked
> in the test cases. You may check whether there is any difference
> between your implementation and that in the Flink ML repo, and please
> also feel free to provide a program that we can check together.
>
> [1] 
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/iteration/ForwardInputsOfLastRound.java#L45
>
> Best,
> Yunfeng
>
> On Fri, Apr 5, 2024 at 2:01 PM Komal M <komal.mar...@gmail.com> wrote:
> >
> > Hi Flink Dev Team,
> > I have two possible bugs to report for Flink ML Iteration.
> > Flink v1.17.2
> > Flink ML v2.3.0
> > Java 11
> >
> > Bug # 1
> > Implementing a KeyedCoProcessFunction or CoFlatMapFunction UDF inside
> > IterationBody yields a “java.lang.ClassCastException:
> > org.apache.flink.iteration.IterationRecord cannot be cast to class
> > org.apache.flink.api.java.tuple.Tuple” error. For reference, I do not get
> > any error when applying .keyBy().flatMap() to the streams individually
> > inside the iteration body.
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >         at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> >         ….
> >         at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> >         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> >         …
> >         at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> >         ... 5 more
> > Caused by: java.lang.ClassCastException: class org.apache.flink.iteration.IterationRecord cannot be cast to class org.apache.flink.api.java.tuple.Tuple (org.apache.flink.iteration.IterationRecord and org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
> >         at org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
> >         at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
> >         at org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
> >         at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
> >         at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
> >         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
> >         at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> >         at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >         at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> >         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> >         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> >         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> >         at java.base/java.lang.Thread.run(Thread.java:829)
> >
> >
> >
> > Potential Bug # 2
> >
> > The onEpochWatermarkIncremented method is never invoked when the 
> > IterationListener<T> interface is implemented by a UDF inside the 
> > iterationBody.
> >
> >
> >
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.iteration.IterationListener;
> > import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> > import org.apache.flink.util.Collector;
> >
> > // This function is used inside the IterationBody.
> > public class ComputeML2
> >         extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
> >         implements IterationListener<Tuple2<Integer, String>> {
> >
> >     // This method is never invoked, so no output is produced.
> >     @Override
> >     public void onEpochWatermarkIncremented(
> >             int epochWaterMark,
> >             IterationListener.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >         collector.collect(Tuple2.of(epochWaterMark, "epoch")); // Bug: no output
> >     }
> >
> >     @Override
> >     public void onIterationTerminated(
> >             IterationListener.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >     }
> >
> >     @Override
> >     public void processElement(
> >             Tuple2<Integer, String> integerStringTuple2,
> >             KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
> >             Collector<Tuple2<Integer, String>> collector) throws Exception {
> >         // some processing here
> >     }
> > }
> >
> >
> >
> >
> > Let me know if I should submit these issues on JIRA.
> > Thank you so much
> > Komal
> >
