Thanks Till, your suggestion worked! Rather than creating one per SpecificDatumReader, I actually just created a single new SpecificData for each AvroDeserializationSchema instance, so I think it's still about as efficient as using the singleton.
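For anyone who finds this thread later, here is a minimal sketch of the idea. It is not the exact code from the pastebin. The class name PerInstanceAvroDecoder is made up for illustration; in the real job this logic would sit inside AvroDeserializationSchema[T]. The sketch also assumes that messages are plain binary-encoded Avro records and that the writer and reader schemas are the same.

```scala
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecordBase}

import scala.reflect.{ClassTag, classTag}

// Hypothetical helper for illustration only; in the thread this logic would
// live inside AvroDeserializationSchema[T] extends DeserializationSchema[T].
class PerInstanceAvroDecoder[T <: SpecificRecordBase : ClassTag] extends Serializable {

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // One SpecificData per decoder instance instead of the SpecificData.get()
  // singleton, so its class cache is not shared across job submissions.
  // It is bound to the class loader that loaded the generated Avro class.
  @transient private lazy val specificData = new SpecificData(avroType.getClassLoader)

  // The SpecificData instance is passed as the third constructor argument,
  // as Till suggested. Assumption: writer schema == reader schema.
  @transient private lazy val reader = {
    val schema = specificData.getSchema(avroType)
    new SpecificDatumReader[T](schema, schema, specificData)
  }

  // Assumption: `message` is a single binary-encoded Avro record.
  def deserialize(message: Array[Byte]): T = {
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
```

The SpecificData and the reader are lazy and transient so that they are rebuilt on the task side after the schema object is shipped, and they are created once per instance rather than once per record.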
Josh

On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> The only thing I could think of is to not use the SpecificData singleton
> but instead creating a new SpecificData object for each SpecificDatumReader
> (you can pass it as a third argument to the constructor). This, of course,
> is not really efficient. But you could try it out to see whether it solves
> your problem.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 4:24 PM, Josh <jof...@gmail.com> wrote:
>
>> Sorry - I forgot to include my stack trace too. Here it is:
>>
>> The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
>>     at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
>>     at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.Exception: Could not forward element to next operator
>>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>     at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
>> Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType
>>     at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
>>     at org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>     ... 3 more
>>
>> On Wed, Jun 8, 2016 at 3:19 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Thanks for the reply! I see - yes it does sound very much like
>>> FLINK-1390.
>>>
>>> Please see my AvroDeserializationSchema implementation here:
>>> http://pastebin.com/mK7SfBQ8
>>>
>>> I think perhaps the problem is caused by this line:
>>> val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)
>>>
>>> Looking at SpecificData, it contains a classCache which is a map of
>>> strings to classes, similar to what's described in FLINK-1390.
>>>
>>> I'm not sure how to change my AvroDeserializationSchema to prevent this
>>> from happening though! Do you have any ideas?
>>>
>>> Josh
>>>
>>> On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> the error message you've posted usually indicates that there is a class
>>>> loader issue. When you first run your program the class
>>>> com.me.avro.MyAvroType will be first loaded (by the user code class
>>>> loader). I suspect that this class is now somewhere cached (e.g. the avro
>>>> serializer) and when you run your program a second time, then there is a
>>>> new user code class loader which has loaded the same class and now you want
>>>> to convert an instance of the first class into the second class. However,
>>>> these two classes are not identical since they were loaded by different
>>>> class loaders.
>>>>
>>>> In order to find the culprit, it would be helpful to see the full stack
>>>> trace of the ClassCastException and the code of the
>>>> AvroDeserializationSchema. I suspect that something similar to
>>>> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Currently I have to relaunch my Flink cluster every time I want to
>>>>> upgrade/redeploy my Flink job, because otherwise I get a
>>>>> ClassCastException:
>>>>>
>>>>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>>>>> com.me.avro.MyAvroType
>>>>>
>>>>> It's related to MyAvroType which is an class generated from an Avro
>>>>> schema. The ClassCastException occurs every time I redeploy the job
>>>>> without killing the Flink cluster (even if there have been no changes
>>>>> to the job/jar).
>>>>>
>>>>> I wrote my own AvroDeserializationSchema in Scala which does something
>>>>> a little strange to get the avro type information (see below), and I'm
>>>>> wondering if that's causing the problem when the Flink job creates an
>>>>> AvroDeserializationSchema[MyAvroType].
>>>>>
>>>>> Does anyone have any ideas?
>>>>>
>>>>> Thanks,
>>>>> Josh
>>>>>
>>>>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>>>>>     extends DeserializationSchema[T] {
>>>>>
>>>>>   ...
>>>>>
>>>>>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>>>>
>>>>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>>>>
>>>>>   ...
>>>>>
>>>>>   override def getProducedType: TypeInformation[T] = typeInformation
>>>>>
>>>>>   ...
>>>>>
>>>>> }