Same Issue I am facing :-
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html

Can anyone explain the exception

Thanks

On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <j...@apache.org>
wrote:

> Ryan Brideau created FLINK-8256:
> -----------------------------------
>
>              Summary: Cannot use Scala functions to filter in 1.4 -
> java.lang.ClassCastException
>                  Key: FLINK-8256
>                  URL: https://issues.apache.org/jira/browse/FLINK-8256
>              Project: Flink
>           Issue Type: Bug
>           Components: DataStream API
>     Affects Versions: 1.4.0
>          Environment: macOS, Local Flink v1.4.0, Scala 2.11
>             Reporter: Ryan Brideau
>
>
> I built the newest release locally today, but when I try to filter a
> stream using an anonymous or named function, I get an error. Here's a
> simple example:
>
>
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
>
> object TestFunction {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val params = ParameterTool.fromArgs(args)
>     env.getConfig.setGlobalJobParameters(params)
>
>     val someArray = Array(1,2,3)
>     val stream = env.fromCollection(someArray).filter(_ => true)
>     stream.print().setParallelism(1)
>     env.execute("Testing Function")
>   }
> }
> {code}
>
> This results in:
>
>
> {code:java}
> Job execution switched to status FAILING.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
>         at org.apache.flink.streaming.api.graph.StreamConfig.
> getStreamOperator(StreamConfig.java:235)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:355)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:282)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> init>(OperatorChain.java:126)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:231)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.peopleinmotion.TestFunction$$anonfun$1 to field
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> type scala.Function1 in instance of org.apache.flink.streaming.
> api.scala.DataStream$$anon$7
>         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> ObjectStreamClass.java:2233)
>         at java.io.ObjectStreamClass.setObjFieldValues(
> ObjectStreamClass.java:1405)
>         at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2288)
>         at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1568)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> java:428)
>         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:290)
>         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:248)
>         at org.apache.flink.streaming.api.graph.StreamConfig.
> getStreamOperator(StreamConfig.java:220)
>         ... 6 more
> 12/13/2017 15:10:01     Job execution switched to status FAILED.
>
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:492)
>         at org.apache.flink.client.program.StandaloneClusterClient.
> submitJob(StandaloneClusterClient.java:105)
>         at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:456)
>         at org.apache.flink.streaming.api.environment.
> StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>         at org.apache.flink.streaming.api.scala.
> StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
>         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:525)
>         at org.apache.flink.client.program.PackagedProgram.
> invokeInteractiveModeForExecution(PackagedProgram.java:417)
>         at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:396)
>         at org.apache.flink.client.CliFrontend.executeProgram(
> CliFrontend.java:802)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
>         at org.apache.flink.client.CliFrontend.parseParameters(
> CliFrontend.java:1054)
>         at org.apache.flink.client.CliFrontend$1.call(
> CliFrontend.java:1101)
>         at org.apache.flink.client.CliFrontend$1.call(
> CliFrontend.java:1098)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1556)
>         at org.apache.flink.runtime.security.HadoopSecurityContext.
> runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$
> mcV$sp(JobManager.scala:897)
>         at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>         at akka.dispatch.ForkJoinExecutorConfigurator$
> AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot instantiate user function.
>         at org.apache.flink.streaming.api.graph.StreamConfig.
> getStreamOperator(StreamConfig.java:235)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createChainedOperator(OperatorChain.java:355)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> createOutputCollector(OperatorChain.java:282)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> init>(OperatorChain.java:126)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:231)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.peopleinmotion.TestFunction$$anonfun$1 to field
> org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> type scala.Function1 in instance of org.apache.flink.streaming.
> api.scala.DataStream$$anon$7
>         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> ObjectStreamClass.java:2233)
>         at java.io.ObjectStreamClass.setObjFieldValues(
> ObjectStreamClass.java:1405)
>         at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2288)
>         at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1568)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> java:428)
>         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:290)
>         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> InstantiationUtil.java:248)
>         at org.apache.flink.streaming.api.graph.StreamConfig.
> getStreamOperator(StreamConfig.java:220)
>
> {code}
>
> However, replacing the function with this results in everything working as
> expected:
>
> {code:java}
> val stream = env.fromCollection(someArray).filter(new FilterFunction[Int]
> {
>       override def filter(t: Int): Boolean = true
>     })
> {code}
>
> Perhaps something changed in the new build compared to the previous, as
> this was working without issue before?
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)
>



-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
<https://www.linkedin.com/in/28shivamsharma>*

Reply via email to