Hi,
Row is a very special datatype where Flink cannot generate serializers
based on the generics. By default DeserializationSchema uses
reflection-based type analysis, you need to override the getResultType()
method in WormholeDeserializationSchema. And specify the type
information manually there.
Hope this helps.
Regards,
Timo
Am 06.06.18 um 13:22 schrieb 孙森:
Hi ,
I've tried to to specify such a schema, when I read from kafka, and
covert inputstream to table . But I got the exception:
* Exception in thread "main"
org.apache.flink.table.api.TableException: An input of
GenericTypeInfo cannot be converted to Table. Please specify the
type of the input with a RowTypeInfo
And the code here:
|private def getSchemaMap(jsonSchema: String) = { val umsSchema =
JsonUtils.json2caseClass[UmsSchema](jsonSchema) val fields =
umsSchema.fields_get val fieldNameList = ListBuffer.empty[String] val
fieldTypeList = ListBuffer.empty[TypeInformation[_]] fields.foreach {
field => fieldNameList.append(field.name)
fieldTypeList.append(fieldTypeMatch(field.`type`)) }
println(fieldNameList) println(fieldTypeList) (fieldNameList.toArray,
fieldTypeList.toArray) } private def fieldTypeMatch(umsFieldType:
UmsFieldType): TypeInformation[_] = { umsFieldType match { case STRING
=> Types.STRING case INT => Types.INT case LONG => Types.LONG case
FLOAT => Types.FLOAT case DOUBLE => Types.DOUBLE case BOOLEAN =>
Types.BOOLEAN case DATE => Types.SQL_DATE case DATETIME =>
Types.SQL_TIMESTAMP case DECIMAL => Types.DECIMAL } } } val
myConsumer: FlinkKafkaConsumer010[Row] = new
FlinkKafkaConsumer010(topics, new
WormholeDeserializationSchema(jsonSchema), properties) val
inputStream: DataStream[Row] = env.addSource(myConsumer) val tableEnv
= TableEnvironment.getTableEnvironment(env)<<—————exception here|
Thanks !
sen