Hi,

I'd like to chime in, since I've run into the same issue with Flink 1.1.4. I
have a long-running YARN session that I use to run multiple streaming jobs
concurrently. Once, after cancelling and resubmitting a job, I saw the "X
cannot be cast to X" ClassCastException in the logs. I restarted the YARN
session, and the problem disappeared.
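
To make the "X cannot be cast to X" symptom concrete: the JVM identifies a class by its fully qualified name together with the classloader that defined it, so the same class file loaded by two loaders yields two unrelated types. The Java sketch below reproduces the exception without Flink. The names ClassIdentityDemo, Payload and IsolatingLoader are hypothetical, the loader only loosely imitates a per-job user-code classloader (it is not Flink's implementation), and it assumes the compiled classes are readable as resources from the application classpath.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassIdentityDemo {

    // Hypothetical stand-in for a user type such as the Avro-generated WowTransaction.
    public static class Payload {
        public String id = "tx-1";
    }

    // Defines Payload itself instead of delegating to its parent, loosely mimicking
    // a per-job user-code classloader (an assumption, not Flink's implementation).
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Payload.class.getName())) {
                return super.loadClass(name, resolve); // everything else: normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    // Read the same bytes the application loader used and define a second copy.
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                return c;
            }
        }
    }

    // Returns true if an instance created through the isolating loader cannot be
    // cast to the Payload type this class was compiled against.
    public static boolean castFailsAcrossLoaders() throws Exception {
        IsolatingLoader jobLoader = new IsolatingLoader(ClassIdentityDemo.class.getClassLoader());
        Class<?> fromJob = jobLoader.loadClass(Payload.class.getName());
        Object instance = fromJob.getDeclaredConstructor().newInstance();
        try {
            Payload unused = (Payload) instance; // cast target is the application loader's Payload
            return false;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        IsolatingLoader jobLoader = new IsolatingLoader(ClassIdentityDemo.class.getClassLoader());
        Class<?> fromJob = jobLoader.loadClass(Payload.class.getName());
        // Same fully qualified name, but two distinct runtime classes.
        System.out.println(fromJob.getName().equals(Payload.class.getName())); // true
        System.out.println(fromJob == Payload.class);                         // false
        System.out.println(castFailsAcrossLoaders());
    }
}
```

The exception message on recent JDKs names both classloaders, which is the quickest way to tell this situation apart from a genuine type mismatch.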

The class that failed to be cast was generated by the Avro compiler. I know
that Avro's Java binding caches schemas in a static WeakHashMap, and I'm
wondering whether that may interfere with Flink's classloading design.
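
The static-cache hypothesis can be sketched the same way. Assuming a JVM-wide cache keyed only by class name that lives in a classloader shared across jobs (modelled here by a hypothetical map in StaleCacheDemo, not Avro's actual code), a resubmitted job gets back the class defined by the previous job's loader, and instances of that class fail the cast in the new job.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaleCacheDemo {

    // Hypothetical stand-in for a generated record class shipped in the job jar.
    public static class GeneratedRecord { }

    // Hypothetical JVM-wide cache keyed only by class name. NOT Avro's actual code;
    // it only models a static cache that outlives a job's classloader.
    static final Map<String, Class<?>> CACHE = new ConcurrentHashMap<>();

    static Class<?> cachedLookup(String name, ClassLoader loader) throws ClassNotFoundException {
        Class<?> c = CACHE.get(name);
        if (c == null) {
            c = Class.forName(name, false, loader);
            CACHE.put(name, c);
        }
        return c; // on a cache hit, the caller's loader is silently ignored
    }

    // Defines GeneratedRecord itself, loosely like a per-job user-code classloader (assumption).
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(GeneratedRecord.class.getName())) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                return c;
            }
        }
    }

    // Simulates submitting a job, cancelling it, and resubmitting the same jar.
    public static boolean staleEntryBreaksSecondJob() throws Exception {
        ClassLoader app = StaleCacheDemo.class.getClassLoader();
        ClassLoader firstJob = new IsolatingLoader(app);
        ClassLoader secondJob = new IsolatingLoader(app);
        String name = GeneratedRecord.class.getName();

        cachedLookup(name, firstJob);                        // populates the cache
        Class<?> fromCache = cachedLookup(name, secondJob);  // hit: class from an older loader
        Class<?> secondJobsOwn = Class.forName(name, false, secondJob);

        Object rec = fromCache.getDeclaredConstructor().newInstance();
        // The second job's code is linked against secondJobsOwn, so casting rec would throw.
        return fromCache != secondJobsOwn && !secondJobsOwn.isInstance(rec);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(staleEntryBreaksSecondJob()); // true
    }
}
```

If a cache like this is involved, keying entries by classloader as well as by name (or clearing it when a job's classloader is released) would avoid handing out stale classes. Note that a WeakHashMap keyed by classloader whose values hold Class objects from that same loader never releases its entries, because every Class strongly references its defining loader.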

Anyway, I would be interested in watching the issue in the Flink JIRA.

Giuliano, could you provide the issue number?

Thanks,
Yury

2017-01-11 14:11 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Giuliano,
>
> thanks for bringing up this issue.
> A "ClassCastException: X cannot be cast to X" often points to a
> classloader issue.
> So it might actually be a bug in Flink.
>
> I assume you submit the same application (same jar file) with the same
> command, right?
> Did you cancel the job before resubmitting?
>
> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
> at the top) and include the commit hash from which you built Flink?
> It would be great if you could provide a short example program and
> instructions on how to reproduce the problem.
>
> Thank you very much,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK
>
>
>
> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>:
>
>> Hello,
>>
>> I need some guidance on how to report a bug.
>>
>> I'm testing version 1.2 on my local cluster. The first time I submit
>> the job everything works, but whenever I resubmit the same job it fails
>> with:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>     at scala.App$class.main(App.scala:76)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
>>     at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>>     at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>>     ... 14 more
>>
>> I'm running a Flink cluster built from the "release-1.2" branch on
>> GitHub.
>>
>> How can I validate that this is a Flink bug?
>>
>> Where can I report this?
>>
>> What sort of information do I need to provide?
>>
>>
>> Cheers,
>>
>> Giuliano Caliari
>>
>
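
On the question of how to validate that this is a classloading problem rather than a genuine type mismatch: a small check that compares class names and defining classloaders could be logged just before the failing cast (for example around Traitor.scala:132). The helper below, CastDiagnostics.describe, is hypothetical and uses only java.lang APIs.

```java
public final class CastDiagnostics {

    private CastDiagnostics() { }

    // Hypothetical helper: explains why `value` is or is not an instance of `expected`.
    public static String describe(Object value, Class<?> expected) {
        Class<?> actual = value.getClass();
        if (expected.isInstance(value)) {
            return "OK: " + actual.getName() + " is assignable to " + expected.getName();
        }
        if (actual.getName().equals(expected.getName())) {
            // Same fully qualified name but different Class objects: two defining loaders.
            return "Same name, different classes: " + actual.getName()
                    + " loaded by " + actual.getClassLoader()
                    + ", expected type loaded by " + expected.getClassLoader();
        }
        return "Type mismatch: " + actual.getName() + " is not a " + expected.getName();
    }

    public static void main(String[] args) {
        // prints "OK: java.lang.String is assignable to java.lang.CharSequence"
        System.out.println(describe("text", CharSequence.class));
        // prints "Type mismatch: java.lang.Integer is not a java.lang.String"
        System.out.println(describe(42, String.class));
    }
}
```

If the output reports the same name with two different loaders, the exception comes from classloading, as Fabian suspects, and the two loader descriptions are useful details to include in the JIRA issue.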
