Hi Yunfeng,

Thank you for resolving this bug so promptly. I have compiled the latest
Flink ML source code and can confirm that the problem is fixed.

Regarding the second bug, it seems that the methods of the IterationListener
interface are not invoked when the function is used with the “union”
operator. I am attaching the same sample code, modified to reproduce the bug.

I also have a question about Flink ML iterations. Is there any way to make the
iterationBody run for a user-specified number of epochs? According to the
iteration documentation
<https://nightlies.apache.org/flink/flink-ml-docs-release-2.0/docs/development/iteration/>:
“The iterative algorithm has an iteration body that is repeatedly invoked
until some termination criteria is reached (e.g. after a user-specified number
of epochs has been reached)”. If my understanding is correct, there should be
a parameter or some other way to define a user-specified termination
criterion, but I cannot find the relevant methods.
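
To make the question concrete, here is a minimal plain-Java sketch of the
behaviour I am after. It does not use the Flink ML API at all: the class
EpochLimitSketch, the method iterate and the parameter maxEpochs are
hypothetical names made up for illustration. It only shows a body being
applied once per epoch until a user-specified epoch count is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Plain-Java illustration only; none of these names are Flink ML API. */
class EpochLimitSketch {

    /**
     * Applies the body once per epoch and stops after maxEpochs epochs.
     * Returns the value produced at the end of every completed epoch.
     */
    static List<Double> iterate(double initial, UnaryOperator<Double> body, int maxEpochs) {
        List<Double> perEpoch = new ArrayList<>();
        double value = initial;
        for (int epoch = 0; epoch < maxEpochs; epoch++) {
            value = body.apply(value);
            perEpoch.add(value);
        }
        return perEpoch;
    }

    public static void main(String[] args) {
        // Same update as in my sample (subtract 1 per round), limited to 3 epochs.
        System.out.println(iterate(3.0, w -> w - 1, 3)); // prints [2.0, 1.0, 0.0]
    }
}
```

What I am looking for is the equivalent of maxEpochs for an iteration body
built with Iterations.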

Looking forward to your reply!


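import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.IterationListener;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Test {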
    public static void main(String[] args) throws Exception {
        // Set up the execution environment, which is the main entry point.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();

        // Sample data streams (assumed to be unbounded outside of this test
        // environment).
        List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
                new Tuple2<>("A", 2.0),
                new Tuple2<>("B", 3.0),
                new Tuple2<>("C", 1.0),
                new Tuple2<>("D", 1.0)
        );

        List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
                new Tuple3<>("A", 0.0, "Final Output A"), //
                new Tuple3<>("A", 1.0, "Test 1A"), //
                new Tuple3<>("B", 2.0, "Test 2B"), //
                new Tuple3<>("B", 1.0, "Test 1B"), //
                new Tuple3<>("B", 0.0, "Final Output B"), //
                new Tuple3<>("C", 0.0, "No Change C"), //
                new Tuple3<>("D", 0.0, "Final Output D") //
        );


        DataStream<Tuple2<String, Double>> feedbackStream =
                env.fromCollection(feedbackinitializer);
        DataStream<Tuple3<String, Double, String>> initialStateStream =
                env.fromCollection(initialStates);

        // Key both streams so that they are processed in parallel.
        DataStream<Tuple2<String, Double>> feedbackParallel =
                feedbackStream.keyBy(x -> x.f0)
                        .map(i -> Tuple2.of(i.f0, i.f1))
                        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
        DataStream<Tuple3<String, Double, String>> initialStateParallel =
                initialStateStream.keyBy(x -> x.f0)
                        .map(i -> Tuple3.of(i.f0, i.f1, i.f2))
                        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, Types.STRING));



        // Iterate.
        DataStreamList result = Iterations.iterateUnboundedStreams(
                DataStreamList.of(feedbackParallel),
                DataStreamList.of(initialStateParallel),
                (variableStreams, dataStreams) -> {
                    DataStream<Tuple2<String, Double>> modelUpdate =
                            variableStreams.get(0);
                    DataStream<Tuple3<String, Double, String>> stateStream =
                            dataStreams.get(0);

                    OutputTag<String> finalOutputTag =
                            new OutputTag<String>("msgs") {
                    };

                    // Bug: the IterationListener methods of testMap are not
                    // invoked when it is used with union.
                    modelUpdate.union(modelUpdate).map(new testMap());

                    SingleOutputStreamOperator<Tuple2<String, Double>> newModelUpdate =
                            stateStream.connect(modelUpdate).keyBy(0, 0).process(
                                    new KeyedCoProcessFunction<String, Tuple3<String, Double, String>,
                                            Tuple2<String, Double>, Tuple2<String, Double>>() {
                        private transient MapState<Double, String> state;

                        @Override
                        public void processElement1(
                                Tuple3<String, Double, String> stateUpdate,
                                Context context,
                                Collector<Tuple2<String, Double>> collector) throws Exception {
                            // Load stateStream into the MapState.
                            state.put(stateUpdate.f1, stateUpdate.f2);
                            // System.out.println("State Update: " + stateUpdate);
                        }

                        @Override
                        public void processElement2(
                                Tuple2<String, Double> modelUpdate,
                                Context context,
                                Collector<Tuple2<String, Double>> collector) throws Exception {
                            double weight = modelUpdate.f1;
                            weight = weight - 1;

                            // System.out.println("Tuple: " + modelUpdate);
                            // Subtract 1 until the weight reaches 0.0.
                            if (weight > -1.0) {
                                collector.collect(Tuple2.of(modelUpdate.f0, weight));
                                // System.out.println("get(weight): " + weight + " " + state.get(weight));
                                context.output(finalOutputTag, state.get(weight));
                            }
                        }

                        @Override
                        public void open(Configuration config) {
                            MapStateDescriptor<Double, String> stateDescriptor =
                                    new MapStateDescriptor<>(
                                            "statedescriptor", // the state name
                                            BasicTypeInfo.DOUBLE_TYPE_INFO,
                                            BasicTypeInfo.STRING_TYPE_INFO
                                            );
                            this.state =
                                    getRuntimeContext().getMapState(stateDescriptor);
                        }

                    });

                    DataStream<String> finalOutput =
                            newModelUpdate.getSideOutput(finalOutputTag);

                    return new IterationBodyResult(
                            DataStreamList.of(newModelUpdate),
                            DataStreamList.of(finalOutput));
                });

//        result.get(0).print();

// Execute program
        env.execute("Flink Java API Skeleton");
    }

    public static final class testMap
            implements MapFunction<Tuple2<String, Double>, String>, IterationListener<String> {

        @Override
        public String map(Tuple2<String, Double> tuple) throws Exception {
            System.out.println(tuple);
            return null;
        }

        @Override
        public void onEpochWatermarkIncremented(
                int i, Context context, Collector<String> collector) throws Exception {
            // Print the epoch watermark value rather than the literal "i".
            System.out.println("Epoch watermark incremented: " + i);
        }

        @Override
        public void onIterationTerminated(
                Context context, Collector<String> collector) throws Exception {
            System.out.println("Iteration Terminated");
        }
    }
}

Best,
Komal


From: Yunfeng Zhou <flink.zhouyunf...@gmail.com>
Date: Tuesday, April 9, 2024 19:14
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: Two potential bugs in Flink ML
Hi Komal,

Thanks for your example code! I found that Flink ML has a bug when it
comes to keyed two input operators. I have submitted a PR to fix this
bug and you can build the Flink ML library for your program according
to its document after this PR is approved.

The bugfix PR: https://github.com/apache/flink-ml/pull/260
The document to build Flink ML:
https://github.com/apache/flink-ml?tab=readme-ov-file#building-the-project

Best,
Yunfeng

On Mon, Apr 8, 2024 at 11:02 AM Komal M <komal.mar...@gmail.com> wrote:
>
> Hi Yunfeng,
>
>
> Thank you so much for getting back!
>
> For the first bug, here is some sample code that should reproduce it. All it
> does is subtract 1 from the feedback stream until the tuples reach 0.0. For
> each subtraction it outputs a relevant message in the ‘finalOutput’ stream.
> These messages are stored in the keyed state of the KeyedCoProcessFunction
> and are populated by a data stream called initialStates. Each key has
> different messages associated with it, hence the need for MapState.
>
> For the second bug, let me compare my implementation to the references you
> have provided and get back to you on that.
>
>
> import java.util.*;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
>
>
> public class Test {
>     public static void main(String[] args) throws Exception {
> // Sets up the execution environment, which is the main entry point
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironment();
>
> // sample datastreams (they are assumed to be unbounded streams outside of
> // this test environment)
>         List<Tuple2<String, Double>> feedbackinitializer = Arrays.asList(
>                 new Tuple2<>("A", 2.0),
>                 new Tuple2<>("B", 3.0),
>                 new Tuple2<>("C", 1.0),
>                 new Tuple2<>("D", 1.0)
>         );
>
>         List<Tuple3<String, Double, String>> initialStates = Arrays.asList(
>                 new Tuple3<>("A", 0.0, "Final Output A"),
>                 new Tuple3<>("A", 1.0, "Test 1A"),
>                 new Tuple3<>("B", 2.0, "Test 2B"),
>                 new Tuple3<>("B", 1.0, "Test 1B"),
>                 new Tuple3<>("B", 0.0, "Final Output B"),
>                 new Tuple3<>("C", 0.0, "No Change C"),
>                 new Tuple3<>("D", 0.0, "Final Output D")
>         );
>
>
>         DataStream<Tuple2<String, Double>> feedbackStream = 
> env.fromCollection(feedbackinitializer);
>         DataStream<Tuple3<String, Double, String>> initialStateStream = 
> env.fromCollection(initialStates);
>
> //parallelize
>         DataStream<Tuple2<String, Double>> feedbackParallel = 
> feedbackStream.keyBy(x -> x.f0)
>                 .map(i -> Tuple2.of(i.f0,i.f1))
>                 .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));
>         DataStream<Tuple3<String, Double, String>> initialStateParallel = 
> initialStateStream.keyBy(x -> x.f0)
>                 .map(i -> Tuple3.of(i.f0,i.f1,i.f2))
>                 .returns(Types.TUPLE(Types.STRING, Types.DOUBLE, 
> Types.STRING));
>
>
>
> //iterate
>         DataStreamList result = Iterations.iterateUnboundedStreams(
>                 DataStreamList.of(feedbackParallel),
>                 DataStreamList.of(initialStateParallel),
>                 (variableStreams, dataStreams) -> {
>                     DataStream<Tuple2<String, Double>> modelUpdate = 
> variableStreams.get(0);
>                     DataStream<Tuple3<String, Double, String>> stateStream = 
> dataStreams.get(0);
>
>
>                     OutputTag<String> finalOutputTag = new 
> OutputTag<String>("msgs") {
>                     };
>
>                     SingleOutputStreamOperator<Tuple2<String, Double>> 
> newModelUpdate = stateStream.connect(modelUpdate).keyBy(0, 0).process(new 
> KeyedCoProcessFunction<String, Tuple3<String, Double, String>, Tuple2<String, 
> Double>, Tuple2<String, Double>>() {
>                         private transient MapState<Double, String> state;
>
>                         @Override
>                         public void processElement1(Tuple3<String, Double, 
> String> stateUpdate, Context context, Collector<Tuple2<String, Double>> 
> collector) throws Exception {
>                             // load stateStream into mapState
>                             state.put(stateUpdate.f1, stateUpdate.f2);
>                         }
>
>                         @Override
>                         public void processElement2(Tuple2<String, Double> 
> modelUpdate, Context context, Collector<Tuple2<String, Double>> collector) 
> throws Exception {
>                             double weight = modelUpdate.f1;
>                             weight = weight - 1;
> //subtract 1 until 0.0
>                             if (weight > -1.0) {
>                                 collector.collect(Tuple2.of(modelUpdate.f0, 
> weight));
>                                 context.output(finalOutputTag, 
> state.get(weight));
>                             }
>                         }
>
>                         @Override
>                         public void open(Configuration config) {
>                             MapStateDescriptor<Double, String> 
> stateDescriptor =
>                                     new MapStateDescriptor<>(
>                                             "statedescriptor", // the state name
>                                             BasicTypeInfo.DOUBLE_TYPE_INFO,
>                                             BasicTypeInfo.STRING_TYPE_INFO
>                                             );
>                             this.state = 
> getRuntimeContext().getMapState(stateDescriptor);
>
>                         }
>
>                     });
>
>                     DataStream<String> finalOutput = 
> newModelUpdate.getSideOutput(finalOutputTag);
>
>                     return new IterationBodyResult(
>                             DataStreamList.of(newModelUpdate),
>                             DataStreamList.of(finalOutput));
>                 });
>
>         result.get(0).print();
>
> // Execute program
>         env.execute("Flink Java API Skeleton");
>     }
> }
>
> Best,
> Komal
>
> From: Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> Date: Sunday, April 7, 2024 11:36
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: Two potential bugs in Flink ML
> Hi Komal,
>
> For the first question, could you please provide a simple program that
> could help reproduce this exception? That could help us better find
> out the bugs (if any) in Flink ML.
>
> For the second question, there have been Functions implementing the
> IterationListener interface in Flink ML[1] and I just manually
> verified that their onEpochWatermarkIncremented method can be invoked
> in the test cases. You may check whether there is any difference
> between your implementation and that in the Flink ML repo, and please
> also feel free to provide a program that we can check together.
>
> [1] 
> https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/iteration/ForwardInputsOfLastRound.java#L45
>
> Best,
> Yunfeng
>
> On Fri, Apr 5, 2024 at 2:01 PM Komal M <komal.mar...@gmail.com> wrote:
> >
> > Hi Flink Dev Team,
> > I have two possible bugs to report for Flink ML Iteration.
> > Flink v1.17.2
> > Flink ML v2.3.0
> > Java 11
> >
> > Bug # 1
> > Implementing a UDF KeyedRichCoProcessFunction or CoFlatMapFunction inside
> > IterationBody yields a “java.lang.ClassCastException:
> > org.apache.flink.iteration.IterationRecord cannot be cast to class
> > org.apache.flink.api.java.tuple.Tuple” error. For reference, I do not get
> > any error when applying .keyBy().flatMap() on the streams individually
> > inside the iteration body.
> >
> > Exception in thread "main" 
> > org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >                 at 
> > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> >                 ….
> >                 at 
> > java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> > NoRestartBackoffTimeStrategy
> >                 at 
> > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> > …
> >                 at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> >                 ... 5 more
> > Caused by: java.lang.ClassCastException: class 
> > org.apache.flink.iteration.IterationRecord cannot be cast to class 
> > org.apache.flink.api.java.tuple.Tuple 
> > (org.apache.flink.iteration.IterationRecord and 
> > org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
> >                 at 
> > org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
> >                 at 
> > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
> >                 at 
> > org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
> >                 at 
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
> >                 at 
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
> >                 at 
> > org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
> >                 at 
> > org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
> >                 at 
> > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
> >                 at 
> > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> >                 at 
> > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >                 at 
> > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> >                 at 
> > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
> >                 at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> >                 at 
> > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> >                 at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> >                 at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> >                 at 
> > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> >                 at 
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> >                 at 
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> >                 at 
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> >                 at java.base/java.lang.Thread.run(Thread.java:829)
> >
> >
> >
> > Potential Bug # 2
> >
> > The onEpochWatermarkIncremented method is never invoked when the 
> > IterationListener<T> interface is implemented by a UDF inside the 
> > iterationBody.
> >
> >
> >
> > // method is invoked from within IterationBody
> >
> > public class ComputeML2 extends KeyedProcessFunction<String, 
> > Tuple2<Integer, String>, Tuple2<Integer, String>> implements 
> > IterationListener<Tuple2<Integer, String>>  {
> >
> > // this method is never invoked, getting no output
> >
> >     @Override
> >
> >     public void onEpochWatermarkIncremented(int epochWaterMark, 
> > IterationListener.Context context, Collector<Tuple2<Integer, String>> 
> > collector) throws Exception {
> >
> >         collector.collect(Tuple2.of(epochWaterMark,"epoch"));  //Bug: no 
> > output
> >
> >     }
> >
> >
> >
> >
> >     @Override
> >
> >     public void onIterationTerminated(IterationListener.Context context, 
> > Collector<Tuple2<Integer, String>> collector) throws Exception {
> >
> >     }
> >
> >
> >     @Override
> >
> >     public void processElement(Tuple2<Integer, String> integerStringTuple2, 
> > KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, 
> > String>>.Context context, Collector<Tuple2<Integer, String>> collector) 
> > throws Exception {
> >
> >             // some processing here
> >
> >     }
> >
> >
> > }
> >
> >
> >
> >
> > Let me know if I should submit these issues on JIRA.
> > Thank you so much
> > Komal
> >
