Hi Komal,

For the first question, could you please provide a simple program that
reproduces this exception? That would help us track down the bug (if
any) in Flink ML.
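
From the stack trace you pasted below, the cast fails inside
TupleComparator.extractKeys: when keyBy() is given tuple field positions,
Flink builds a ComparableKeySelector that assumes each element is a raw
Tuple, while inside the iteration body elements arrive wrapped in
IterationRecord. The mechanism can be shown in miniature with plain Java
(the classes below are illustrative stand-ins, not the actual Flink types):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class CastDemo {

    // Illustrative stand-in for org.apache.flink.iteration.IterationRecord:
    // the iteration runtime wraps each user value in a record like this.
    static final class IterationRecordLike<T> {
        final T value;
        IterationRecordLike(T value) { this.value = value; }
    }

    // Illustrative stand-in for the key extraction done by
    // KeySelectorUtil.ComparableKeySelector / TupleComparator: it assumes
    // the element itself is the tuple, with no unwrapping step.
    static Object extractKey(Object element) {
        Map.Entry<?, ?> tuple = (Map.Entry<?, ?>) element; // ClassCastException if wrapped
        return tuple.getKey();
    }

    public static void main(String[] args) {
        // Outside an iteration, the element really is a tuple: extraction works.
        System.out.println(extractKey(new SimpleEntry<>("k", 1)));

        // Inside the iteration, the element is wrapped: the same cast fails.
        try {
            extractKey(new IterationRecordLike<>(new SimpleEntry<>("k", 1)));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, matching the reported trace");
        }
    }
}
```

If that is indeed what happens in your job, the all-round wrapper operator
may not be unwrapping the record before the key selector runs; the
reproduction program would let us confirm that.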

For the second question, there are already functions in Flink ML that
implement the IterationListener interface [1], and I have manually
verified that their onEpochWatermarkIncremented method is invoked in
the test cases. You may check whether there is any difference between
your implementation and the one in the Flink ML repo, and please also
feel free to provide a program that we can look into together.

[1] 
https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/iteration/ForwardInputsOfLastRound.java#L45

Best,
Yunfeng

On Fri, Apr 5, 2024 at 2:01 PM Komal M <komal.mar...@gmail.com> wrote:
>
> Hi Flink Dev Team,
> I have two possible bugs to report for Flink ML Iteration.
> Flink v1.17.2
> Flink ML v2.3.0
> Java 11
>
> Bug # 1
> Implementing a UDF KeyedRichCoProcessFunction or CoFlatMapFunction inside 
> IterationBody yields a “java.lang.ClassCastException: 
> org.apache.flink.iteration.IterationRecord cannot be cast to class 
> org.apache.flink.api.java.tuple.Tuple” error. For reference, I do not get any 
> error when applying .keyBy().flatMap() on the streams individually inside the
> iteration body.
>
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>                 at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>                 ….
>                 at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>                 at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> …
>                 at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>                 ... 5 more
> Caused by: java.lang.ClassCastException: class 
> org.apache.flink.iteration.IterationRecord cannot be cast to class 
> org.apache.flink.api.java.tuple.Tuple 
> (org.apache.flink.iteration.IterationRecord and 
> org.apache.flink.api.java.tuple.Tuple are in unnamed module of loader 'app')
>                 at 
> org.apache.flink.api.java.typeutils.runtime.TupleComparator.extractKeys(TupleComparator.java:148)
>                 at 
> org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:195)
>                 at 
> org.apache.flink.streaming.util.keys.KeySelectorUtil$ComparableKeySelector.getKey(KeySelectorUtil.java:168)
>                 at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:502)
>                 at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
>                 at 
> org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.setKeyContextElement1(AbstractAllRoundWrapperOperator.java:203)
>                 at 
> org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:87)
>                 at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
>                 at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>                 at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>                 at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>                 at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>                 at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>                 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>                 at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>                 at java.base/java.lang.Thread.run(Thread.java:829)
>
>
>
> Potential Bug # 2
>
> The onEpochWatermarkIncremented method is never invoked when the 
> IterationListener<T> interface is implemented by a UDF inside the 
> iterationBody.
>
>
>
> // method is invoked from within IterationBody
> public class ComputeML2
>         extends KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>
>         implements IterationListener<Tuple2<Integer, String>> {
>
>     // this method is never invoked, getting no output
>     @Override
>     public void onEpochWatermarkIncremented(
>             int epochWaterMark,
>             IterationListener.Context context,
>             Collector<Tuple2<Integer, String>> collector) throws Exception {
>         collector.collect(Tuple2.of(epochWaterMark, "epoch"));  // Bug: no output
>     }
>
>     @Override
>     public void onIterationTerminated(
>             IterationListener.Context context,
>             Collector<Tuple2<Integer, String>> collector) throws Exception {
>     }
>
>     @Override
>     public void processElement(
>             Tuple2<Integer, String> integerStringTuple2,
>             KeyedProcessFunction<String, Tuple2<Integer, String>, Tuple2<Integer, String>>.Context context,
>             Collector<Tuple2<Integer, String>> collector) throws Exception {
>         // some processing here
>     }
> }
>
>
>
>
> Let me know if I should submit these issues on JIRA.
> Thank you so much
> Komal
>
