[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058706#comment-16058706 ]
Erik van Oosten commented on FLINK-5633:
----------------------------------------

In case you need throughput (as we do), the caching is indispensable. In that case you can use the following {{SpecificData}} implementation. Simply instantiate it once, then pass that singleton instance to every {{SpecificDatumReader}}.

{code:scala|title=LocalCachingSpecificData.scala}
import java.lang.reflect.Constructor
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificData

import scala.collection.JavaConverters._

/**
 * This can be used instead of [[SpecificData]] in multi-classloader environments like Flink.
 * This variation removes the JVM-wide singleton constructor cache and replaces it with a
 * cache that is local to the current class loader.
 *
 * If two Flink jobs use the same generated Avro code, they still have separate instances of the
 * classes because these live in separate class loaders. However, a JVM-wide singleton cache keeps
 * a reference to the class from the first class loader that loaded it. Any subsequent job will
 * fail with a [[ClassCastException]] because it gets an incompatible class.
 */
class LocalCachingSpecificData extends SpecificData {
  private val NO_ARG: Array[Class[_]] = Array.empty
  private val SCHEMA_ARG: Array[Class[_]] = Array(classOf[Schema])
  private val CTOR_CACHE: scala.collection.concurrent.Map[Class[_], Constructor[_]] =
    new ConcurrentHashMap[Class[_], Constructor[_]]().asScala

  /** Create an instance of a class.
    * If the class implements [[org.apache.avro.specific.SpecificData.SchemaConstructable]],
    * call a constructor with a [[org.apache.avro.Schema]] parameter, otherwise use a no-arg
    * constructor.
    */
  private def newInstance(c: Class[_], s: Schema): AnyRef = {
    val useSchema = classOf[SpecificData.SchemaConstructable].isAssignableFrom(c)
    val constructor = CTOR_CACHE.getOrElseUpdate(c, {
      val ctor = c.getDeclaredConstructor((if (useSchema) SCHEMA_ARG else NO_ARG): _*)
      ctor.setAccessible(true)
      ctor
    })
    if (useSchema) constructor.newInstance(s).asInstanceOf[AnyRef]
    else constructor.newInstance().asInstanceOf[AnyRef]
  }

  override def createFixed(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.createFixed(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }

  override def newRecord(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.newRecord(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }
}
{code}
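For illustration, wiring this in could look as follows. This is only a sketch: {{MySpecificRecord}} stands in for a generated Avro class (the name is borrowed from the description below) and the {{AvroParsing}} object name is made up. The relevant piece is the three-argument {{SpecificDatumReader}} constructor, which takes the writer schema, the reader schema and a {{SpecificData}}. Since a Scala {{object}} is itself a per-classloader singleton, the constructor cache is discarded together with the job's {{FlinkUserCodeClassLoader}}.

{code:scala}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

object AvroParsing {
  // Instantiate once and share across all readers within this class loader.
  private val specificData = new LocalCachingSpecificData

  // MySpecificRecord is a placeholder for your generated SpecificRecord subclass.
  private val reader = new SpecificDatumReader[MySpecificRecord](
    MySpecificRecord.getClassSchema, // writer schema
    MySpecificRecord.getClassSchema, // reader schema
    specificData)                    // instead of the default SpecificData.get()

  def parse(bytes: Array[Byte]): MySpecificRecord =
    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
}
{code}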

> ClassCastException: X cannot be cast to X when re-submitting a job.
> -------------------------------------------------------------------
>
>                 Key: FLINK-5633
>                 URL: https://issues.apache.org/jira/browse/FLINK-5633
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, YARN
>    Affects Versions: 1.1.4
>            Reporter: Giuliano Caliari
>            Priority: Minor
>
> I’m running a job on my local cluster. The first time I submit the job everything works, but whenever I cancel and re-submit the same job it fails with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>     at scala.App$class.main(App.scala:76)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
>     at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>     at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>     ... 14 more
> {quote}
> This happens on versions 1.1.4 and 1.2.
> Here's a great description of the problem, provided by Yury Ruchin:
> {quote}
> In a YARN setup there are several sources classes are loaded from: the Flink lib directory, the YARN lib directories, and user code. The first two sources are handled by the system classloader; the last one is loaded by FlinkUserCodeClassLoader.
> My streaming job parses Avro-encoded data using the SpecificRecord facility. In essence, the job looks like this: Source -> Avro parser (Map) -> Sink. Parallelism is 1. The job operates inside a long-lived YARN session. I have a subclass of SpecificRecord, say its name is MySpecificRecord. From a class loading perspective, the Avro library classes, including SpecificRecord, are loaded by the system class loader from the YARN lib dir - such classes are shared across different Flink tasks within a task manager. On the other side, MySpecificRecord is in the job fat jar, so it gets loaded by FlinkUserCodeClassLoader. Upon every job restart, the task gets a new FlinkUserCodeClassLoader instance, so classes from user code are confined to a task instance.
> Simply put, the parsing itself looks like this:
> val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)
> Now, the scenario:
> 1. I start my job. Parsing is initiated, so SpecificDatumReader and SpecificData get loaded by the system classloader. A new FlinkUserCodeClassLoader is instantiated; let's denote its instance as "A". MySpecificRecord then gets loaded by A.
> 2. SpecificData has a singleton, SpecificData.INSTANCE, that holds a cache mapping a string key derived from the Avro schema to the implementing class. So during parsing, MySpecificRecord (A) gets cached there.
> 3. I stop the job and re-submit it. The JVM process is the same, so all standard Avro classes, including SpecificData, remain loaded. A new task instance is created and gets a new FlinkUserCodeClassLoader instance; let's name it "B". A new incarnation of MySpecificRecord is loaded by B. From the JVM's standpoint, MySpecificRecord (B) is different from MySpecificRecord (A), even though their bytecode is identical.
> 4. The job starts parsing again. SpecificDatumReader consults SpecificData.INSTANCE's cache for any stashed classes and finds MySpecificRecord (A) there.
> 5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a bean for filling the parsed data in.
> 6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back to the job.
> 7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
> 8. ClassCastException :^(
> I fixed the issue by not using the SpecificData.INSTANCE singleton (even though this is considered common and expected practice). I feed every parser a new instance of SpecificData. This way the class cache is confined to a parser instance and gets recycled along with it.
> {quote}
> A discussion of the error can be found at:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html
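To make the quoted fix concrete, here is a rough sketch of feeding every parser its own {{SpecificData}} ({{MySpecificRecord}} as above; the exact reader construction in the original job may differ, and the explicit class-loader argument is an addition over the quote's plain {{new SpecificData}}). The class cache then lives and dies with the reader, at the cost of re-resolving classes for every reader instance, which is exactly the throughput trade-off that the caching {{SpecificData}} in the comment above addresses.

{code:scala}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader}

object FreshSpecificDataParser {
  def parse(bytes: Array[Byte]): MySpecificRecord = {
    // A fresh SpecificData per reader: its class cache starts empty, so it can
    // never hand back a class that was loaded by a previous FlinkUserCodeClassLoader.
    // Passing the user-code class loader makes sure the generated class is resolved
    // from the job jar even when Avro itself sits on the system classpath.
    val specificData = new SpecificData(classOf[MySpecificRecord].getClassLoader)
    val reader = new SpecificDatumReader[MySpecificRecord](
      MySpecificRecord.getClassSchema, // writer schema
      MySpecificRecord.getClassSchema, // reader schema
      specificData)                    // instead of SpecificData.INSTANCE
    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  }
}
{code}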