Thanks for the explanation, at least it's a start. Bart
On Thu, Jun 22, 2017, at 11:20, Stefan Richter wrote: > Hi, > > this „amazing error“ message typically means that the class of the object > instance you created was loaded by a different classloader than the one > that loaded the class in the code that tries to cast it. A class in Java > is > fully identified by the canonical classname AND the classloader that > loaded it. This makes it possible that two classes with the same name and > bytecode are not instance of each other. Unfortunately, I have no > concrete idea why that happens in your case, > but maybe this info helps to track down the problem. > > Best, > Stefan > > > Am 22.06.2017 um 11:09 schrieb Bart van Deenen <bartvandee...@fastmail.fm>: > > > > Hi All > > > > I have a simple avro file > > > > {"namespace": "com.kpn.datalab.schemas.omnicrm.contact_history.v1", > > "type": "record", > > "name": "contactHistory", > > "fields": [ > > {"name": "events", "type": {"type":"array", > > "items": "bytes"}}, > > {"name": "krn", "type": "string"} > > ] > > } > > > > I generate a Java pojo (gist: https://goo.gl/FtM7T6) from this file via > > > > java -jar avro_tools.jar > > Version 1.7.7 of Apache Avro > > > > java -jar bin/avro_tools.jar compile schema <avrofile> > > generated/src/main/java > > > > This pojo file doesn't seem to want to be serialized by Flink, when I > > pack the compiled class file into a fat-jar job file. > > > > When I pass a certain byte array into the avro deserializer, this works > > fine in a regular Scala application, but when I do the same in a Flink > > job it bombs with a typecast exception. > > > > Scala code: > > > > val payload: AnyRef = kafkaEvent.payload() // this calls the > > Avro deserializer: (gist: https://goo.gl/18UqJy) > > > > println("canonical name: " + payload.getClass.getCanonicalName) > > val chAvro = > > try { > > payload.asInstanceOf[contactHistory] > > } catch { > > case c:ClassCastException => > > println(c.getMessage) > > > > I get the following amazing error: > > > > "canonical name: " + > > com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory > > com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory cannot > > be cast to > > com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory > > > > I've verified the exact same code with the exact same byte-array input > > (checked by printing a hexdump from the Flink job), and the code itself > > is not the problem. > > > > If I put a jar containing the class file from the Pojo in the Flink lib > > directory, my Flink job works fine! > > > > Any ideas how I can put my Pojo in the fat jar, because I don't want to > > restart my Flink when I add new Avro schemas? > > > > Thanks > > > > Bart >