[ https://issues.apache.org/jira/browse/FLINK-5773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862176#comment-15862176 ]
sunjincheng edited comment on FLINK-5773 at 2/11/17 2:54 AM:
-------------------------------------------------------------

Hi [~colinbreame],

Let's look at the Scaladoc for `setMaxParallelism`:
{code}
Sets the maximum degree of parallelism defined for the program.
The maximum degree of parallelism specifies the upper limit for dynamic scaling.
It also defines the number of key groups used for partitioned state.
{code}
The value passed to `setMaxParallelism(valueA)` is tied to the value passed to `setParallelism(valueB)`: it requires valueA >= valueB. The default parallelism in your local environment may be 4, which would require valueA >= 4. If you call `env.setParallelism(1)` first, you should then be able to pass any number greater than 0 to `setMaxParallelism`. Can you try it?

> Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5773
>                 URL: https://issues.apache.org/jira/browse/FLINK-5773
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0
>            Reporter: Colin Breame
>             Fix For: 1.2.1
>
>
> The exception below happens when I set the StreamExecutionEnvironment.setMaxParallelism() to anything less than 4.
> Let me know if you need more information.
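
A minimal sketch of the rule described in the comment, assuming it amounts to "maxParallelism >= parallelism, with both values greater than 0". `ParallelismCheck` and `isValid` are hypothetical names used only for illustration and are not part of Flink, and the default parallelism of 4 is the guess made in the comment, not a confirmed value.

```java
// Hypothetical illustration only; this class is NOT part of Flink.
class ParallelismCheck {

    // Assumed rule from the comment: both values are positive and
    // maxParallelism (valueA) is at least parallelism (valueB).
    static boolean isValid(int parallelism, int maxParallelism) {
        return parallelism > 0 && maxParallelism >= parallelism;
    }

    public static void main(String[] args) {
        int assumedDefaultParallelism = 4; // guess taken from the comment

        // Reported case: max parallelism below the assumed default.
        System.out.println(isValid(assumedDefaultParallelism, 2)); // prints "false"

        // Suggested workaround: lower the parallelism to 1 first.
        System.out.println(isValid(1, 2)); // prints "true"
    }
}
```

In an actual job the same ordering would correspond to calling `env.setParallelism(1)` before `env.setMaxParallelism(...)`, as suggested in the comment.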
> {code}
> Caused by: java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
>     at java.lang.Class.cast(Class.java:3369)
>     at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
>     at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>     at scala.util.Try$.apply(Try.scala:161)
>     at scala.util.Success.map(Try.scala:206)
>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
>     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
>     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
>     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>     at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
>     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1206)
>     at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:458)
>     at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:280)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     ... 5 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)