Yunfeng Zhou created FLINK-30241: ------------------------------------ Summary: Flink ML Iteration ConcurrentModificationException Key: FLINK-30241 URL: https://issues.apache.org/jira/browse/FLINK-30241 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.1.0 Reporter: Yunfeng Zhou
https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847 {code} ___________________ LinearRegressionTest.test_get_model_data ___________________ self = <ml.lib.regression.tests.test_linearregression.LinearRegressionTest testMethod=test_get_model_data> def test_get_model_data(self): regression = LinearRegression().set_weight_col('weight') model = regression.fit(self.input_data_table) model_data = self.t_env.to_data_stream( > model.get_model_data()[0]).execute_and_collect().next() pyflink/ml/lib/regression/tests/test_linearregression.py:124: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760: in next if not self._j_closeable_iterator.hasNext(): /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322: in __call__ answer, self.gateway_client, self.target_id, self.name) /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146: in deco return f(*a, **kw) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ answer = 'xro12236' gateway_client = <py4j.java_gateway.GatewayClient object at 0x7fdb862ca190> target_id = 'o12139', name = 'hasNext' def get_return_value(answer, gateway_client, target_id=None, name=None): """Converts an answer received from the Java gateway into a Python object. For example, string representation of integers are converted to Python integer, string representation of objects are converted to JavaObject instances, etc. :param answer: the string returned by the Java gateway :param gateway_client: the gateway client used to communicate with the Java Gateway. Only necessary if the answer is a reference (e.g., object, list, map) :param target_id: the name of the object from which the answer comes from (e.g., *object1* in `object1.hello()`). Optional. :param name: the name of the member from which the answer comes from (e.g., *hello* in `object1.hello()`). Optional. """ if is_error(answer)[0]: if len(answer) > 1: type = answer[1] value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) if answer[1] == REFERENCE_TYPE: raise Py4JJavaError( "An error occurred while calling {0}{1}{2}.\n". > format(target_id, ".", name), value) E py4j.protocol.Py4JJavaError: An error occurred while calling o12139.hasNext. E : java.lang.RuntimeException: Failed to fetch next result E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) E at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) E at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.lang.Thread.run(Thread.java:750) E Caused by: java.io.IOException: Failed to fetch job execution result E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) E at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) E ... 11 more E Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. E at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) E at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) E ... 13 more E Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. E at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) E at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) E at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) E at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) E at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) E at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138) E at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181) E ... 13 more E Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy E at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) E at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) E at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301) E at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) E at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) E at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) E at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) E at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) E at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source) E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) E at java.lang.reflect.Method.invoke(Method.java:498) E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) E at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) E at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) E at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) E at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) E at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) E at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) E at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) E at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) E at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) E at akka.actor.Actor.aroundReceive(Actor.scala:537) E at akka.actor.Actor.aroundReceive$(Actor.scala:535) E at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) E at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) E at akka.actor.ActorCell.invoke(ActorCell.scala:548) E at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) E at akka.dispatch.Mailbox.run(Mailbox.scala:231) E at akka.dispatch.Mailbox.exec(Mailbox.scala:243) E at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) E at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) E at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) E at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) E Caused by: java.util.ConcurrentModificationException E at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648) E at java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044) E at org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:463) E at org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:391) E at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96) E at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97) E at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68) E at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) E at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) E at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) E at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) E at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) E at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) E at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) E at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) E at java.lang.Thread.run(Thread.java:750) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)