Hi,

I'd like to chime in since I've faced the same issue running Flink 1.1.4. I have a long-running YARN session which I use to run multiple streaming jobs concurrently. Once, after cancelling and resubmitting a job, I saw the "X cannot be cast to X" ClassCastException in the logs. I restarted the YARN session and the problem disappeared.

The class that failed to be cast was generated by the Avro compiler. I know that Avro's Java binding caches schemas in a static WeakHashMap. I'm wondering whether that might interfere with Flink's classloading design.

Anyway, I would be interested in watching the issue in the Flink JIRA. Giuliano, could you provide the issue number?

Thanks,
Yury

2017-01-11 14:11 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Giuliano,
>
> thanks for bringing up this issue.
> A "ClassCastException: X cannot be cast to X" often points to a classloader issue.
> So it might actually be a bug in Flink.
>
> I assume you submit the same application (same jar file) with the same command, right?
> Did you cancel the job before resubmitting?
>
> Can you create a JIRA issue [1] for this bug (hit the red CREATE button on top) and include the commit hash from which you built Flink?
> It would be great if you could provide a short example program and instructions on how to reproduce the problem.
>
> Thank you very much,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK
>
> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>:
>
>> Hello,
>>
>> I need some guidance on how to report a bug.
>>
>> I'm testing version 1.2 on my local cluster and the first time I submit the job everything works, but whenever I re-submit the same job it fails with
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>     at scala.App$class.main(App.scala:76)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
>>     at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>>     at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>>     ... 14 more
>>
>> I'm running a Flink cluster built from the "release-1.2" branch on GitHub.
>>
>> How can I validate that this is a Flink bug?
>> Where can I report this?
>> What sort of information do I need to provide?
>>
>> Cheers,
>>
>> Giuliano Caliari
>> --
>> Giuliano Caliari
>> Master Software Engineer by Escola Politécnica da USP
>> Bachelor in Computer Science by Instituto de Matemática e Estatística da USP
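
To illustrate the "X cannot be cast to X" symptom discussed above, here is a minimal, self-contained Java sketch. It is not Flink or Avro code: `ClassLoaderCastDemo`, `Transaction`, and `IsolatingLoader` are hypothetical names invented for this example, and the loader only loosely imitates a per-job user-code classloader. It assumes the compiled class file is readable as a classpath resource. The point is only that the JVM treats a class as the pair (name, defining classloader), so two loaders can produce two incompatible classes with the same name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a generated class such as WowTransaction. */
    public static class Transaction {}

    /** Defines one named class itself; delegates everything else to its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    // Re-define the class from the same bytes, but in this loader.
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Creates a Transaction instance whose class was defined by a fresh loader. */
    public static Object newIsolatedInstance() {
        String name = Transaction.class.getName();
        ClassLoader parent = ClassLoaderCastDemo.class.getClassLoader();
        try {
            Class<?> isolated = new IsolatingLoader(parent, name).loadClass(name);
            return isolated.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if casting the object to this loader's Transaction throws. */
    public static boolean castFails(Object o) {
        try {
            Transaction.class.cast(o);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object foreign = newIsolatedInstance();
        System.out.println("same name:  "
                + foreign.getClass().getName().equals(Transaction.class.getName()));
        System.out.println("same class: " + (foreign.getClass() == Transaction.class));
        System.out.println("cast fails: " + castFails(foreign));
    }
}
```

If a static cache (for example one keyed by class or schema) survives across job submissions and still holds objects from a previous job's classloader, a resubmitted job could run into the same kind of mismatch. Whether that is what happens in the cases reported in this thread is not established here.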