Same Issue I am facing :- http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
Can anyone explain the exception Thanks On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <j...@apache.org> wrote: > Ryan Brideau created FLINK-8256: > ----------------------------------- > > Summary: Cannot use Scala functions to filter in 1.4 - > java.lang.ClassCastException > Key: FLINK-8256 > URL: https://issues.apache.org/jira/browse/FLINK-8256 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.4.0 > Environment: macOS, Local Flink v1.4.0, Scala 2.11 > Reporter: Ryan Brideau > > > I built the newest release locally today, but when I try to filter a > stream using an anonymous or named function, I get an error. Here's a > simple example: > > > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > > object TestFunction { > > def main(args: Array[String]): Unit = { > > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > > val someArray = Array(1,2,3) > val stream = env.fromCollection(someArray).filter(_ => true) > stream.print().setParallelism(1) > env.execute("Testing Function") > } > } > {code} > > This results in: > > > {code:java} > Job execution switched to status FAILING. > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. > at org.apache.flink.streaming.api.graph.StreamConfig. > getStreamOperator(StreamConfig.java:235) > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > createChainedOperator(OperatorChain.java:355) > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > createOutputCollector(OperatorChain.java:282) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.< > init>(OperatorChain.java:126) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of > type scala.Function1 in instance of org.apache.flink.streaming. > api.scala.DataStream$$anon$7 > at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues( > ObjectStreamClass.java:2233) > at java.io.ObjectStreamClass.setObjFieldValues( > ObjectStreamClass.java:1405) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:2288) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:2206) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1568) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:2282) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:2206) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream. > java:428) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:290) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig( > InstantiationUtil.java:248) > at org.apache.flink.streaming.api.graph.StreamConfig. > getStreamOperator(StreamConfig.java:220) > ... 6 more > 12/13/2017 15:10:01 Job execution switched to status FAILED. > > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:492) > at org.apache.flink.client.program.StandaloneClusterClient. > submitJob(StandaloneClusterClient.java:105) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:456) > at org.apache.flink.streaming.api.environment. > StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at org.apache.flink.streaming.api.scala. > StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638) > at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20) > at org.peopleinmotion.TestFunction.main(TestFunction.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:525) > at org.apache.flink.client.program.PackagedProgram. > invokeInteractiveModeForExecution(PackagedProgram.java:417) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:396) > at org.apache.flink.client.CliFrontend.executeProgram( > CliFrontend.java:802) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) > at org.apache.flink.client.CliFrontend.parseParameters( > CliFrontend.java:1054) > at org.apache.flink.client.CliFrontend$1.call( > CliFrontend.java:1101) > at org.apache.flink.client.CliFrontend$1.call( > CliFrontend.java:1098) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at org.apache.hadoop.security.UserGroupInformation.doAs( > UserGroupInformation.java:1556) > at org.apache.flink.runtime.security.HadoopSecurityContext. > runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$ > mcV$sp(JobManager.scala:897) > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at scala.concurrent.impl.Future$PromiseCompletingRunnable. > liftedTree1$1(Future.scala:24) > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run( > Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at akka.dispatch.ForkJoinExecutorConfigurator$ > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec( > ForkJoinTask.java:260) > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker( > ForkJoinPool.java:1979) > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Cannot instantiate user function. > at org.apache.flink.streaming.api.graph.StreamConfig. > getStreamOperator(StreamConfig.java:235) > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > createChainedOperator(OperatorChain.java:355) > at org.apache.flink.streaming.runtime.tasks.OperatorChain. > createOutputCollector(OperatorChain.java:282) > at org.apache.flink.streaming.runtime.tasks.OperatorChain.< > init>(OperatorChain.java:126) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of > type scala.Function1 in instance of org.apache.flink.streaming. > api.scala.DataStream$$anon$7 > at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues( > ObjectStreamClass.java:2233) > at java.io.ObjectStreamClass.setObjFieldValues( > ObjectStreamClass.java:1405) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:2288) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:2206) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1568) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:2282) > at java.io.ObjectInputStream.readSerialData( > ObjectInputStream.java:2206) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream. > java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream. > java:428) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:290) > at org.apache.flink.util.InstantiationUtil.readObjectFromConfig( > InstantiationUtil.java:248) > at org.apache.flink.streaming.api.graph.StreamConfig. > getStreamOperator(StreamConfig.java:220) > > {code} > > However, replacing the function with this results in everything working as > expected: > > {code:java} > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] > { > override def filter(t: Int): Boolean = true > }) > {code} > > Perhaps something changed in the new build compared to the previous, as > this was working without issue before? > > > > -- > This message was sent by Atlassian JIRA > (v6.4.14#64029) > -- Shivam Sharma Data Engineer @ Goibibo Indian Institute Of Information Technology, Design and Manufacturing Jabalpur Mobile No- (+91) 8882114744 Email:- 28shivamsha...@gmail.com LinkedIn:-*https://www.linkedin.com/in/28shivamsharma <https://www.linkedin.com/in/28shivamsharma>*