Hi community,

When customers iterate on their Flink SQL pipelines, we have come across a couple of schema changes that Flink currently treats as state incompatible but that we believe could be made state compatible, which would make working with stateful Flink SQL pipelines easier.
Today some of these incompatibility issues can be worked around by manually editing the checkpoint/savepoint metadata with the State Processor API [1], but that is a very manual process. We are hoping to understand whether others in the community also think the following changes should be considered state compatible, and how we should go about updating Flink to reflect that:

1. Changing the nullability of a field: https://issues.apache.org/jira/browse/FLINK-37240
2. *Increasing* the length of a variable-length data type: https://issues.apache.org/jira/browse/FLINK-35147

In terms of specific implementation details: I believe the crux of these incompatible changes is how Flink determines the compatibility of operator state between the previous and new job when resuming. Today this logic lives in the `RowDataSerializer` class and performs a simple equality check [2] based on the logic in each `LogicalType.equals()` [3] for the schemas. My initial impression is that we can make this equality check more fine grained. We could then introduce environment variables that let the user toggle the behaviour, for flexibility and backwards compatibility. But I would love to get feedback from the community in case there is anything we are missing. I have included a couple of rough sketches at the end of this mail to illustrate what I have in mind.

Digging further, I believe allowing nullability changes should be safe and relatively straightforward to address, since we already default to not null when serializing the data [4]. We would then leave it up to users to ensure that downstream systems are updated to accept null values.

Best,
Eric

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[2] https://github.com/decodableco/decodable-flink/blob/decodable-flink-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L343-L345
[3] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java#L215-L225
[4] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java#L57-L62
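
Sketch 1: to make the "more fine grained check" idea concrete, below is a minimal sketch of what a relaxed compatibility check on top of `LogicalType` could look like. The class and method names (`RelaxedTypeCompatibility`, `isStateCompatible`) are made up for illustration; this is not a concrete proposal for where the logic would live. It treats a NOT NULL -> NULL change and a VARCHAR length increase as compatible and falls back to today's strict equality otherwise.

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.VarCharType;

/** Rough illustration only; names are hypothetical. */
public final class RelaxedTypeCompatibility {

    /** Returns true if state written with {@code previousType} could be restored as {@code newType}. */
    public static boolean isStateCompatible(LogicalType previousType, LogicalType newType) {
        // 1. Relaxing NOT NULL -> NULL: the binary layout is identical, existing rows
        //    simply never carry a null in that field. Compare the rest of the type
        //    with nullability normalized away.
        if (!previousType.isNullable() && newType.isNullable()) {
            return isStateCompatible(previousType.copy(true), newType);
        }

        // 2. Increasing the length of a variable-length type, e.g. VARCHAR(5) -> VARCHAR(10):
        //    every previously written value still fits the new declared length.
        if (previousType.getTypeRoot() == LogicalTypeRoot.VARCHAR
                && newType.getTypeRoot() == LogicalTypeRoot.VARCHAR
                && previousType.isNullable() == newType.isNullable()) {
            return ((VarCharType) previousType).getLength() <= ((VarCharType) newType).getLength();
        }

        // 3. Everything else falls back to today's strict equality check.
        //    A real implementation would also need to recurse into nested types
        //    (ROW, ARRAY, MAP, ...) rather than rely on top-level equals().
        return previousType.equals(newType);
    }

    private RelaxedTypeCompatibility() {}
}
```

A toggle could then select between this relaxed check and the existing strict equality in the `RowDataSerializer` snapshot compatibility logic [2].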
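
Sketch 2: on the nullability point specifically, the reason I believe it is safe is that the binary row format does not change with the declared nullability. `BinaryRowWriter.reset()` clears the null bits so every field starts out as not null [4], and a null bit is only set when `setNullAt()` is explicitly called. The snippet below uses the existing flink-table-runtime writer classes and is meant purely as an illustration of that behaviour:

```java
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;

public class NullabilityLayoutExample {
    public static void main(String[] args) {
        // The writer produces the same binary layout whether the SQL schema declared
        // the fields NOT NULL or nullable; nullability only affects whether a null
        // bit may ever be set, not how non-null values are encoded.
        BinaryRowData row = new BinaryRowData(2);
        BinaryRowWriter writer = new BinaryRowWriter(row);

        writer.reset();                         // clears all null bits -> not null by default [4]
        writer.writeInt(0, 42);
        writer.writeString(1, StringData.fromString("hello"));
        writer.complete();

        System.out.println(row.isNullAt(0));    // false
        System.out.println(row.getString(1));   // hello
    }
}
```

Because rows written under a NOT NULL schema are encoded identically to non-null values under a nullable schema, restoring them with a relaxed (nullable) type should not require any state migration.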