Re: Datastream[Row] covert to table exception

2018-06-08 Thread Timo Walther
Yes, that's a workaround. I found the cause of the problem: it is specific to the Scala API. See: https://issues.apache.org/jira/browse/FLINK-9556 Thanks for reporting it! Regards, Timo
On 08.06.18 at 09:43, 孙森 wrote: Yes, I really did override the method, but it did not work. Finally, I use

Re: Datastream[Row] covert to table exception

2018-06-06 Thread Timo Walther
Sorry, I didn't see your last mail. The code actually looks good. What is the result of `inputStream.getType` if you print it to the console? Timo
On 07.06.18 at 08:24, Timo Walther wrote: Hi, Row is a very special datatype where Flink cannot generate serializers based on the generics. By de

Re: Datastream[Row] covert to table exception

2018-06-06 Thread Timo Walther
Hi, Row is a very special data type for which Flink cannot generate serializers based on the generics. By default, DeserializationSchema uses reflection-based type analysis, so you need to override the getProducedType() method in WormholeDeserializationSchema and specify the type information manually t
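For readers following along, here is a minimal sketch of what declaring the type information by hand could look like. It assumes a Flink 1.5-era setup in which `KeyedDeserializationSchema` extends `ResultTypeQueryable`, whose method is named `getProducedType()`. The class name `RowDeserializationSchema`, the two-column layout, and the field names `topic` and `payload` are made up for illustration and are not taken from the original poster's code.

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import org.apache.flink.types.Row

// Hypothetical schema: every Kafka record becomes a two-field Row (topic, payload).
class RowDeserializationSchema extends KeyedDeserializationSchema[Row] {

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): Row = {
    val row = new Row(2)
    row.setField(0, topic)
    // Assumption: the payload is UTF-8 text; a null message (tombstone) maps to a null field.
    row.setField(1, if (message == null) null else new String(message, "UTF-8"))
    row
  }

  // This sketch never ends the stream based on an element.
  override def isEndOfStream(nextElement: Row): Boolean = false

  // Declare the field types and names explicitly instead of relying on reflection,
  // which cannot see inside a Row.
  override def getProducedType: TypeInformation[Row] =
    new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Array("topic", "payload"))
}
```

Note that, according to the later replies in this thread, overriding this method alone did not help with the Scala API; FLINK-9556 tracks that problem.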

Re: Datastream[Row] covert to table exception

2018-06-06 Thread 孙森
I’m sorry, the whole code is:
    class WormholeDeserializationSchema(schema: String) extends KeyedDeserializationSchema[Row] {
      //var keyValueTopic:KeyValueTopic = _
      override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long) = {