Hi Yuval,

I would recommend option 2, because especially when it comes to state you should be in control of what is persisted. There is no guarantee that the ExternalSerializer will not change in the future. It is only meant for shipping data as the input of the next operator.

I would recommend writing a small utility that traverses `table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits the types that you allow in state (e.g. no structured types), and translates them to TypeInformation.
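
A minimal sketch of such a utility could look like the following. The whitelist of allowed types below is only an example, not a recommendation; extend or restrict it to the types you want to allow in state:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public final class StateTypeInfoFactory {

    // Translates a restricted whitelist of logical types to TypeInformation.
    // Everything outside the whitelist is rejected explicitly.
    public static TypeInformation<?> toStateTypeInfo(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return Types.BOOLEAN;
            case INTEGER:
                return Types.INT;
            case BIGINT:
                return Types.LONG;
            case DOUBLE:
                return Types.DOUBLE;
            case VARCHAR:
                return Types.STRING;
            case ROW:
                // recurse into nested fields of a row type
                RowType rowType = (RowType) type;
                TypeInformation<?>[] fields = rowType.getChildren().stream()
                        .map(StateTypeInfoFactory::toStateTypeInfo)
                        .toArray(TypeInformation[]::new);
                return Types.ROW_NAMED(
                        rowType.getFieldNames().toArray(new String[0]), fields);
            default:
                // e.g. no structured or RAW types in state
                throw new UnsupportedOperationException(
                        "Type not allowed in state: " + type.asSummaryString());
        }
    }
}

Failing fast on anything outside the whitelist keeps the state contract explicit, so an upstream schema change cannot silently alter what ends up persisted.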

Regards,
Timo


On 04.06.21 16:05, Yuval Itzchakov wrote:
Hi Timo,
Thank you for the response.

In reality, the tables are created from arbitrary SQL code, so I don't know the schema in advance and cannot create the TypeInformation "by hand" to pass on to the DataStream API.

This leaves me with option 1, which leads to another question: if I have state records stored in RocksDB from a job currently running on a previous Flink version (1.9.3), will changing the TypeInformation from TypeInformation[Row] to the ExternalTypeInformation break the compatibility of the currently stored state and essentially cause it to be lost? My gut feeling says yes, unless some form of backwards compatibility is written specifically for this use case.


On Fri, Jun 4, 2021, 16:33 Timo Walther <twal...@apache.org> wrote:

    Hi Yuval,


    TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
    between TypeInformation and DataType until TypeInformation is no
    longer exposed through the Table API.

    Starting with Flink 1.13, the Table API can serialize records to the
    first DataStream operator via toDataStream or toChangelogStream.
    Internally, it uses
    org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The
    binary representation uses internal data structures, and conversion is
    performed during serialization/deserialization:

    conversion -> internal -> conversion

    You have two possibilities:

    1) You simply call `tableEnv.toDataStream(table).getType()` and pass
    this type on to the next operator (see the sketch below).

    2) You define your own TypeInformation as you would usually do it in
    DataStream API without Table API.
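
    For example, a minimal sketch of option 1, assuming a
    StreamTableEnvironment `tableEnv` and a `Table` called `table`; the
    state descriptor is only an illustration of passing the type on:

        import org.apache.flink.api.common.state.ValueStateDescriptor;
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.types.Row;

        DataStream<Row> stream = tableEnv.toDataStream(table);
        TypeInformation<Row> typeInfo = stream.getType();

        // pass it on, e.g. into a state descriptor of a downstream operator
        ValueStateDescriptor<Row> descriptor =
                new ValueStateDescriptor<>("last-row", typeInfo);

    Since toDataStream(table) returns a DataStream<Row>, getType() already
    hands you a TypeInformation<Row> without any casting.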

    We might serialize `Row`s with `RowSerializer` again in the near
    future. But for now we went with the most generic solution that
    supports everything that can come out of the Table API.

    Regards,
    Timo

    On 04.06.21 15:12, Yuval Itzchakov wrote:
     > When upgrading to Flink 1.13, I ran into deprecation warnings on
     > TypeConversions
     >
     > [screenshot of the deprecation warning on TypeConversions]
     >
     > The deprecation message states that this API will be deprecated
    soon,
     > but does not mention the alternatives that can be used for these
     > transformations.
     >
     > My use case is that I have a table that needs to be converted into a
     > DataStream[Row], and in turn I need to apply some stateful
     > transformations on it. To do that, I need the produced
     > TypeInformation[Row] to pass into the various state functions.
     >
     > @Timo Walther <twal...@apache.org> I would love your help on this.
     > --
     > Best Regards,
     > Yuval Itzchakov.

