Hello!

First time emailing one doubt to this mailing list, hope I'm not messing
anything up.
I'm not fully sure if what I want to do it's conceptually correct, so pls
let me know.

I want to create a generic class that extends a DeserializationSchema. I
want an easy way of creating different deserialization schemas for my
rabbitMQ sources from JSON to scala case classes.

My first approach looks like this:

import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}

class GenericJsonSchema[T] extends DeserializationSchema[T] {

private val typeInformation: TypeInformation[T] = TypeInformation.of(new
TypeHint[T] {})
private val objectMapper: JsonMapper = JsonMapper.builder()
.addModule(DefaultScalaModule)
.build()

@throws(classOf[IOException])
def deserialize(message: Array[Byte]): T = objectMapper.readValue(message,
typeInformation.getTypeClass)

def isEndOfStream(nextElement: T): Boolean = false

def getProducedType: TypeInformation[T] = typeInformation
}


When running I obtain:


*Exception in thread "main" org.apache.flink.util.FlinkRuntimeException:
The TypeHint is using a generic variable.This is not supported, generic
types must be fully specified for the TypeHint.*

I've read and tried to understand all the problems when using generic types
and TypeInformation class, but I don't get the correct use or if it can be
used for my purpose.


Thanks a lot in advance


*Ana Gómez González*
<http://twitter.com/angoglez> <https://www.linkedin.com/in/angoglez/>

Reply via email to