Hi,

I’m moving my Flink 1.11 application onto the Blink Table Planner; and off
 of TypeInformation and onto DataTypes in preparation for upgrading Flink
 to Flink 1.13 or higher.

I’m having trouble moving off of TypeInformation.

Specifically I have a section of code that maps a DataStream[Message] to a
DataStream[Row]:

  implicit val typeInformation: TypeInformation[Row] =
 myObject.getProducedType
  val resultStream: DataStream[Row] = dataStream.map(msg =>
myTransform(msg))

Note that myTransform() takes in a Message object and returns a Row object.
Message is an internal class that we are using.
The resultStream:DataStream[Row] is passed as a StreamTableSource[Row]
later.

If I comment out the implicit val above, I get a failure:

  TableSource of type com.MyTableSourceFromDataStream returned a DataStream
of data type
  GenericType<org.apache.flink.types.Row> that does not match with the data
type
  ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by
the
  TableSource.getProducedDataType() method. Please validate the
implementation of the TableSource.

I checked the Flink 1.11.4, Flink 1.13, and most recent sources and it
seems that the implementation of DataStream.map() is not changed and still
uses TypeInformation.

https://github.com/apache/flink/blob/master/flink
-streaming-scala/src/main/scala/org/apache/flink
/streaming/api/scala/DataStream.scala#L657

Based on the code above it seems that the issue is that Flink's
DataStream.map function uses TypeInformation.

I’m not sure if there’s an equivalent DataType implicit that I should be
declaring instead. Or if I should be using some function other than map

Do you have any suggestions for how to proceed? I'd like to completely move
off of TypeInformation in my app.

Thanks,
Sofya

Reply via email to