Hi,

I’m moving my Flink 1.11 application onto the Blink Table Planner, and from TypeInformation to DataTypes, in preparation for upgrading to Flink 1.13 or higher.

I’m having trouble moving off of TypeInformation. Specifically, I have a section of code that maps a DataStream[Message] to a DataStream[Row]:

    implicit val typeInformation: TypeInformation[Row] = myObject.getProducedType
    val resultStream: DataStream[Row] = dataStream.map(msg => myTransform(msg))

Note that myTransform() takes a Message object and returns a Row object. Message is an internal class that we are using. The resultStream: DataStream[Row] is later passed as a StreamTableSource[Row].

If I comment out the implicit val above, I get a failure:

    TableSource of type com.MyTableSourceFromDataStream returned a DataStream of data type GenericType<org.apache.flink.types.Row> that does not match with the data type ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by the TableSource.getProducedDataType() method. Please validate the implementation of the TableSource.

I checked the Flink 1.11.4, Flink 1.13, and most recent sources, and it seems that the implementation of DataStream.map() has not changed and still uses TypeInformation:

https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala#L657

Based on the code above, it seems that the issue is that Flink's DataStream.map function requires an implicit TypeInformation. I’m not sure if there’s an equivalent DataType implicit that I should be declaring instead, or if I should be using some function other than map.

Do you have any suggestions for how to proceed? I'd like to completely move off of TypeInformation in my app.

Thanks,
Sofya