Hi Hangxiang, Zakelly,

Thank you for the careful review and the +1 on the proposal.
*1. Where to host the migration logic*

I experimented with placing the migration hook on
TypeSerializerSchemaCompatibility, but ran into two issues:

- Is the "schemaEvolutionSerializer" intended to be the new TypeSerializer?
  The migration needs access to both the DataInputDeserializer (the value
  being migrated) and the new TypeSerializer.
- TypeSerializerSchemaCompatibility is currently designed as a result
  holder, not an executor, so keeping the procedural logic inside
  TypeSerializerSnapshot seems clearer.

*2. Naming the snapshot field*

I can change the field to `oldSerializerSnapshot` for consistency with
`resolveSchemaCompatibility()`, if you think that's clearer. Note that
migrateState() will still require the new serializer, so the method
signature will remain migrateState(oldSnapshot, newSerializer, dataInput,
...) - see the sketch just below.
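To make the discussion concrete, here is a rough sketch of the two methods
as I currently picture them (migrateElement is motivated in point 3 below).
To be clear, this is illustrative only: the default bodies are placeholders,
and I show the methods on a standalone sub-interface purely so the snippet
stands alone - in the FLIP they would be added to TypeSerializerSnapshot
itself:

```
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Illustrative sub-interface; in the FLIP these would be default methods
// on TypeSerializerSnapshot<T> directly.
public interface MigratingSerializerSnapshot<T> extends TypeSerializerSnapshot<T> {

    /**
     * Migrates one serialized value from the old schema to the new one.
     * The default mirrors today's behavior: deserialize with the restored
     * (old) serializer, then re-serialize with the new serializer.
     */
    default void migrateState(
            TypeSerializerSnapshot<T> oldSerializerSnapshot,
            TypeSerializer<T> newSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out)
            throws IOException {
        T value = oldSerializerSnapshot.restoreSerializer().deserialize(in);
        newSerializer.serialize(value, out);
    }

    /**
     * Same contract for a single list element; backends that store list
     * elements individually can call this once per element.
     */
    default void migrateElement(
            TypeSerializerSnapshot<T> oldSerializerSnapshot,
            TypeSerializer<T> newSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out)
            throws IOException {
        migrateState(oldSerializerSnapshot, newSerializer, in, out);
    }
}
```

With these defaults, existing snapshots keep today's
deserialize-then-serialize behavior, while RowDataSerializerSnapshot can
override them to add nulls for new fields, reorder fields by name, and
recurse into nested rows - all without touching the state backends.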
*3. Need for migrateElement()*

I initially tried relying only on migrateState(), but for RocksDBListState
the code became much less clean, because that state stores list elements
individually, separated by delimiters. A dedicated migrateElement() method
(sketched above, with a default that simply delegates to migrateState())
keeps the ListState migration logic more readable and slightly faster; the
sketch below shows roughly how a backend could drive it per element.
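For illustration, a minimal sketch of that per-element loop, reusing the
MigratingSerializerSnapshot sketch above. The helper class and the single
delimiter byte are assumptions modeled on the current RocksDB list-state
layout, not a finalized design:

```
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

final class ListStateMigrationSketch {

    // Assumption: list elements are separated by a single delimiter byte,
    // as in the current RocksDB list-state layout.
    private static final byte DELIMITER = ',';

    /** Migrates a whole serialized list value element by element. */
    static <T> void migrateList(
            MigratingSerializerSnapshot<T> oldSnapshot,
            TypeSerializer<T> newElementSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out)
            throws IOException {
        boolean first = true;
        while (in.available() > 0) {
            if (!first) {
                in.readByte();        // consume the delimiter in the old bytes
                out.write(DELIMITER); // and re-emit it in the new bytes
            }
            first = false;
            // Per-element hook: RowDataSerializerSnapshot can fill in nulls,
            // reorder fields, and recurse into nested rows here.
            oldSnapshot.migrateElement(oldSnapshot, newElementSerializer, in, out);
        }
    }
}
```

The list layout concerns (delimiters, element boundaries) stay in the
backend, while all element-level schema evolution lives in the serializer
snapshot.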
For context, I cherry-picked our internal PR (used in production on Flink
v1.16) that illustrates these points in practice:
https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842

I'm happy to iterate further - please let me know your thoughts.

Thanks again!
Weiqing

On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, Weiqing.
> Thanks for driving this FLIP.
> I'm +1 for supporting schema evolution for the SQL RowData type.
>
> I just have some questions:
> 1. Could we consider defining a method returning
> *SchemaEvolutionSerializer* in *TypeSerializerSchemaCompatibility* (like
> compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))?
> Then we would only need to care about implementing the schema evolution
> as part of the serializer (which could also be supported automatically,
> since we implement it inside and call it from the internal state).
> I think it may be better because *TypeSerializerSnapshot* is a common
> interface and many of its implementations may not really need
> *migrateState*.
>
> 2. Considering the semantics of *TypeSerializerSnapshot*, I'd also
> suggest changing the field to *oldSerializerSnapshot*, which is also
> consistent with *resolveSchemaCompatibility*.
>
> *3.* Do we really need an extra *migrateElement* method? Or, if we go
> with the option of defining *SchemaEvolutionSerializer*, could an element
> schema evolution serializer just be a special *SchemaEvolutionSerializer*?
>
> On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Thanks for the suggestions, Zakelly!
> >
> > Regarding *migrateElement* - it is specifically needed for ListState,
> > which stores elements individually with delimiters. Its implementation
> > deserializes and processes each element one by one during migration,
> > so I introduced the *migrateElement* API to handle this per-element
> > processing.
> >
> > Regarding the *migrateState* signature - I'm open to suggestions. My
> > original design aimed to align with the existing implementations in
> > RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For
> > example, in RocksDBMapState (v1.20), the migrateSerializedValue
> > <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
> > method first deserializes the old value and then serializes it with
> > the new serializer:
> >
> > ...
> > if (!isNull) {
> >     mapUserValue =
> >         priorMapValueSerializer.deserialize(serializedOldValueInput);
> > }
> > ...
> > newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
> >
> > Here, *snapshotConfiguration.migrateState* is called as:
> >
> > if (!isNull) {
> >     priorMapValueSerializer.snapshotConfiguration().migrateState(
> >         priorMapValueSerializer, newSerializer,
> >         serializedOldValueInput, serializedMigratedValueOutput);
> > }
> >
> > The idea was to mirror this structure - delegate the migration logic
> > to *priorSerializer.snapshotConfiguration()*, passing both the prior
> > and new serializers.
> >
> > On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com>
> > wrote:
> >
> > > Hi Weiqing,
> > >
> > > Thanks for your answers!
> > >
> > > It seems a simple deserialization-serialization round trip lacks
> > > flexibility, so I'd agree to introduce new methods.
> > > I'd suggest changing the signature to:
> > > ```
> > > public void migrateState(
> > >     TypeSerializerSnapshot<T> oldSerializerSnapshot,
> > >     DataInputDeserializer in,
> > >     DataOutputSerializer out) throws IOException
> > > ```
> > > which is more aligned with the other methods under
> > > `TypeSerializerSnapshot`. WDYT?
> > >
> > > And another question: could you describe in which case we need
> > > `migrateElement`?
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > You're right - *resolveSchemaCompatibility* is critical for
> > > > identifying schema compatibility. However, our challenge extends
> > > > beyond detection to handling the actual migration process,
> > > > particularly given RowData's complex requirements.
> > > >
> > > > The standard migration logic in *AbstractRocksDBState* isn't
> > > > sufficient for RowData because, during migration, we need to:
> > > >
> > > > - Add null values for newly added fields
> > > > - Reorder fields based on field names in the new schema
> > > > - Recursively handle nested structures
> > > > - Apply different migration logic depending on the state type
> > > >   (e.g., *ListState* uses *migrateElement()*, *MapState* uses
> > > >   *migrateState()*)
> > > >
> > > > The current approach:
> > > >
> > > > V value = priorSerializer.deserialize(serializedOldValueInput);
> > > > newSerializer.serialize(value, serializedMigratedValueOutput);
> > > >
> > > > doesn't offer enough control for these needs.
> > > >
> > > > The proposed *migrateState* and *migrateElement* methods maintain
> > > > backward compatibility with default implementations, while enabling
> > > > RowData to perform specialized migration logic without requiring
> > > > backend changes.
> > > >
> > > > I've updated the proposal document to include pseudo-code examples
> > > > of *migrateState* and *migrateElement* in the
> > > > *RowDataSerializerSnapshot* class to illustrate this. Let me know
> > > > if I missed anything.
> > > >
> > > > Best regards,
> > > > Weiqing
> > > >
> > > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi, Weiqing
> > > > >
> > > > > Thanks for the FLIP! In general I'd +1 for schema evolution for
> > > > > RowData types, which will enhance the user experience of SQL jobs.
> > > > >
> > > > > I have one question for now:
> > > > >
> > > > > You suggested introducing new methods in `TypeSerializerSnapshot`,
> > > > > but is it possible to leverage the existing state migration
> > > > > procedure[1], which also performs deserialization and serialization
> > > > > with the old and new serializers correspondingly? IIUC, all we need
> > > > > is to properly implement `resolveSchemaCompatibility` for
> > > > > `RowDataSerializerSnapshot`[2], since it will be invoked here[3].
> > > > > No need for new methods, right?
> > > > >
> > > > > [1] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > > > [2] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > > > [3] https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to initiate a discussion about enhancing state schema
> > > > > > evolution support for RowData in Flink.
> > > > > >
> > > > > > *Motivation*
> > > > > >
> > > > > > Flink applications frequently need to evolve their state schema
> > > > > > as business requirements change. Currently, when users update a
> > > > > > Table API or SQL job with schema changes involving RowData types
> > > > > > (particularly nested structures), they encounter serialization
> > > > > > compatibility errors during state restoration, causing job
> > > > > > failures. The issue occurs because existing state migration
> > > > > > mechanisms don't properly handle RowData types during schema
> > > > > > evolution, preventing users from making backward-compatible
> > > > > > changes like:
> > > > > >
> > > > > > - Adding nullable fields to existing structures
> > > > > > - Reordering fields within a row while preserving field names
> > > > > > - Evolving nested row structures
> > > > > >
> > > > > > This limitation impacts production applications using Flink's
> > > > > > Table API, as the RowData type is central to this interface.
> > > > > > Users are forced to choose between maintaining outdated schemas
> > > > > > or reprocessing all state data when schema changes are required.
> > > > > >
> > > > > > Here's the proposal document:
> > > > > > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> > > > > >
> > > > > > Your feedback and ideas are welcome to refine this feature.
> > > > > >
> > > > > > Thanks,
> > > > > > Weiqing
>
> --
> Best,
> Hangxiang.