Hi Danny,
"if ChangelogMode.INSERT is the default, existing pipelines should be
compatible"
It is not about changelog mode compatibility, it is about type
compatibility. The renaming to `toInsertStream` is only meant to give us
a means of dealing with data type inconsistencies that could break
existing pipelines.
As the FLIP describes, the following new behavior should be implemented
(a sketch of the resulting type mappings follows the list):
- This is done by translating the TypeInformation to DataType.
- This will happen with a new TypeInfoDataTypeConverter that will no
longer produce LegacyTypeInformationType.
- All types from DataStream API should be supported by this converter.
- TupleTypeInfoBase will be translated into a proper RowType or
StructuredType.
- BigDecimals will be converted to DECIMAL(38,18) by default.
- Composite types (tuples, POJOs, rows) will be flattened by default if
they are used as top-level records (similar to the old behavior).
- The order of POJO fields is determined by the DataTypeExtractor and
can no longer be defined manually.
- GenericTypeInfo is converted to RawType immediately by considering the
current configuration.
- A DataStream that originated from Table API will keep its DataType
information due to ExternalTypeInfo implementing DataTypeQueryable.
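To make these mappings concrete, here is a rough sketch that uses only
the existing public type factories (the class name and the "expected"
variables are mine; the converter itself is not called because its final
signature is still part of the FLIP):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class TypeMappingSketch {

        public static void main(String[] args) {
            // Tuple2<Integer, String> on the DataStream side ...
            TypeInformation<?> tupleInfo = Types.TUPLE(Types.INT, Types.STRING);

            // ... should map to a proper RowType instead of a
            // LegacyTypeInformationType:
            DataType expectedTupleMapping = DataTypes.ROW(
                    DataTypes.FIELD("f0", DataTypes.INT()),
                    DataTypes.FIELD("f1", DataTypes.STRING()));

            // BigDecimal should map to DECIMAL(38, 18) by default:
            TypeInformation<?> decimalInfo = Types.BIG_DEC;
            DataType expectedDecimalMapping = DataTypes.DECIMAL(38, 18);

            System.out.println(tupleInfo + " -> " + expectedTupleMapping);
            System.out.println(decimalInfo + " -> " + expectedDecimalMapping);
        }
    }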
I would feel safer if we do this under a new method name.
"toDataStream(table, schema.bindTo(DataType))"
This is what I meant by "integrate the DataType into the Schema class
itself". Yes, we can do that if everybody is fine with it. But why
should a user specify both a schema and a data type? It would mean
potentially duplicating the definition of fields and their data types.
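A minimal sketch of that concern, assuming the Schema builder proposed
in the FLIP (the column names are made up):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    public class SchemaDuplicationSketch {

        public static void main(String[] args) {
            // The schema already fixes every field name and data type.
            Schema schema = Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("amount", DataTypes.DECIMAL(38, 18))
                    .build();

            // A separately bound DataType would have to repeat exactly
            // the same field structure a second time:
            // DataTypes.ROW(
            //     DataTypes.FIELD("id", DataTypes.BIGINT()),
            //     DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 18)))
        }
    }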
Regards,
Timo
On 03.09.20 11:31, Danny Chan wrote:
"It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently”
I like the idea actually, but if ChangelogMode.INSERT is the default, existing
pipelines should be compatible. We can see the other kinds of ChangelogMode as
an extension.
“for `toDataStream` users need to be
able to express whether they would prefer Row, POJO or atomic”
I think in most cases people do not need to convert the stream to a Row or
POJO, because the table projection always returns a flattened internal row.
If people do want a POJO there, how about we bind the DataType to the
existing schema, like this:
toDataStream(table, schema.bindTo(DataType))
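(For concreteness, a hypothetical usage; `bindTo`, the `Order` POJO, and
this `toDataStream` overload are illustrative only and not existing API:)

    // Default: the projection comes back as a flattened Row.
    DataStream<Row> rows = tableEnv.toDataStream(table);

    // Opt-in POJO conversion by binding a DataType to the schema.
    DataStream<Order> orders =
            tableEnv.toDataStream(table, schema.bindTo(DataTypes.of(Order.class)));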
Best,
Danny Chan
On 3 Sep 2020 at 3:18 PM +0800, dev@flink.apache.org wrote:
It is a more conservative approach to introduce that in a
new method rather than changing the existing one under the hood and
potentially break existing pipelines silently