[ 
https://issues.apache.org/jira/browse/FLINK-30241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin closed FLINK-30241.
----------------------------
    Resolution: Cannot Reproduce

> Flink ML Iteration ConcurrentModificationException
> --------------------------------------------------
>
>                 Key: FLINK-30241
>                 URL: https://issues.apache.org/jira/browse/FLINK-30241
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.1.0
>            Reporter: Yunfeng Zhou
>            Priority: Major
>
> https://github.com/jiangxin369/flink-ml/actions/runs/3577811156/jobs/6017233847
> {code}
> ___________________ LinearRegressionTest.test_get_model_data ___________________
> self = <ml.lib.regression.tests.test_linearregression.LinearRegressionTest testMethod=test_get_model_data>
>     def test_get_model_data(self):
>         regression = LinearRegression().set_weight_col('weight')
>         model = regression.fit(self.input_data_table)
>         model_data = self.t_env.to_data_stream(
> >           model.get_model_data()[0]).execute_and_collect().next()
> pyflink/ml/lib/regression/tests/test_linearregression.py:124: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/datastream/data_stream.py:1760:
>  in next
>     if not self._j_closeable_iterator.hasNext():
> /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/py4j/java_gateway.py:1322:
>  in __call__
>     answer, self.gateway_client, self.target_id, self.name)
> /opt/hostedtoolcache/Python/3.7.15/x64/lib/python3.7/site-packages/pyflink/util/exceptions.py:146:
>  in deco
>     return f(*a, **kw)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> answer = 'xro12236'
> gateway_client = <py4j.java_gateway.GatewayClient object at 0x7fdb862ca190>
> target_id = 'o12139', name = 'hasNext'
>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>         """Converts an answer received from the Java gateway into a Python 
> object.
>     
>         For example, string representation of integers are converted to Python
>         integer, string representation of objects are converted to JavaObject
>         instances, etc.
>     
>         :param answer: the string returned by the Java gateway
>         :param gateway_client: the gateway client used to communicate with 
> the Java
>             Gateway. Only necessary if the answer is a reference (e.g., 
> object,
>             list, map)
>         :param target_id: the name of the object from which the answer comes 
> from
>             (e.g., *object1* in `object1.hello()`). Optional.
>         :param name: the name of the member from which the answer comes from
>             (e.g., *hello* in `object1.hello()`). Optional.
>         """
>         if is_error(answer)[0]:
>             if len(answer) > 1:
>                 type = answer[1]
>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>                 if answer[1] == REFERENCE_TYPE:
>                     raise Py4JJavaError(
>                         "An error occurred while calling {0}{1}{2}.\n".
> >                       format(target_id, ".", name), value)
> E                   py4j.protocol.Py4JJavaError: An error occurred while calling o12139.hasNext.
> E                   : java.lang.RuntimeException: Failed to fetch next result
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> E                     at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown 
> Source)
> E                     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E                     at java.lang.reflect.Method.invoke(Method.java:498)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> E                     at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E                     at java.lang.Thread.run(Thread.java:750)
> E                   Caused by: java.io.IOException: Failed to fetch job execution result
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> E                     ... 11 more
> E                   Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> E                     at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> E                     at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
> E                     ... 13 more
> E                   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> E                     at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> E                     at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> E                     at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> E                     at 
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> E                     at 
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> E                     at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
> E                     at 
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
> E                     ... 13 more
> E                   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> E                     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> E                     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> E                     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
> E                     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
> E                     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
> E                     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
> E                     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
> E                     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
> E                     at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown 
> Source)
> E                     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E                     at java.lang.reflect.Method.invoke(Method.java:498)
> E                     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
> E                     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> E                     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
> E                     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> E                     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> E                     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> E                     at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> E                     at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> E                     at 
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> E                     at 
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> E                     at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> E                     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> E                     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> E                     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> E                     at akka.actor.Actor.aroundReceive(Actor.scala:537)
> E                     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> E                     at 
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> E                     at 
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> E                     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> E                     at 
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> E                     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> E                     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> E                     at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> E                     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> E                     at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> E                     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> E                   Caused by: java.util.ConcurrentModificationException
> E                     at 
> java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:648)
> E                     at 
> java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1044)
> E                     at 
> org.apache.flink.iteration.operator.HeadOperator.parseInputChannelEvents(HeadOperator.java:463)
> E                     at 
> org.apache.flink.iteration.operator.HeadOperator.endInput(HeadOperator.java:391)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> E                     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> E                     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> E                     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> E                     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> E                     at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> E                     at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> E                     at java.lang.Thread.run(Thread.java:750)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to