[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290846#comment-16290846 ]
Ryan Brideau edited comment on FLINK-8256 at 12/14/17 1:45 PM:
---------------------------------------------------------------

Thanks for looking into this so quickly. I managed to track down the root of the issue on my end. I had built my project previously using the snapshot archetype rather than the newest 1.4.0 one:

{code:java}
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4-SNAPSHOT
{code}

To fix the problem I built a new empty project using the 1.4.0 archetype version, diffed the pom.xml files of the two, and updated my existing one to match the new one; now everything works perfectly. I suspect that anybody who created a project recently from the snapshot archetype might find themselves in the same situation. (The corresponding 1.4.0 invocation is sketched at the end of this message.)


> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8256
>                 URL: https://issues.apache.org/jira/browse/FLINK-8256
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>        Environment: macOS, Local Flink v1.4.0, Scala 2.11
>            Reporter: Ryan Brideau
>
> I built the newest release locally today, but when I try to filter a stream using an anonymous or named function, I get an error. Here's a simple example:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
>
> object TestFunction {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>     val someArray = Array(1,2,3)
>     val stream = env.fromCollection(someArray).filter(_ => true)
>     stream.print().setParallelism(1)
>     env.execute("Testing Function")
>   }
> }
> {code}
> This results in:
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>   at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>   at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
>   ... 6 more
> 12/13/2017 15:10:01 Job execution switched to status FAILED.
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
>   at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
>   at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>   at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
>   at org.peopleinmotion.TestFunction.main(TestFunction.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
>   at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
>   at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
>   at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>   at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>   at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> {code}
> However, replacing the function with this results in everything working as expected:
> {code:java}
> val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
>   override def filter(t: Int): Boolean = true
> })
> {code}
> Perhaps something changed in the new build compared to the previous one, as this was working without issue before?
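For reference, a minimal sketch of the non-snapshot command mentioned in the comment above, assuming the released 1.4.0 quickstart archetype coordinates (the groupId and artifactId are taken unchanged from the snapshot command; only the version differs):

{code:java}
# Sketch under assumed coordinates: generate a fresh quickstart project from
# the 1.4.0 release archetype instead of 1.4-SNAPSHOT.
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.4.0
{code}

Diffing the pom.xml generated by this against the pom.xml of an existing snapshot-based project should surface whatever dependency or plugin configuration needs to be carried over, as described in the comment.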