Hi!

This may be due to the changed classloading semantics.

To verify this, can you check whether the problem goes away when you set the
following in the Flink configuration: "classloader.resolve-order: parent-first"
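
For reference, this is roughly how the entry would look in the configuration
file. This is a sketch that assumes a standard setup where the file is
conf/flink-conf.yaml; the processes read it at startup, so the cluster has to
be restarted to pick up the change.

```yaml
# conf/flink-conf.yaml
# Flink 1.4 default is "child-first"; switch back to the pre-1.4 behavior.
classloader.resolve-order: parent-first
```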

By default, Flink 1.4 now uses inverted (child-first) classloading, so that
users can bring their own copies of dependencies, irrespective of what the
underlying classpath is polluted with. For example, you can use a different
Avro version than the one Hadoop pulls in, even without shading, or even
different Akka / Jackson / etc. versions.

That is a nice improvement, but it may affect tools that were built before
this change. When you see ClassCastExceptions (like "X cannot be cast to X"),
the probable cause is that the user-code classloader duplicates a dependency
from the JVM classpath in user space, while objects/classes still move
between the two domains.
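
To illustrate the "X cannot be cast to X" effect in isolation, here is a
minimal, self-contained sketch in plain Java. It is not Flink's classloader:
ChildFirstLoader, Payload and DuplicateClassDemo are hypothetical names made
up for this example, and the toy loader duplicates only a single class. It
assumes the compiled class file is readable as a resource through the
application classloader.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class DuplicateClassDemo {

    /** Hypothetical stand-in for a dependency class that is on the JVM classpath. */
    public static class Payload { }

    /** Toy child-first loader: defines its own copy of one class, delegates all others. */
    static final class ChildFirstLoader extends ClassLoader {
        private final String duplicated;

        ChildFirstLoader(ClassLoader parent, String duplicated) {
            super(parent);
            this.duplicated = duplicated;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(duplicated)) {
                return super.loadClass(name, resolve); // usual parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    // Child-first: define a second copy instead of asking the parent.
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads a second copy of Payload through the toy child-first loader. */
    public static Class<?> loadDuplicate() throws ClassNotFoundException {
        ClassLoader parent = DuplicateClassDemo.class.getClassLoader();
        String name = Payload.class.getName();
        return new ChildFirstLoader(parent, name).loadClass(name);
    }

    /** Returns true if casting to the parent-loaded Payload throws ClassCastException. */
    public static boolean castFails(Object o) {
        try {
            Payload ignored = (Payload) o;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> copy = loadDuplicate();
        Object instance = copy.getDeclaredConstructor().newInstance();
        System.out.println("same name:  " + copy.getName().equals(Payload.class.getName()));
        System.out.println("same class: " + (copy == Payload.class));
        System.out.println("cast fails: " + castFails(instance));
    }
}
```

The two classes share a fully qualified name, but the JVM treats them as
different types because they were defined by different classloaders. An
object created in one domain therefore cannot be assigned to a field or
variable typed with the class from the other domain.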

Stephan

On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28shivamsha...@gmail.com>
wrote:

> I am facing the same issue:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
>
> Can anyone explain the exception?
>
> Thanks
>
> On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <j...@apache.org>
> wrote:
>
> > Ryan Brideau created FLINK-8256:
> > -----------------------------------
> >
> >              Summary: Cannot use Scala functions to filter in 1.4 -
> > java.lang.ClassCastException
> >                  Key: FLINK-8256
> >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> >              Project: Flink
> >           Issue Type: Bug
> >           Components: DataStream API
> >     Affects Versions: 1.4.0
> >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> >             Reporter: Ryan Brideau
> >
> >
> > I built the newest release locally today, but when I try to filter a
> > stream using an anonymous or named function, I get an error. Here's a
> > simple example:
> >
> >
> > {code:java}
> > import org.apache.flink.api.java.utils.ParameterTool
> > import org.apache.flink.streaming.api.scala._
> >
> > object TestFunction {
> >
> >   def main(args: Array[String]): Unit = {
> >
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     val params = ParameterTool.fromArgs(args)
> >     env.getConfig.setGlobalJobParameters(params)
> >
> >     val someArray = Array(1,2,3)
> >     val stream = env.fromCollection(someArray).filter(_ => true)
> >     stream.print().setParallelism(1)
> >     env.execute("Testing Function")
> >   }
> > }
> > {code}
> >
> > This results in:
> >
> >
> > {code:java}
> > Job execution switched to status FAILING.
> > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> > instantiate user function.
> >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > getStreamOperator(StreamConfig.java:235)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > createChainedOperator(OperatorChain.java:355)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > createOutputCollector(OperatorChain.java:282)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > init>(OperatorChain.java:126)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > invoke(StreamTask.java:231)
> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.ClassCastException: cannot assign instance of
> > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> > type scala.Function1 in instance of org.apache.flink.streaming.
> > api.scala.DataStream$$anon$7
> >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> > ObjectStreamClass.java:2233)
> >         at java.io.ObjectStreamClass.setObjFieldValues(
> > ObjectStreamClass.java:1405)
> >         at java.io.ObjectInputStream.defaultReadFields(
> > ObjectInputStream.java:2288)
> >         at java.io.ObjectInputStream.readSerialData(
> > ObjectInputStream.java:2206)
> >         at java.io.ObjectInputStream.readOrdinaryObject(
> > ObjectInputStream.java:2064)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > java:1568)
> >         at java.io.ObjectInputStream.defaultReadFields(
> > ObjectInputStream.java:2282)
> >         at java.io.ObjectInputStream.readSerialData(
> > ObjectInputStream.java:2206)
> >         at java.io.ObjectInputStream.readOrdinaryObject(
> > ObjectInputStream.java:2064)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > java:1568)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > java:428)
> >         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> > InstantiationUtil.java:290)
> >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> > InstantiationUtil.java:248)
> >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > getStreamOperator(StreamConfig.java:220)
> >         ... 6 more
> > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> >
> > ------------------------------------------------------------
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Job execution failed.
> >         at org.apache.flink.client.program.ClusterClient.run(
> > ClusterClient.java:492)
> >         at org.apache.flink.client.program.StandaloneClusterClient.
> > submitJob(StandaloneClusterClient.java:105)
> >         at org.apache.flink.client.program.ClusterClient.run(
> > ClusterClient.java:456)
> >         at org.apache.flink.streaming.api.environment.
> > StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> >         at org.apache.flink.streaming.api.scala.
> > StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> >         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke(
> > NativeMethodAccessorImpl.java:62)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >         at org.apache.flink.client.program.PackagedProgram.
> > callMainMethod(PackagedProgram.java:525)
> >         at org.apache.flink.client.program.PackagedProgram.
> > invokeInteractiveModeForExecution(PackagedProgram.java:417)
> >         at org.apache.flink.client.program.ClusterClient.run(
> > ClusterClient.java:396)
> >         at org.apache.flink.client.CliFrontend.executeProgram(
> > CliFrontend.java:802)
> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> >         at org.apache.flink.client.CliFrontend.parseParameters(
> > CliFrontend.java:1054)
> >         at org.apache.flink.client.CliFrontend$1.call(
> > CliFrontend.java:1101)
> >         at org.apache.flink.client.CliFrontend$1.call(
> > CliFrontend.java:1098)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:422)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1556)
> >         at org.apache.flink.runtime.security.HadoopSecurityContext.
> > runSecured(HadoopSecurityContext.java:41)
> >         at org.apache.flink.client.CliFrontend.main(
> > CliFrontend.java:1098)
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> > execution failed.
> >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$
> > mcV$sp(JobManager.scala:897)
> >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(
> > JobManager.scala:840)
> >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(
> > JobManager.scala:840)
> >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> > liftedTree1$1(Future.scala:24)
> >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> > Future.scala:24)
> >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> >         at akka.dispatch.ForkJoinExecutorConfigurator$
> > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> > ForkJoinTask.java:260)
> >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > runTask(ForkJoinPool.java:1339)
> >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > ForkJoinPool.java:1979)
> >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> > Cannot instantiate user function.
> >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > getStreamOperator(StreamConfig.java:235)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > createChainedOperator(OperatorChain.java:355)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > createOutputCollector(OperatorChain.java:282)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > init>(OperatorChain.java:126)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > invoke(StreamTask.java:231)
> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.ClassCastException: cannot assign instance of
> > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> > type scala.Function1 in instance of org.apache.flink.streaming.
> > api.scala.DataStream$$anon$7
> >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> > ObjectStreamClass.java:2233)
> >         at java.io.ObjectStreamClass.setObjFieldValues(
> > ObjectStreamClass.java:1405)
> >         at java.io.ObjectInputStream.defaultReadFields(
> > ObjectInputStream.java:2288)
> >         at java.io.ObjectInputStream.readSerialData(
> > ObjectInputStream.java:2206)
> >         at java.io.ObjectInputStream.readOrdinaryObject(
> > ObjectInputStream.java:2064)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > java:1568)
> >         at java.io.ObjectInputStream.defaultReadFields(
> > ObjectInputStream.java:2282)
> >         at java.io.ObjectInputStream.readSerialData(
> > ObjectInputStream.java:2206)
> >         at java.io.ObjectInputStream.readOrdinaryObject(
> > ObjectInputStream.java:2064)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > java:1568)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > java:428)
> >         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> > InstantiationUtil.java:290)
> >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(
> > InstantiationUtil.java:248)
> >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > getStreamOperator(StreamConfig.java:220)
> >
> > {code}
> >
> > However, replacing the function with this results in everything working
> > as expected:
> >
> > {code:java}
> > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> >       override def filter(t: Int): Boolean = true
> >     })
> > {code}
> >
> > Perhaps something changed in the new build compared to the previous, as
> > this was working without issue before?
> >
> >
> >
> > --
> > This message was sent by Atlassian JIRA
> > (v6.4.14#64029)
> >
>
>
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn: https://www.linkedin.com/in/28shivamsharma
>
