Thank you Alexey! It worked perfectly. I was missing the ClassTag correct use.
*Ana Gómez González* <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/> El lun, 6 mar 2023 a las 23:34, Alexey Novakov (<ale...@ververica.com>) escribió: > Hi Ana, > > I think you will need to deal with ClassTag to keep all the code generic. > I've found such example which should help: > > > https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala > > object JsonDeserializationSchema { > private val objectMapper = > JsonMapper.builder().addModule(DefaultScalaModule).build()} > > > class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema > [T] { > override def deserialize(bytes: Array[Byte]): T = > JsonDeserializationSchema.objectMapper.readValue[T](bytes, classTag[T]. > runtimeClass.asInstanceOf[Class[T]]) override def getProducedType: > TypeInformation[T] = TypeExtractor.getForClass(classTag[T].runtimeClass. > asInstanceOf[Class[T]]) > > ... > > > } > > ----------- > > Alexey > > On Mon, Mar 6, 2023 at 9:58 PM Ana Gómez González <angog...@gmail.com> > wrote: > >> >> Hello! >> >> First time emailing one doubt to this mailing list, hope I'm not messing >> anything up. >> I'm not fully sure if what I want to do it's conceptually correct, so pls >> let me know. >> >> I want to create a generic class that extends a DeserializationSchema. I >> want an easy way of creating different deserialization schemas for my >> rabbitMQ sources from JSON to scala case classes. >> >> My first approach looks like this: >> >> import com.fasterxml.jackson.databind.json.JsonMapper >> import com.fasterxml.jackson.module.scala.DefaultScalaModule >> import org.apache.flink.api.common.serialization.DeserializationSchema >> import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} >> >> class GenericJsonSchema[T] extends DeserializationSchema[T] { >> >> private val typeInformation: TypeInformation[T] = TypeInformation.of(new >> TypeHint[T] {}) >> private val objectMapper: JsonMapper = JsonMapper.builder() >> .addModule(DefaultScalaModule) >> .build() >> >> @throws(classOf[IOException]) >> def deserialize(message: Array[Byte]): T = objectMapper.readValue(message, >> typeInformation.getTypeClass) >> >> def isEndOfStream(nextElement: T): Boolean = false >> >> def getProducedType: TypeInformation[T] = typeInformation >> } >> >> >> When running I obtain: >> >> >> *Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: >> The TypeHint is using a generic variable.This is not supported, generic >> types must be fully specified for the TypeHint.* >> >> I've read and tried to understand all the problems when using generic >> types and TypeInformation class, but I don't get the correct use or if it >> can be used for my purpose. >> >> >> Thanks a lot in advance >> >> >> *Ana Gómez González* >> <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/> >> >