Hello! First time emailing one doubt to this mailing list, hope I'm not messing anything up. I'm not fully sure if what I want to do it's conceptually correct, so pls let me know.
I want to create a generic class that extends a DeserializationSchema. I want an easy way of creating different deserialization schemas for my rabbitMQ sources from JSON to scala case classes. My first approach looks like this: import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.flink.api.common.serialization.DeserializationSchema import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} class GenericJsonSchema[T] extends DeserializationSchema[T] { private val typeInformation: TypeInformation[T] = TypeInformation.of(new TypeHint[T] {}) private val objectMapper: JsonMapper = JsonMapper.builder() .addModule(DefaultScalaModule) .build() @throws(classOf[IOException]) def deserialize(message: Array[Byte]): T = objectMapper.readValue(message, typeInformation.getTypeClass) def isEndOfStream(nextElement: T): Boolean = false def getProducedType: TypeInformation[T] = typeInformation } When running I obtain: *Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.* I've read and tried to understand all the problems when using generic types and TypeInformation class, but I don't get the correct use or if it can be used for my purpose. Thanks a lot in advance *Ana Gómez González* <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>