Hi Yuval,
I would recommend option 2, especially because when it comes to state
you should be in control of what is persisted. There is no guarantee
that the ExternalSerializer will not change in the future. It is only
meant for shipping data as the input of the next operator.
I would recommend writing a small tool that traverses
`table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits
the types that you allow in state (e.g. no structured types) and
translates it to TypeInformation.
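
A minimal sketch of such a tool, assuming the logical type obtained above
is a RowType. The class name `StateTypeInfoConverter` and the particular
whitelist of types are hypothetical choices for illustration, not part of
Flink. The Java classes implied by each mapping (e.g. java.time.LocalDate
for DATE) are assumed to match the conversion classes of the records that
actually arrive in the DataStream, which should be verified per job.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;

import java.util.List;

/** Hypothetical helper: maps a whitelisted subset of LogicalTypes to TypeInformation. */
final class StateTypeInfoConverter {

    private StateTypeInfoConverter() {}

    /** Converts a ROW type into a named Row TypeInformation, field by field. */
    static TypeInformation<Row> toRowTypeInfo(RowType rowType) {
        List<RowType.RowField> fields = rowType.getFields();
        String[] names = new String[fields.size()];
        TypeInformation<?>[] types = new TypeInformation<?>[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            names[i] = fields.get(i).getName();
            types[i] = toTypeInfo(fields.get(i).getType());
        }
        return Types.ROW_NAMED(names, types);
    }

    /** Translates one logical type; anything outside the whitelist is rejected. */
    static TypeInformation<?> toTypeInfo(LogicalType type) {
        switch (type.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                return Types.STRING;
            case BOOLEAN:
                return Types.BOOLEAN;
            case TINYINT:
                return Types.BYTE;
            case SMALLINT:
                return Types.SHORT;
            case INTEGER:
                return Types.INT;
            case BIGINT:
                return Types.LONG;
            case FLOAT:
                return Types.FLOAT;
            case DOUBLE:
                return Types.DOUBLE;
            case DECIMAL:
                return Types.BIG_DEC;
            case DATE:
                return Types.LOCAL_DATE;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return Types.LOCAL_DATE_TIME;
            case ROW:
                // Nested rows are handled recursively.
                return toRowTypeInfo((RowType) type);
            default:
                // E.g. structured types, RAW, MAP: deliberately not allowed in state.
                throw new UnsupportedOperationException(
                        "Type not allowed in state: " + type.asSummaryString());
        }
    }
}
```

The resulting TypeInformation can then be passed to the state descriptors
of the DataStream job. Failing fast on unknown types keeps the persisted
format limited to serializers that were chosen explicitly.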
Regards,
Timo
On 04.06.21 16:05, Yuval Itzchakov wrote:
Hi Timo,
Thank you for the response.
The tables being created are in reality based on arbitrary SQL code, so
I don't know the schema up front and cannot create the TypeInformation
"by hand" and pass it on to the DataStream API.
This leaves me with option 1, which leads to another question: If I have
some state records stored in RocksDB from a currently running job on a
previous Flink version (1.9.3), will changing the TypeInformation from
TypeInformation[Row] to the ExternalTypeInformation now break the
compatibility of the state currently stored and essentially cause it to
be lost? My gut feeling says yes, unless some form of backwards
compatibility is written specifically for this use case.
On Fri, Jun 4, 2021, 16:33 Timo Walther <twal...@apache.org> wrote:
Hi Yuval,
TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
between TypeInformation and DataType until TypeInformation is no longer
exposed through the Table API.
Beginning with Flink 1.13, the Table API is able to serialize the
records to the first DataStream operator via toDataStream or
toChangelogStream.
Internally, it uses
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The
binary representation is using internal data structures and conversion
will be performed during serialization/deserialization:
conversion -> internal -> conversion
You have two possibilities:
1) You simply call `tableEnv.toDataStream(table).getType()` and pass
this type on to the next operator.
2) You define your own TypeInformation as you would usually do it in
DataStream API without Table API.
We might serialize `Row`s with `RowSerializer` again in the near
future. But for now we went with the most generic solution that
supports everything that can come out of Table API.
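
Option 1 above can be sketched as follows. The class and method names
(`ToDataStreamTypeExample`, `rowTypeOf`, `lastRowDescriptor`) and the
state name "last-row" are hypothetical and only serve as illustration;
`toDataStream`, `getType` and `ValueStateDescriptor` are the regular
Flink 1.13 APIs.

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/** Hypothetical helper showing how the type from toDataStream can be reused. */
final class ToDataStreamTypeExample {

    private ToDataStreamTypeExample() {}

    /** Returns the TypeInformation that the Table API attaches to the converted stream. */
    static TypeInformation<Row> rowTypeOf(StreamTableEnvironment tableEnv, Table table) {
        return tableEnv.toDataStream(table).getType();
    }

    /** Passes that type on to a state descriptor used by a downstream operator. */
    static ValueStateDescriptor<Row> lastRowDescriptor(TypeInformation<Row> rowType) {
        return new ValueStateDescriptor<>("last-row", rowType);
    }
}
```

With this approach the schema does not need to be known up front, but
the serialized format in state is whatever the Table API chooses.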
Regards,
Timo
On 04.06.21 15:12, Yuval Itzchakov wrote:
> When upgrading to Flink 1.13, I ran into deprecation warnings on
> TypeConversions
>
> image.png
>
> The deprecation message states that this API will be deprecated
> soon, but does not mention the alternatives that can be used for
> these transformations.
>
> My use case is that I have a table that needs to be converted into a
> DataStream[Row] and in turn I need to apply some stateful
> transformations on it. In order to do that I need the
> TypeInformation[Row] produced in order to pass into the various
> state functions.
>
> @Timo Walther <twal...@apache.org> I would love your help on this.
> --
> Best Regards,
> Yuval Itzchakov.