Thank you Alexey!
It worked perfectly. I was missing the correct use of ClassTag.
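
For anyone finding this thread later, here is a minimal sketch of what the ClassTag context bound provides, using only the Scala standard library. The names `Reading`, `RuntimeClassOf` and `ClassTagSketch` are hypothetical and exist only for illustration. The `Class[T]` recovered here is the same kind of value that the quoted example below passes to `readValue` and `TypeExtractor.getForClass`.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical case class standing in for a message payload.
final case class Reading(sensor: String, value: Double)

// Hypothetical helper. The `T: ClassTag` context bound asks the compiler to
// supply an implicit ClassTag[T] wherever the class is instantiated with a
// concrete type, so the erased type parameter can still be mapped to a
// java.lang.Class at runtime.
class RuntimeClassOf[T: ClassTag] {
  val runtimeClass: Class[T] =
    classTag[T].runtimeClass.asInstanceOf[Class[T]]
}

object ClassTagSketch {
  def main(args: Array[String]): Unit = {
    // The concrete type is fixed at the call site, not inside the class.
    val holder = new RuntimeClassOf[Reading]
    println(holder.runtimeClass.getName)
  }
}
```

As far as I understand it, this is why the ClassTag version works where `new TypeHint[T] {}` inside the generic class did not: the concrete type is captured where the schema is instantiated, instead of being looked up from an unresolved type variable.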



*Ana Gómez González*

<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>


On Mon, Mar 6, 2023 at 11:34 PM, Alexey Novakov (<ale...@ververica.com>)
wrote:

> Hi Ana,
>
> I think you will need to use a ClassTag to keep all the code generic.
> I found an example that should help:
>
>
> https://github.com/amzn/milan/blob/7dfa29b434ced7eef286ea34c5085c10c1b787b6/milan/milan-compilers/milan-flink-compiler/src/main/scala/com/amazon/milan/compiler/flink/serialization/JsonDeserializationSchema.scala
>
> object JsonDeserializationSchema {
>   private val objectMapper =
>     JsonMapper.builder().addModule(DefaultScalaModule).build()
> }
>
> class JsonDeserializationSchema[T: ClassTag] extends DeserializationSchema[T] {
>   override def deserialize(bytes: Array[Byte]): T =
>     JsonDeserializationSchema.objectMapper.readValue[T](
>       bytes, classTag[T].runtimeClass.asInstanceOf[Class[T]])
>
>   override def getProducedType: TypeInformation[T] =
>     TypeExtractor.getForClass(classTag[T].runtimeClass.asInstanceOf[Class[T]])
>
>   ...
> }
>
> -----------
>
> Alexey
>
> On Mon, Mar 6, 2023 at 9:58 PM Ana Gómez González <angog...@gmail.com>
> wrote:
>
>>
>> Hello!
>>
>> This is my first time sending a question to this mailing list, so I hope
>> I'm not messing anything up.
>> I'm not sure whether what I want to do is conceptually correct, so please
>> let me know.
>>
>> I want to create a generic class that extends DeserializationSchema, so
>> that I have an easy way of creating different deserialization schemas for
>> my RabbitMQ sources, from JSON to Scala case classes.
>>
>> My first approach looks like this:
>>
>> import java.io.IOException
>>
>> import com.fasterxml.jackson.databind.json.JsonMapper
>> import com.fasterxml.jackson.module.scala.DefaultScalaModule
>> import org.apache.flink.api.common.serialization.DeserializationSchema
>> import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
>>
>> class GenericJsonSchema[T] extends DeserializationSchema[T] {
>>
>>   private val typeInformation: TypeInformation[T] =
>>     TypeInformation.of(new TypeHint[T] {})
>>   private val objectMapper: JsonMapper = JsonMapper.builder()
>>     .addModule(DefaultScalaModule)
>>     .build()
>>
>>   @throws(classOf[IOException])
>>   def deserialize(message: Array[Byte]): T =
>>     objectMapper.readValue(message, typeInformation.getTypeClass)
>>
>>   def isEndOfStream(nextElement: T): Boolean = false
>>
>>   def getProducedType: TypeInformation[T] = typeInformation
>> }
>>
>>
>> When running I obtain:
>>
>>
>> *Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
>> The TypeHint is using a generic variable.This is not supported, generic
>> types must be fully specified for the TypeHint.*
>>
>> I've read about the problems with generic types and the TypeInformation
>> class and tried to understand them, but I still don't see how to use it
>> correctly, or whether it can be used for my purpose at all.
>>
>>
>> Thanks a lot in advance
>>
>>
>> *Ana Gómez González*
>> <http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>
>>
>
