Hi Hangxiang, Zakelly,

Thank you for the careful review and the +1 on the proposal.

*1. Where to host the migration logic*

I experimented with placing the migration hook on
TypeSerializerSchemaCompatibility, but ran into two issues:

   - Is the "schemaEvolutionSerializer" intended to be the new
     TypeSerializer? The migration needs access to both the
     DataInputDeserializer (the value) and the new TypeSerializer.
   - TypeSerializerSchemaCompatibility is currently designed as a result
     holder, not an executor, so keeping the procedural logic inside
     TypeSerializerSnapshot seems clearer (see the sketch below).
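
For reference, here is roughly the surface TypeSerializerSchemaCompatibility
exposes today (a simplified sketch against the 1.20 API, with an illustrative
class name): it only carries a verdict and, at most, a reconfigured
serializer; it never sees the serialized bytes.

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

class CompatibilityVerdictSketch {
    // The verdicts the result holder can express; none of them carries a
    // hook that receives the old bytes to rewrite during migration.
    static <T> TypeSerializerSchemaCompatibility<T> verdicts(
            TypeSerializer<T> reconfiguredSerializer) {
        TypeSerializerSchemaCompatibility<T> asIs =
                TypeSerializerSchemaCompatibility.compatibleAsIs();
        TypeSerializerSchemaCompatibility<T> reconfigured =
                TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
                        reconfiguredSerializer);
        TypeSerializerSchemaCompatibility<T> incompatible =
                TypeSerializerSchemaCompatibility.incompatible();
        return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
    }
}
```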

*2. Naming the snapshot field*

I can change the field to `oldSerializerSnapshot` for consistency with
`resolveSchemaCompatibility()` if you think that’s clearer. Note that
migrateState() will still require the new serializer, so the method
signature will remain migrateState(oldSnapshot, newSerializer, dataInput,
...), as sketched below.
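
To make that concrete, here is a minimal sketch of the default method I have
in mind (shown as a standalone interface so it compiles on its own; the
interface name and exact parameter list are illustrative, not final):

```java
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Illustrative stand-in for a method added to TypeSerializerSnapshot itself.
interface MigratingSnapshotSketch<T> {

    // Migrates one serialized value: read it in the old format from `in`,
    // write it in the new format to `out`. The default is the plain
    // old-deserialize / new-serialize round trip, so existing snapshot
    // implementations need no changes; RowDataSerializerSnapshot would
    // override it to null-fill, reorder, and recurse into nested rows.
    default void migrateState(
            TypeSerializerSnapshot<T> oldSerializerSnapshot,
            TypeSerializer<T> newSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out)
            throws IOException {
        T value = oldSerializerSnapshot.restoreSerializer().deserialize(in);
        newSerializer.serialize(value, out);
    }
}
```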

*3. Need for migrateElement()*

I initially tried relying only on migrateState(), but for RocksDBListState,
which stores list elements individually with delimiters, the code became
much less clean. A dedicated migrateElement() method keeps the ListState
migration logic more readable (and slightly faster), as in the sketch below.
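
A rough sketch of the per-element idea, assuming RocksDB's
delimiter-separated list layout (the class, method names, and DELIMITER
constant here are illustrative, not the actual RocksDBListState code):

```java
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class ListMigrationSketch {
    // RocksDB list state separates serialized elements with a delimiter byte.
    private static final byte DELIMITER = ',';

    // Default per-element migration: plain old-read / new-write round trip.
    // RowDataSerializerSnapshot would override the equivalent hook to
    // null-fill and reorder the fields of each element.
    static <T> void migrateElement(
            TypeSerializer<T> oldElementSerializer,
            TypeSerializer<T> newElementSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out) throws IOException {
        T element = oldElementSerializer.deserialize(in);
        newElementSerializer.serialize(element, out);
    }

    // Walks the serialized list element by element, preserving delimiters;
    // the whole list never has to be materialized, which is where the small
    // performance win comes from.
    static <T> void migrateList(
            TypeSerializer<T> oldElementSerializer,
            TypeSerializer<T> newElementSerializer,
            DataInputDeserializer in,
            DataOutputSerializer out) throws IOException {
        boolean first = true;
        while (in.available() > 0) {
            if (!first) {
                in.readByte();        // consume the old delimiter
                out.write(DELIMITER); // re-emit it between migrated elements
            }
            first = false;
            migrateElement(oldElementSerializer, newElementSerializer, in, out);
        }
    }
}
```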

For context, I cherry-picked our internal PR (used in production on Flink
v1.16) that illustrates these points in practice:
https://github.com/weiqingy/flink/commit/00539b16cc55bcd144ba65c052142fbe6a556842

I’m happy to iterate further - please let me know your thoughts.


Thanks again!
Weiqing

On Tue, May 6, 2025 at 11:54 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, Weiqing.
> Thanks for driving this FLIP.
> I'm +1 for supporting schema evolution for SQL RowData type.
>
> I just have some questions:
> 1. Could we consider defining a method returning *SchemaEvolutionSerializer*
> in *TypeSerializerSchemaCompatibility* (like
> compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))?
> Then we could also only care about implementing the schema evolution as the
> format of a serializer (which could also be automatic, since we would
> implement it inside and call it in the internal state).
> I think it may be better because *TypeSerializerSnapshot* is a common
> interface and many of its implementations may not really need *migrateState*.
>
> 2. Considering the semantics of *TypeSerializerSnapshot*, I'd also suggest
> changing the field to *oldSerializerSnapshot*, which is also consistent
> with *resolveSchemaCompatibility*.
>
> 3. Do we really need an extra *migrateElement* method? Or, if we go with
> the option of defining *SchemaEvolutionSerializer*, could an element schema
> evolution serializer just be a special *SchemaEvolutionSerializer*?
>
> On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com>
> wrote:
>
> > Thanks for the suggestions, Zakelly!
> >
> > Regarding *migrateElement* - it is specifically needed for ListState,
> > which stores elements individually with delimiters. Its implementation
> > deserializes and processes each element one by one during migration, so I
> > introduced the *migrateElement* API to handle this per-element processing.
> >
> > Regarding the *migrateState* signature - I’m open to suggestions. My
> > original design aimed to align with the existing implementations in
> > RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example,
> > in RocksDBMapState (v1.20), the migrateSerializedValue method
> > <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
> > first deserializes the old value and then serializes it with the new
> > serializer:
> >
> > ...
> > if (!isNull) {
> >     mapUserValue = priorMapValueSerializer.deserialize(serializedOldValueInput);
> > }
> > ...
> > newMapValueSerializer.serialize(mapUserValue, serializedMigratedValueOutput);
> >
> > Here, *snapshotConfiguration.migrateState* is called as:
> >
> > if (!isNull) {
> >     priorMapValueSerializer.snapshotConfiguration().migrateState(
> >             priorMapValueSerializer, newSerializer,
> >             serializedOldValueInput, serializedMigratedValueOutput);
> > }
> >
> > The idea was to mirror this structure - delegate the migration logic to
> > *priorSerializer.snapshotConfiguration()*, passing both the prior and new
> > serializers.
> >
> > On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Weiqing,
> > >
> > > Thanks for your answers!
> > >
> > > It seems a simple deserialization-serialization round trip lacks
> > > flexibility, so I'd agree to introduce new methods.
> > > I'd suggest changing the signature to:
> > > ```
> > >  public void migrateState(
> > >                TypeSerializerSnapshot<T> oldSerializerSnapshot,
> > >                DataInputDeserializer in,
> > >                DataOutputSerializer out) throws IOException
> > > ```
> > > which is more aligned with other methods under `TypeSerializerSnapshot`.
> > > WDYT?
> > >
> > > And another question: Could you describe in which case we need
> > > `migrateElement`?
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > You're right - *resolveSchemaCompatibility* is critical for identifying
> > > > schema compatibility. However, our challenge extends beyond detection to
> > > > handling the actual migration process, particularly given RowData’s
> > > > complex requirements.
> > > >
> > > > The standard migration logic in *AbstractRocksDBState* isn't sufficient
> > > > for RowData because, during migration, we need to:
> > > >
> > > >    - Add null values for newly added fields
> > > >    - Reorder fields based on field names in the new schema
> > > >    - Recursively handle nested structures
> > > >    - Apply different migration logic depending on the state type
> > > >      (e.g., *ListState* uses *migrateElement*(), *MapState* uses
> > > >      *migrateState*())
> > > >
> > > > The current approach:
> > > >
> > > > V value = priorSerializer.deserialize(serializedOldValueInput);
> > > > newSerializer.serialize(value, serializedMigratedValueOutput);
> > > >
> > > > doesn’t offer enough control for these needs.
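> > > >
> > > > For instance, a minimal illustration of the by-name handling (a
> > > > fragment with illustrative names - it assumes the old row has been
> > > > materialized as a GenericRowData; the fuller pseudo-code is in the
> > > > proposal doc):
> > > >
> > > > static GenericRowData migrateByName(
> > > >         GenericRowData oldRow,
> > > >         List<String> oldFieldNames,
> > > >         List<String> newFieldNames) {
> > > >     GenericRowData migrated = new GenericRowData(newFieldNames.size());
> > > >     for (int i = 0; i < newFieldNames.size(); i++) {
> > > >         int oldPos = oldFieldNames.indexOf(newFieldNames.get(i));
> > > >         // Newly added field -> null; existing field -> copy by name
> > > >         // (nested rows would recurse here).
> > > >         migrated.setField(i, oldPos < 0 ? null : oldRow.getField(oldPos));
> > > >     }
> > > >     return migrated;
> > > > }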
> > > >
> > > > The proposed *migrateState* and *migrateElement* methods maintain
> > > > backward compatibility through default implementations, while enabling
> > > > RowData to perform specialized migration logic without requiring
> > > > backend changes.
> > > >
> > > > I’ve updated the proposal document to include pseudo-code examples of
> > > > *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> > > > class to illustrate this. Let me know if I missed anything.
> > > >
> > > > Best regards,
> > > > Weiqing
> > > >
> > > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > > Hi, Weiqing
> > > > >
> > > > > Thanks for the FLIP! In general I'd +1 for schema evolution for
> > RowData
> > > > > types, which will enhance the user experience of SQL jobs.
> > > > >
> > > > > I have one question for now:
> > > > >
> > > > > You suggested introducing new methods in `TypeSerializerSnapshot`,
> > > > > but is it possible to leverage the existing state migration
> > > > > procedure [1], which already performs deserialization and
> > > > > serialization with the old and new serializers, respectively? IIUC,
> > > > > all we need is to properly implement `resolveSchemaCompatibility`
> > > > > for `RowDataSerializerSnapshot` [2], since it will be invoked
> > > > > here [3]. No need for new methods, right?
> > > > >
> > > > >
> > > > > [1]
> > > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > > > [2]
> > > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > > > [3]
> > > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I’d like to initiate a discussion about enhancing state schema
> > > > evolution
> > > > > > support for RowData in Flink.
> > > > > >
> > > > > > *Motivation*
> > > > > >
> > > > > > Flink applications frequently need to evolve their state schema as
> > > > > > business requirements change. Currently, when users update a Table
> > > > > > API or SQL job with schema changes involving RowData types
> > > > > > (particularly nested structures), they encounter serialization
> > > > > > compatibility errors during state restoration, causing job
> > > > > > failures. The issue occurs because existing state migration
> > > > > > mechanisms don't properly handle RowData types during schema
> > > > > > evolution, preventing users from making backward-compatible
> > > > > > changes like:
> > > > > >
> > > > > >    - Adding nullable fields to existing structures
> > > > > >    - Reordering fields within a row while preserving field names
> > > > > >    - Evolving nested row structures
> > > > > >
> > > > > > This limitation impacts production applications using Flink's Table
> > > > > > API, as the RowData type is central to this interface. Users are
> > > > > > forced to choose between maintaining outdated schemas or
> > > > > > reprocessing all state data when schema changes are required.
> > > > > >
> > > > > > Here’s the proposal document:
> > > > > > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> > > > > > Your feedback and ideas are welcome to refine this feature.
> > > > > >
> > > > > > Thanks,
> > > > > > Weiqing
> > > > > >
>
> --
> Best,
> Hangxiang.
>
