Yes, that's a valid workaround. I found the cause of the problem: it is
specific to the Scala API.
See: https://issues.apache.org/jira/browse/FLINK-9556
Thanks for reporting it!
Regards,
Timo
On 08.06.18 09:43, 孙森 wrote:
Yes, I really did override the method, but it did not work. Finally, I
used ds.map()(Types.ROW()) to pass the type information explicitly, and
then it worked fine, but I didn't know why.
The code is:

    val inputStream: DataStream[Row] =
      env.addSource(myConsumer)(Types.ROW(fieldNameArray, flinkTypeArray))
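(For reference, the workaround works because the Scala DataStream API takes the result TypeInformation as an implicit parameter that can be supplied explicitly. A minimal sketch under that assumption, reusing the fieldNameArray/flinkTypeArray from the snippet above; the identity map is only illustrative:)

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

// Sketch: an identity map whose second (implicit) parameter list
// explicitly supplies a row type, so the resulting stream is typed
// as RowTypeInfo instead of GenericTypeInfo[Row].
val typedStream: DataStream[Row] =
  inputStream.map(r => r)(Types.ROW(fieldNameArray, flinkTypeArray))
```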
On 8 Jun 2018, at 15:15, Timo Walther <[email protected]> wrote:
Can you verify with a debugger whether you really override the method?
It seems that the type information you create is either not called or
not used.
Regards,
Timo
On 07.06.18 09:03, 孙森 wrote:
Hi Timo,
Thank you for the reply. The result of `inputStream.getType` is
GenericTypeInfo<Row>.
Thanks~
sen
On 7 Jun 2018, at 14:28, Timo Walther <[email protected]> wrote:
Sorry, I didn't see your last mail. The code actually looks good.
What is the result of `inputStream.getType` if you print it to the
console?
Timo
On 07.06.18 08:24, Timo Walther wrote:
Hi,
Row is a very special data type for which Flink cannot generate
serializers based on the generics. By default, a
DeserializationSchema uses reflection-based type analysis, so you
need to override the getResultType() method in
WormholeDeserializationSchema and specify the type information
manually there.
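A minimal sketch of such an override (the class name, constructor parameters, and the empty deserialize body are assumptions for illustration; your WormholeDeserializationSchema will differ):

```scala
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// Hypothetical schema class: field names/types would come from the
// parsed JSON schema in practice.
class MyRowDeserializationSchema(
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]])
  extends DeserializationSchema[Row] {

  override def deserialize(message: Array[Byte]): Row = {
    // Actual parsing logic omitted; fill the Row from the raw bytes here.
    new Row(fieldNames.length)
  }

  override def isEndOfStream(nextElement: Row): Boolean = false

  // The crucial part: return an explicit RowTypeInfo instead of the
  // reflection-derived GenericTypeInfo<Row>.
  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)
}
```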
Hope this helps.
Regards,
Timo
On 06.06.18 13:22, 孙森 wrote:
Hi,
I've tried to specify such a schema when reading from Kafka and
converting the input stream to a table, but I got this exception:
    Exception in thread "main" org.apache.flink.table.api.TableException:
    An input of GenericTypeInfo cannot be converted to Table. Please
    specify the type of the input with a RowTypeInfo.
And the code here:
    private def getSchemaMap(jsonSchema: String) = {
      val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
      val fields = umsSchema.fields_get
      val fieldNameList = ListBuffer.empty[String]
      val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
      fields.foreach { field =>
        fieldNameList.append(field.name)
        fieldTypeList.append(fieldTypeMatch(field.`type`))
      }
      println(fieldNameList)
      println(fieldTypeList)
      (fieldNameList.toArray, fieldTypeList.toArray)
    }

    private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
      umsFieldType match {
        case STRING   => Types.STRING
        case INT      => Types.INT
        case LONG     => Types.LONG
        case FLOAT    => Types.FLOAT
        case DOUBLE   => Types.DOUBLE
        case BOOLEAN  => Types.BOOLEAN
        case DATE     => Types.SQL_DATE
        case DATETIME => Types.SQL_TIMESTAMP
        case DECIMAL  => Types.DECIMAL
      }
    }

    val myConsumer: FlinkKafkaConsumer010[Row] =
      new FlinkKafkaConsumer010(topics,
        new WormholeDeserializationSchema(jsonSchema), properties)
    val inputStream: DataStream[Row] = env.addSource(myConsumer)
    val tableEnv = TableEnvironment.getTableEnvironment(env) // <-- exception here
Thanks !
sen