Hi all,

Currently I have to relaunch my Flink cluster every time I want to
upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:

java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
com.me.avro.MyAvroType

It's related to MyAvroType which is an class generated from an Avro schema.
The ClassCastException occurs every time I redeploy the job without killing
the Flink cluster (even if there have been no changes to the job/jar).

I wrote my own AvroDeserializationSchema in Scala which does something a
little strange to get the avro type information (see below), and I'm
wondering if that's causing the problem when the Flink job creates an
AvroDeserializationSchema[MyAvroType].

Does anyone have any ideas?

Thanks,
Josh



class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends
DeserializationSchema[T] {

  ...

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  private val typeInformation = TypeExtractor.getForClass(avroType)

  ...

  override def getProducedType: TypeInformation[T] = typeInformation

  ...

}

Reply via email to