Hi Guiliano, thanks for bringing up this issue. A "ClassCastException: X cannot be cast to X" often points to a classloader issue. So it might actually be a bug in Flink.
I assume you submit the same application (same jar file) with the same command right? Did you cancel the job before resubmitting? Can you create a JIRA issue [1] for this bug (hit the read CREATE button on top) and include the commit hash from which you built Flink? It would be great if you could provide a short example program and instructions how to reproduce the problem. Thank you very much, Fabian [1] https://issues.apache.org/jira/browse/FLINK 2017-01-11 1:22 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>: > Hello, > > > > I need some guidance on how to report a bug. > > > > I’m testing version 1.2 on my local cluster and the first time I submit > the job everything works but whenever I re-submit the same job it fails > with > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:427) > > at org.apache.flink.client.program.StandaloneClusterClient.submitJob( > StandaloneClusterClient.java:101) > > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:400) > > at org.apache.flink.streaming.api.environment.StreamContextEnvironment. > execute(StreamContextEnvironment.java:66) > > at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment. > execute(StreamExecutionEnvironment.scala:634) > > at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147) > > at au.com.my.package.pTraitor.TraitorAppOneTrait$. > delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1( > TraitorApp.scala:22) > > at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply( > TraitorApp.scala:21) > > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > > at scala.runtime.AbstractFunction0.apply$mcV$ > sp(AbstractFunction0.scala:12) > > at scala.App$$anonfun$main$1.apply(App.scala:76) > > at scala.App$$anonfun$main$1.apply(App.scala:76) > > at scala.collection.immutable.List.foreach(List.scala:381) > > at scala.collection.generic.TraversableForwarder$class. > foreach(TraversableForwarder.scala:35) > > at scala.App$class.main(App.scala:76) > > at au.com.my.package.pTraitor.TraitorAppOneTrait$.main( > TraitorApp.scala:21) > > at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:528) > > at org.apache.flink.client.program.PackagedProgram. > invokeInteractiveModeForExecution(PackagedProgram.java:419) > > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:339) > > at org.apache.flink.client.CliFrontend.executeProgram( > CliFrontend.java:831) > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) > > at org.apache.flink.client.CliFrontend.parseParameters( > CliFrontend.java:1073) > > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > > at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured( > NoOpSecurityContext.java:29) > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$ > mcV$sp(JobManager.scala:900) > > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > > at org.apache.flink.runtime.jobmanager.JobManager$$ > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > > at scala.concurrent.impl.Future$PromiseCompletingRunnable. > liftedTree1$1(Future.scala:24) > > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run( > Future.scala:24) > > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec( > AbstractDispatcher.scala:397) > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > runTask(ForkJoinPool.java:1339) > > at scala.concurrent.forkjoin.ForkJoinPool.runWorker( > ForkJoinPool.java:1979) > > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain.java:415) > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain.java:397) > > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator.java:749) > > at org.apache.flink.streaming.api.operators.AbstractStreamOperator$ > CountingOutput.collect(AbstractStreamOperator.java:727) > > at org.apache.flink.streaming.api.operators.StreamSourceContexts$ > ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272) > > at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher. > emitRecordWithTimestamp(AbstractFetcher.java:261) > > at org.apache.flink.streaming.connectors.kafka.internal. > Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88) > > at org.apache.flink.streaming.connectors.kafka.internal. > Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157) > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run( > FlinkKafkaConsumerBase.java:255) > > at org.apache.flink.streaming.api.operators.StreamSource. > run(StreamSource.java:78) > > at org.apache.flink.streaming.api.operators.StreamSource. > run(StreamSource.java:55) > > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run( > SourceStreamTask.java:56) > > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask.java:269) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.ClassCastException: > au.com.my.package.schema.p.WowTransaction > cannot be cast to au.com.my.package.schema.p.WowTransaction > > at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4. > apply(Traitor.scala:132) > > at org.apache.flink.streaming.api.scala.DataStream$$anon$1. > extractAscendingTimestamp(DataStream.scala:763) > > at org.apache.flink.streaming.api.functions.timestamps. > AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor. > java:72) > > at org.apache.flink.streaming.runtime.operators. > TimestampsAndPeriodicWatermarksOperator.processElement( > TimestampsAndPeriodicWatermarksOperator.java:65) > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$ > CopyingChainingOutput.collect(OperatorChain.java:412) > > ... 14 more > > I'm running a flink cluster built from the "release-1.2" branch on github. > > > How can I validate that this is a Flink big? > > Where can I report this? > > What sort of information do I need to provide? > > > Cheers, > > Giuliano Caliari > -- > -- > Giuliano Caliari (+55 11 984898464 <+55%2011%2098489-8464>) > <http://www.facebook.com/giuliano.caliari>+Google > <https://plus.google.com/u/0/104857507547056767808/posts> > Twitter <https://twitter.com/gcaliari> > > Master Software Engineer by Escola Politécnica da USP > Bachelor in Computer Science by Instituto de Matemática e Estatística da > USP > >