@Shivam I think Flink 1.4 should not expose any ProtoBuf dependency any more. We shade it in Mesos and use a ProtoBuf-free Akka version now.
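
If you want to verify which copy of a class your job actually sees, a small check along these lines may help. This is only a sketch: the `WhichJar` object and its `locationOf` / `describe` helpers are names made up for illustration and are not part of Flink, and `com.google.protobuf.Message` is just the class I would probe for ProtoBuf. It asks a given classloader for a class and reports the jar (code source) it was loaded from, using only standard JDK calls.

```scala
// Hypothetical helper, not part of Flink: reports where a class was loaded from.
object WhichJar {

  // Location of the jar or directory that defined the class.
  // Classes from the JVM's bootstrap loader usually have no code source.
  def locationOf(cls: Class[_]): String =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<bootstrap or unknown>")

  // Resolve className through the given loader (without initializing the class)
  // and describe which location and classloader it ended up coming from.
  def describe(className: String, loader: ClassLoader): String =
    try {
      val cls = Class.forName(className, false, loader)
      s"$className -> ${locationOf(cls)} (loader: ${cls.getClassLoader})"
    } catch {
      case _: ClassNotFoundException => s"$className -> not found"
    }

  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // Assumed probe classes: one from ProtoBuf, one from the Scala library.
    println(describe("com.google.protobuf.Message", loader))
    println(describe("scala.Function1", loader))
  }
}
```

Calling `WhichJar.describe(...)` from inside your job, with the classloader of one of your own job classes, should show whether a class comes from your fat jar or from Flink's own classpath. Running it once with the default setting and once with "classloader.resolve-order: parent-first" would make the difference visible.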

On Thu, Dec 14, 2017 at 8:41 PM, Shivam Sharma <28shivamsha...@gmail.com> wrote:

> Hi Stephan,
>
> Thanks for your help. Basically, reverting the classloading to
> parent-first *resolved this issue*. Thanks for this, but I have one
> question:
>
> I am building a fat jar without any dependency as Provided. In my case I
> am using proto-java version 3.4.0, but I think Flink uses a pretty old
> version (I think 2.5.0). When I submit my jar, which version will it
> pick, 2.5.0 or 3.4.0, in case of parent-first classloading?
>
> Thanks
>
> On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > @Shivam and @Ryan:
> >
> > My first feeling would be the following: You have the Scala library in
> > your user code, and thus, through the reversed class loading, the Scala
> > function types get duplicated.
> >
> > The right way to fix that is to make sure you build a proper jar file
> > that does not bundle any provided dependencies. Make sure you set
> > "-Pbuild-jar" when packaging your program.
> >
> > - You could also set "classloader.resolve-order: parent-first" in your
> >   configuration to restore the old class loading style.
> >
> > - We should add "scala" to the default value for
> >   "classloader.parent-first-patterns". You can add it yourself in the
> >   configuration (make sure you keep all existing parent-first-patterns
> >   as well).
> >
> > On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > I tried to reproduce the problem given your description, Ryan. I
> > > submitted the test job to a vanilla Flink 1.4.0 cluster (Hadoop-free
> > > version downloaded from flink.apache.org, Hadoop 2.7 version
> > > downloaded from flink.apache.org, and a cluster built from sources).
> > > However, I was not able to reproduce the problem. Therefore I suspect
> > > that it has something to do with your or my setup.
> > >
> > > In order to further diagnose the problem, it would be tremendously
> > > helpful if you could share the logs contained in the logs directory
> > > with us.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <se...@apache.org>
> > > wrote:
> > >
> > > > Hi!
> > > >
> > > > This may be due to the changed classloading semantics.
> > > >
> > > > Just to verify this, can you check if it gets solved by setting the
> > > > following in the Flink configuration:
> > > > "classloader.resolve-order: parent-first"
> > > >
> > > > By default, Flink 1.4 now uses inverted classloading to allow users
> > > > to use their own copies of dependencies, irrespective of what the
> > > > underlying classpath is spoiled with. You can, for example, use a
> > > > different Avro version than the one Hadoop pulls in, even without
> > > > shading, or even different Akka / Jackson / etc. versions.
> > > >
> > > > That is a nice improvement, but it may have some impact on tools
> > > > that were built before. When you see ClassCastExceptions (like
> > > > "X cannot be cast to X"), that is probably caused by the fact that
> > > > the classloader duplicates a dependency from the JVM classpath in
> > > > user-space, but objects/classes move between the domains.
> > > >
> > > > Stephan
> > > >
> > > > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma
> > > > <28shivamsha...@gmail.com> wrote:
> > > >
> > > > > I am facing the same issue:
> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > > > >
> > > > > Can anyone explain the exception?
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA)
> > > > > <j...@apache.org> wrote:
> > > > >
> > > > > > Ryan Brideau created FLINK-8256:
> > > > > > -----------------------------------
> > > > > >
> > > > > >              Summary: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> > > > > >                  Key: FLINK-8256
> > > > > >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> > > > > >              Project: Flink
> > > > > >           Issue Type: Bug
> > > > > >           Components: DataStream API
> > > > > >     Affects Versions: 1.4.0
> > > > > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > > > > >             Reporter: Ryan Brideau
> > > > > >
> > > > > >
> > > > > > I built the newest release locally today, but when I try to
> > > > > > filter a stream using an anonymous or named function, I get an
> > > > > > error. Here's a simple example:
> > > > > >
> > > > > >
> > > > > > {code:java}
> > > > > > import org.apache.flink.api.java.utils.ParameterTool
> > > > > > import org.apache.flink.streaming.api.scala._
> > > > > >
> > > > > > object TestFunction {
> > > > > >
> > > > > >   def main(args: Array[String]): Unit = {
> > > > > >
> > > > > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > > >     val params = ParameterTool.fromArgs(args)
> > > > > >     env.getConfig.setGlobalJobParameters(params)
> > > > > >
> > > > > >     val someArray = Array(1,2,3)
> > > > > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > > > > >     stream.print().setParallelism(1)
> > > > > >     env.execute("Testing Function")
> > > > > >   }
> > > > > > }
> > > > > > {code}
> > > > > >
> > > > > > This results in:
> > > > > >
> > > > > >
> > > > > > {code:java}
> > > > > > Job execution switched to status FAILING.
> > > > > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > > >         ... 6 more
> > > > > > 12/13/2017 15:10:01 Job execution switched to status FAILED.
> > > > > >
> > > > > > ------------------------------------------------------------
> > > > > >  The program finished with the following exception:
> > > > > >
> > > > > > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> > > > > >         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> > > > > >         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > > > > >         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> > > > > >         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> > > > > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > > >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> > > > > >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> > > > > >         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> > > > > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> > > > > >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> > > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> > > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> > > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> > > > > >         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > > > > >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> > > > > >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > > >
> > > > > > {code}
> > > > > >
> > > > > > However, replacing the function with this results in everything
> > > > > > working as expected:
> > > > > >
> > > > > > {code:java}
> > > > > > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> > > > > >     override def filter(t: Int): Boolean = true
> > > > > > })
> > > > > > {code}
> > > > > >
> > > > > > Perhaps something changed in the new build compared to the
> > > > > > previous, as this was working without issue before?
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > This message was sent by Atlassian JIRA
> > > > > > (v6.4.14#64029)
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Shivam Sharma
> > > > > Data Engineer @ Goibibo
> > > > > Indian Institute Of Information Technology, Design and
> > > > > Manufacturing Jabalpur
> > > > > Mobile No- (+91) 8882114744
> > > > > Email:- 28shivamsha...@gmail.com
> > > > > LinkedIn:- https://www.linkedin.com/in/28shivamsharma
> > > > >
> > > >
> > >
> >
>
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn:- https://www.linkedin.com/in/28shivamsharma
>