[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155268#comment-16155268
 ] 

Aljoscha Krettek commented on FLINK-7567:
-----------------------------------------

Yes, we should in fact report that the parameter is unused or maybe even remove 
it. (This was left there by me, in fact, by accident. it seems)

As for resolving the problem, I tend to think that we shouldn't do too much 
stuff automatically. I.e. in this case we would have to set the parallelism to 
1 for the iteration feedback, which might not be what a user wants. The easiest 
is therefore to let the user know that there is a problem and let them figure 
out what they want to do.

What message could we give the user? I think the existing message already quite 
clearly states what the problem is.

> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-7567
>                 URL: https://issues.apache.org/jira/browse/FLINK-7567
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.2
>         Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>            Reporter: Peter Ertl
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just do get iteration going ...
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>       at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>       at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>       at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>       at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since is just the simplest iterating stream setup I could imagine this error 
> makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to