@Shivam I think Flink 1.4 should not expose any ProtoBuf dependency
anymore. We shade it in Mesos and now use a ProtoBuf-free Akka version.

On Thu, Dec 14, 2017 at 8:41 PM, Shivam Sharma <28shivamsha...@gmail.com>
wrote:

> Hi Stephan,
>
> Thanks for your help. Reverting the classloading to parent-first
> *resolved this issue*. Thanks for this, but I have one question:
>
> I am building a fat jar without marking any dependency as provided. In my
> case I am using protobuf-java version 3.4.0, but I think Flink uses a
> pretty old version (I think 2.5.0). When I submit my jar, which version
> will be picked, 2.5.0 or 3.4.0, in the case of parent-first classloading?
>
> Thanks
>
> On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > @Shivam and @Ryan:
> >
> > My first feeling would be the following: You have the Scala library in
> > your user code, and thus, through the reversed class loading, the Scala
> > function types get duplicated.
> >
> > The right way to fix that is to make sure you build a proper jar file
> > without any provided dependencies. Make sure you set "-Pbuild-jar" when
> > packaging your program.
> >
> >   - You could also set "classloader.resolve-order: parent-first" in your
> > configuration to restore the old class loading style.
> >
> >   - We should add "scala" to the default value for
> > "classloader.parent-first-patterns". You can add it yourself in the
> > configuration (make sure you keep all existing parent-first-patterns as
> > well).
> >
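
For reference, the two configuration options mentioned above might look roughly like this in flink-conf.yaml. This is a sketch only. "<existing patterns>" is a placeholder, not a real value: it stands for whatever default list your Flink version documents for that key. The semicolon separator and the trailing dot in "scala." are assumptions here, so please check the configuration documentation for your version before copying anything.

```yaml
# Option A: restore the pre-1.4 class loading order.
classloader.resolve-order: parent-first

# Option B: keep the new default order, but always resolve Scala classes
# through the parent class loader. Replace the placeholder with the
# existing default patterns so that none of them are lost.
# (Assumed format: semicolon-separated class-name prefixes.)
classloader.parent-first-patterns: <existing patterns>;scala.
```

Only one of the two options should be needed. Option B is the narrower change, since user jars can still override other dependencies.
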
> >
> >
> > On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > I tried to reproduce the problem given your description, Ryan. I
> > > submitted the test job to a vanilla Flink 1.4.0 cluster (the Hadoop-free
> > > version downloaded from flink.apache.org, the Hadoop 2.7 version
> > > downloaded from flink.apache.org, and a cluster built from sources).
> > > However, I was not able to reproduce the problem. Therefore I suspect
> > > that it has something to do with your or my setup.
> > >
> > > In order to further diagnose the problem, it would be tremendously
> > > helpful if you could share the logs contained in the logs directory
> > > with us.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Hi!
> > > >
> > > > This may be due to the changed classloading semantics.
> > > >
> > > > Just to verify this, can you check whether it gets solved by setting
> > > > the following in the Flink configuration:
> > > > "classloader.resolve-order: parent-first"
> > > >
> > > > By default, Flink 1.4 now uses inverted classloading to allow users to
> > > > use their own copies of dependencies, irrespective of what the
> > > > underlying classpath is polluted with. You can, for example, use a
> > > > different Avro version than the one Hadoop pulls in, even without
> > > > shading, or even different Akka / Jackson / etc. versions.
> > > >
> > > > That is a nice improvement, but it may have some impact on tools that
> > > > were built before. When you see ClassCastExceptions (like "X cannot be
> > > > cast to X"), the probable cause is that the classloader duplicates a
> > > > dependency from the JVM classpath in user space, while objects/classes
> > > > still move between the two domains.
> > > >
> > > > Stephan
> > > >
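
The "X cannot be cast to X" effect described above can be reproduced outside Flink with a small Scala sketch. It is an illustration only, not Flink's actual class loader: `Payload`, `ChildFirstLoader` and `ClassIdentityDemo` are made-up names, and the sketch assumes the compiled `.class` files are reachable as classpath resources (as they are in a normal compiled project).

```scala
import java.io.ByteArrayOutputStream

// Hypothetical stand-in for a class that exists both on the cluster
// classpath and inside the user jar (e.g. a Scala library type).
class Payload

// Simplified child-first loader (NOT Flink's implementation): it defines
// `targetName` itself and delegates every other class to its parent.
class ChildFirstLoader(targetName: String, parent: ClassLoader)
    extends ClassLoader(parent) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name != targetName) super.loadClass(name, resolve)
    else getClassLoadingLock(name).synchronized {
      val already = findLoadedClass(name)
      if (already != null) already else defineFromClasspath(name)
    }

  // Reads the .class bytes via the parent and defines a second copy here.
  private def defineFromClasspath(name: String): Class[_] = {
    val in = parent.getResourceAsStream(name.replace('.', '/') + ".class")
    if (in == null) throw new ClassNotFoundException(name)
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      defineClass(name, out.toByteArray, 0, out.size)
    } finally in.close()
  }
}

object ClassIdentityDemo {
  // Returns the parent's view and the child-first view of the same class.
  def loadBothCopies(): (Class[_], Class[_]) = {
    val original = classOf[Payload]
    val loader = new ChildFirstLoader(original.getName, original.getClassLoader)
    (original, loader.loadClass(original.getName))
  }

  def main(args: Array[String]): Unit = {
    val (original, duplicate) = loadBothCopies()
    println(original.getName == duplicate.getName) // true: same qualified name
    println(original eq duplicate)                 // false: two distinct Class objects
    println(duplicate.isInstance(new Payload))     // false: instances do not cross over
  }
}
```

The JVM identifies a class by its name together with its defining class loader, so an object created on one side fails the type check on the other side even though both classes have the same name and bytes.
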
> > > > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28shivamsha...@gmail.com> wrote:
> > > >
> > > > > I am facing the same issue:
> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > > > >
> > > > > Can anyone explain the exception?
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <j...@apache.org> wrote:
> > > > >
> > > > > > Ryan Brideau created FLINK-8256:
> > > > > > -----------------------------------
> > > > > >
> > > > > >              Summary: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> > > > > >                  Key: FLINK-8256
> > > > > >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> > > > > >              Project: Flink
> > > > > >           Issue Type: Bug
> > > > > >           Components: DataStream API
> > > > > >     Affects Versions: 1.4.0
> > > > > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > > > > >             Reporter: Ryan Brideau
> > > > > >
> > > > > >
> > > > > > I built the newest release locally today, but when I try to filter
> > > > > > a stream using an anonymous or named function, I get an error.
> > > > > > Here's a simple example:
> > > > > >
> > > > > >
> > > > > > {code:java}
> > > > > > import org.apache.flink.api.java.utils.ParameterTool
> > > > > > import org.apache.flink.streaming.api.scala._
> > > > > >
> > > > > > object TestFunction {
> > > > > >
> > > > > >   def main(args: Array[String]): Unit = {
> > > > > >
> > > > > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > > >     val params = ParameterTool.fromArgs(args)
> > > > > >     env.getConfig.setGlobalJobParameters(params)
> > > > > >
> > > > > >     val someArray = Array(1,2,3)
> > > > > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > > > > >     stream.print().setParallelism(1)
> > > > > >     env.execute("Testing Function")
> > > > > >   }
> > > > > > }
> > > > > > {code}
> > > > > >
> > > > > > This results in:
> > > > > >
> > > > > >
> > > > > > {code:java}
> > > > > > Job execution switched to status FAILING.
> > > > > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > > >         ... 6 more
> > > > > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > > > > >
> > > > > > ------------------------------------------------------------
> > > > > >  The program finished with the following exception:
> > > > > >
> > > > > > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> > > > > >         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> > > > > >         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > > > > >         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> > > > > >         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> > > > > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > > >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> > > > > >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> > > > > >         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> > > > > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> > > > > >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> > > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> > > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> > > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> > > > > >         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> > > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > > > > >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> > > > > >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > > >
> > > > > > {code}
> > > > > >
> > > > > > However, replacing the function with this results in everything
> > > > > > working as expected:
> > > > > >
> > > > > > {code:java}
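> > > > > > import org.apache.flink.api.common.functions.FilterFunction
> > > > > >
> > > > > > // Explicit FilterFunction instead of a Scala lambda
> > > > > > val stream = env.fromCollection(someArray).filter(
> > > > > >   new FilterFunction[Int] {
> > > > > >     override def filter(t: Int): Boolean = true
> > > > > >   })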
> > > > > > {code}
> > > > > >
> > > > > > Perhaps something changed in the new build compared to the
> > > > > > previous, as this was working without issue before?
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Shivam Sharma
> > > > > Data Engineer @ Goibibo
> > > > > Indian Institute Of Information Technology, Design and Manufacturing
> > > > > Jabalpur
> > > > > Mobile No- (+91) 8882114744
> > > > > Email:- 28shivamsha...@gmail.com
> > > > > LinkedIn:- https://www.linkedin.com/in/28shivamsharma
> > > > >
> > > >
> > >
> >
>
>
>
>
