Hi community,

While helping customers iterate on their Flink SQL pipelines, we have
discovered a couple of schema changes that Flink currently treats as state
incompatible but that we believe could be made state compatible, which
would make working with stateful Flink SQL pipelines easier.

Today some of these incompatibility issues can be addressed by manually
editing the checkpoint/savepoint metadata with the State Processor API
[1], but that is a very manual process. We are hoping to understand
whether others in the community also think these changes should be
considered state compatible, and how we should go about updating Flink to
reflect them.


   1. Changing the nullability of a field:
   https://issues.apache.org/jira/browse/FLINK-37240

   2. *Increasing* the length of a variable-length data type:
   https://issues.apache.org/jira/browse/FLINK-35147


In terms of specific implementation details:

I believe the crux of these incompatible changes is how Flink determines
the compatibility of operator state between the previous and new versions
of the job when resuming from a checkpoint or savepoint.


Today this logic lives in the `RowDataSerializer` class and performs a
simple equality check [2] based on the logic in each LogicalType.equals()
[3] in the schemas. My initial impression is that we can make this
equality check more fine-grained. We could then introduce environment
variables that let the user toggle the behaviour, for flexibility and
backwards compatibility. But I would love to get feedback from the
community in case there's anything we're missing.
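
To make the idea concrete, below is a minimal, self-contained sketch of
the relaxed check we have in mind. `FieldType` and `isStateCompatible`
are hypothetical stand-ins for Flink's LogicalType and the equality check
in RowDataSerializer, not part of any Flink API; the sketch assumes only
the two relaxations from the JIRAs above (ignore nullability, allow
widening of variable-length types):

```java
// Sketch only: simplified stand-ins for Flink's LogicalType and the
// RowDataSerializer equality check; none of these names are real Flink APIs.
final class RelaxedCompatibility {

    /** Minimal model of a SQL field type: a root name, a length, a null flag. */
    static final class FieldType {
        final String root;       // e.g. "VARCHAR", "INT"
        final int length;        // for variable-length types; -1 if not applicable
        final boolean nullable;

        FieldType(String root, int length, boolean nullable) {
            this.root = root;
            this.length = length;
            this.nullable = nullable;
        }
    }

    /** Roots we would subject to the length-widening rule. */
    private static boolean isVariableLength(String root) {
        return root.equals("VARCHAR") || root.equals("VARBINARY");
    }

    /**
     * Instead of strict equality, treat restored state as compatible when:
     *  - only nullability changed (null bits are stored either way), or
     *  - a variable-length type was widened (old values still fit).
     */
    static boolean isStateCompatible(FieldType previous, FieldType current) {
        if (!previous.root.equals(current.root)) {
            return false; // different type roots stay incompatible
        }
        if (isVariableLength(previous.root)) {
            // widening is safe; narrowing could truncate restored values
            return current.length >= previous.length;
        }
        // nullability is deliberately ignored in every branch
        return previous.length == current.length;
    }
}
```

The real implementation would of course have to recurse into nested types
(ROW, ARRAY, MAP), but the top-level shape would be the same.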

Digging further, I believe allowing nullability changes should be safe and
relatively straightforward to address, as we already default to not null
when serializing the data [4]. We would then leave it up to users to
ensure that downstream systems are updated to accept null values.
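
A toy illustration of why this holds (this is not Flink code, just a
model of the idea behind BinaryRowWriter [4]): if the row format reserves
a null bit for every field up front, whether or not the field is declared
nullable, then the serialized layout is identical before and after a
nullability change, so old state can be read back byte-for-byte:

```java
import java.util.BitSet;

// Toy row format (not Flink's): a null bit is reserved per field regardless
// of declared nullability, so flipping NOT NULL to NULL never changes the
// layout of already-serialized rows.
final class ToyRow {
    private final BitSet nullBits; // one reserved bit per field, always present
    private final long[] values;   // fixed-width slot per field

    ToyRow(int arity) {
        this.nullBits = new BitSet(arity);
        this.values = new long[arity];
    }

    void writeLong(int pos, long value) {
        nullBits.clear(pos); // default is not null, matching the writer's reset
        values[pos] = value;
    }

    void setNullAt(int pos) {
        nullBits.set(pos);
        values[pos] = 0L; // the slot still occupies space; layout is unchanged
    }

    boolean isNullAt(int pos) { return nullBits.get(pos); }
    long getLong(int pos) { return values[pos]; }
}
```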

Best,

Eric

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[2]
https://github.com/decodableco/decodable-flink/blob/decodable-flink-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L343-L345
[3]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java#L215-L225
[4]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java#L57-L62
