In my case I tracked down the culprit: it was indeed Avro. I'm providing
details below, since I believe the pattern is pretty common for such
issues.

In a YARN setup there are several sources classes are loaded from: the
Flink lib directory, the YARN lib directories, and user code. The first two
sources are handled by the system classloader; the last one is loaded by
FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility.
In essence, the job looks like this: Source -> Avro parser (Map) -> Sink.
Parallelism is 1. The job runs inside a long-lived YARN session. I have a
subclass of SpecificRecord, say its name is MySpecificRecord. From a class
loading perspective, the Avro library classes, including SpecificRecord,
are loaded by the system classloader from the YARN lib dir, so such classes
are shared across different Flink tasks within a task manager. On the other
hand, MySpecificRecord is in the job fat jar, so it gets loaded by
FlinkUserCodeClassLoader. Upon every job restart, the task gets a new
FlinkUserCodeClassLoader instance, so classes from user code are confined
to a task instance.

Simply put, the parsing itself looks like this:

val reader = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema)
val bean = reader.read(...)
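
Fleshed out as a Flink operator, the parser stage might look like the
following minimal sketch. The class name AvroParser, the byte-array input
type, and the decoder plumbing are my illustration, not code from the
actual job; only MySpecificRecord and the SpecificDatumReader call come
from it.

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical parser stage: decodes Avro-encoded bytes into MySpecificRecord.
class AvroParser extends RichMapFunction[Array[Byte], MySpecificRecord] {

  // Created per task instance in open(). Note: the schema-only constructor
  // implicitly falls back to the shared SpecificData.INSTANCE singleton,
  // which is the root cause described below.
  @transient private var reader: SpecificDatumReader[MySpecificRecord] = _

  override def open(parameters: Configuration): Unit = {
    reader = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema)
  }

  override def map(bytes: Array[Byte]): MySpecificRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}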

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and
SpecificData get loaded by the system classloader. A new
FlinkUserCodeClassLoader is instantiated; let's denote this instance as "A".
MySpecificRecord then gets loaded by A.

2. SpecificData has a singleton, SpecificData.INSTANCE, that holds a cache
mapping a string key derived from the Avro schema to the implementing
class. So during parsing, MySpecificRecord (A) gets cached there.

3. I stop the job and re-submit it. The JVM process is the same, so all
standard Avro classes, including SpecificData, remain loaded. A new task
instance is created and gets a new FlinkUserCodeClassLoader instance; let's
name it "B". A new incarnation of MySpecificRecord is loaded by B. From the
JVM's standpoint, MySpecificRecord (B) is different from MySpecificRecord
(A), even though their bytecode is identical.

4. The job starts parsing again. SpecificDatumReader consults
SpecificData.INSTANCE's cache for any stashed classes and finds
MySpecificRecord (A) there.

5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate
a bean for filling in the parsed data.

6. SpecificDatumReader hands the filled instance of MySpecificRecord (A)
back to the job.

7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B) - the
standalone sketch after this list reproduces exactly this step.

8. ClassCastException :^(
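
To see steps 3 through 8 in isolation, outside Flink and Avro, here is a
self-contained sketch of the mechanism. The jar path and the fully
qualified class name are hypothetical; any fat jar containing an
Avro-generated class together with Avro itself would do.

import java.net.{URL, URLClassLoader}

object TwoClassLoadersDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical fat jar holding MySpecificRecord plus its dependencies.
    val jar = new URL("file:/path/to/job-fat.jar")

    // Two independent loaders, playing the roles of FlinkUserCodeClassLoader
    // instances "A" and "B". With a null parent neither delegates upward,
    // so each loads its own copy of the class from the jar.
    val loaderA = new URLClassLoader(Array(jar), null)
    val loaderB = new URLClassLoader(Array(jar), null)

    val classA = loaderA.loadClass("com.example.MySpecificRecord")
    val classB = loaderB.loadClass("com.example.MySpecificRecord")

    println(classA == classB) // false: same bytecode, distinct Class objects

    // Step 7 in miniature: casting an (A)-instance to the (B)-class fails
    // with the infamous "MySpecificRecord cannot be cast to MySpecificRecord".
    classB.cast(classA.newInstance()) // throws ClassCastException
  }
}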

I fixed the issue by not using the SpecificData.INSTANCE singleton (even
though using it is considered common and expected practice). Instead, I
feed every parser a new instance of SpecificData. This way the class cache
is confined to a parser instance and gets recycled along with it.
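
In code, the fix is a small change to the reader construction. This is a
minimal sketch assuming Avro's SpecificData(ClassLoader) and
SpecificDatumReader(writerSchema, readerSchema, SpecificData) constructors,
both present in the Avro 1.7/1.8 line:

import org.apache.avro.specific.{SpecificData, SpecificDatumReader}

// Inside the parser's open(): a fresh SpecificData per parser, resolving
// classes against the classloader that loaded this operator (i.e. the
// current FlinkUserCodeClassLoader) instead of the JVM-wide
// SpecificData.INSTANCE. The class cache now lives and dies with the parser.
val specificData = new SpecificData(getClass.getClassLoader)
val schema = MySpecificRecord.getClassSchema
reader = new SpecificDatumReader[MySpecificRecord](schema, schema, specificData)

The essential part is that the SpecificData instance, and with it the class
cache, is no longer shared across job incarnations.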

Hope this helps,
Yury

2017-01-16 14:03 GMT+03:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> I think Yury pointed out the correct diagnosis. Caching the classes across
> multiple jobs in the same session can cause these types of issues.
>
> For YARN single-job deployments, Flink 1.2 will not do any dynamic
> classloading anymore, but will start with everything on the application
> classpath.
> For YARN sessions, Flink 1.2 still uses dynamic loading, to reuse hot
> containers.
>
> Best,
> Stephan
>
>
>
> On Mon, Jan 16, 2017 at 11:07 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> @Giuliano: any updates? Very curious to figure out what's causing
>> this. As Fabian said, this is most likely a class loading issue.
>> Judging from the stack trace, you are not running on YARN but on a
>> standalone cluster. Is that correct? Class-loading-wise, nothing changed
>> between Flink 1.1 and Flink 1.2 for standalone clusters. Did you put
>> any JARs into Flink's lib folder before submitting the job?
>>
>> – Ufuk
>>
>> On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin <yuri.ruc...@gmail.com> wrote:
>> > Hi,
>> >
>> > I'd like to chime in since I've faced the same issue running Flink 1.1.4.
>> > I have a long-running YARN session which I use to run multiple streaming
>> > jobs concurrently. Once, after cancelling and resubmitting the job, I saw
>> > the "X cannot be cast to X" ClassCastException in the logs. I restarted
>> > the YARN session, and the problem disappeared.
>> >
>> > The class that failed to be cast was autogenerated by the Avro compiler.
>> > I know that Avro's Java binding caches schemas in some static
>> > WeakHashMap. I'm wondering whether that may get in the way of Flink's
>> > classloading design.
>> >
>> > Anyway, I would be interested in watching the issue in Flink JIRA.
>> >
>> > Giuliano, could you provide the issue number?
>> >
>> > Thanks,
>> > Yury
>> >
>> > 2017-01-11 14:11 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>> >>
>> >> Hi Giuliano,
>> >>
>> >> thanks for bringing up this issue.
>> >> A "ClassCastException: X cannot be cast to X" often points to a
>> >> classloader issue.
>> >> So it might actually be a bug in Flink.
>> >>
>> >> I assume you submit the same application (same jar file) with the same
>> >> command, right?
>> >> Did you cancel the job before resubmitting?
>> >>
>> >> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
>> >> on top) and include the commit hash from which you built Flink?
>> >> It would be great if you could provide a short example program and
>> >> instructions on how to reproduce the problem.
>> >>
>> >> Thank you very much,
>> >> Fabian
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK
>> >>
>> >>
>> >>
>> >> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>:
>> >>>
>> >>> Hello,
>> >>>
>> >>>
>> >>>
>> >>> I need some guidance on how to report a bug.
>> >>>
>> >>>
>> >>>
>> >>> I'm testing version 1.2 on my local cluster. The first time I submit
>> >>> the job everything works, but whenever I re-submit the same job it
>> >>> fails with:
>> >>>
>> >>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>> >>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>> >>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>> >>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>> >>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> >>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>> >>>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>> >>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>> >>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>> >>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>> >>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> >>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>> >>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>> >>>     at scala.collection.immutable.List.foreach(List.scala:381)
>> >>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> >>>     at scala.App$class.main(App.scala:76)
>> >>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>> >>>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>> >>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>     at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> >>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>> >>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>> >>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>> >>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>> >>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>> >>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>> >>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>> >>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>> >>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>> >>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> >>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>> >>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> >>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> >>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> >>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> >>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> >>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> >>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>> >>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>> >>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>> >>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>> >>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>> >>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> >>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>> >>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>> >>>     at java.lang.Thread.run(Thread.java:745)
>> >>> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
>> >>>     at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>> >>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>> >>>     at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>> >>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>> >>>     ... 14 more
>> >>>
>> >>>
>> >>> I'm running a Flink cluster built from the "release-1.2" branch on
>> >>> GitHub.
>> >>>
>> >>>
>> >>> How can I validate that this is a Flink bug?
>> >>>
>> >>> Where can I report it?
>> >>>
>> >>> What sort of information do I need to provide?
>> >>>
>> >>>
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Giuliano Caliari
>> >>> --
>> >>> --
>> >>> Giuliano Caliari (+55 11 984898464)
>> >>> +Google
>> >>> Twitter
>> >>>
>> >>> Master Software Engineer by Escola Politécnica da USP
>> >>> Bachelor in Computer Science by Instituto de Matemática e Estatística
>> >>> da USP
>> >>>
>> >>
>> >
>>
>
>
