Hi, Weiqing.
Thanks for driving this FLIP.
I'm +1 for supporting schema evolution for SQL RowData type.

I just have some questions:
1. Could we consider adding a method to *TypeSerializerSchemaCompatibility*
that returns a *SchemaEvolutionSerializer* (like
compatibleAfterMigration(TypeSerializer<T> schemaEvolutionSerializer))?
Then we would only need to implement schema evolution in the form of a
serializer (which could also be supported automatically, since we
implement it internally and call it in the internal state).
I think this may be better because *TypeSerializerSnapshot* is a common
interface and many of its implementations may not really need
*migrateState*.
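
To make point 1 more concrete, here is a rough standalone Java sketch. It
does not use any Flink classes: MiniSerializer, MiniCompatibility,
OldRowSerializer, NewRowSerializer and OldToNewReader are hypothetical
stand-ins, and the one-argument compatibleAfterMigration overload is only
my assumption of how the proposed method might look.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Standalone sketch: every type here is a hypothetical stand-in, not a Flink class.
final class SchemaEvolutionSketch {

    /** Stand-in for TypeSerializer<T>, reduced to the two calls migration needs. */
    interface MiniSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    /** Stand-in for TypeSerializerSchemaCompatibility<T>. */
    static final class MiniCompatibility<T> {
        // null means: read old bytes with the prior serializer, as today.
        private final MiniSerializer<T> schemaEvolutionSerializer;

        private MiniCompatibility(MiniSerializer<T> schemaEvolutionSerializer) {
            this.schemaEvolutionSerializer = schemaEvolutionSerializer;
        }

        static <T> MiniCompatibility<T> compatibleAfterMigration() {
            return new MiniCompatibility<>(null);
        }

        // Assumed shape of the overload suggested in point 1.
        static <T> MiniCompatibility<T> compatibleAfterMigration(
                MiniSerializer<T> schemaEvolutionSerializer) {
            return new MiniCompatibility<>(schemaEvolutionSerializer);
        }

        MiniSerializer<T> readerFor(MiniSerializer<T> priorSerializer) {
            return schemaEvolutionSerializer != null ? schemaEvolutionSerializer : priorSerializer;
        }
    }

    /** Old layout: (int id). */
    static final class OldRowSerializer implements MiniSerializer<Object[]> {
        public void serialize(Object[] row, DataOutputStream out) throws IOException {
            out.writeInt((Integer) row[0]);
        }
        public Object[] deserialize(DataInputStream in) throws IOException {
            return new Object[] {in.readInt()};
        }
    }

    /** New layout: (int id, nullable String name). */
    static final class NewRowSerializer implements MiniSerializer<Object[]> {
        public void serialize(Object[] row, DataOutputStream out) throws IOException {
            out.writeInt((Integer) row[0]);
            out.writeBoolean(row[1] == null);
            if (row[1] != null) {
                out.writeUTF((String) row[1]);
            }
        }
        public Object[] deserialize(DataInputStream in) throws IOException {
            int id = in.readInt();
            boolean isNull = in.readBoolean();
            return new Object[] {id, isNull ? null : in.readUTF()};
        }
    }

    /** Reads the old layout and pads the newly added field with null. */
    static final class OldToNewReader implements MiniSerializer<Object[]> {
        public void serialize(Object[] row, DataOutputStream out) {
            throw new UnsupportedOperationException("only used for reading during migration");
        }
        public Object[] deserialize(DataInputStream in) throws IOException {
            return new Object[] {in.readInt(), null};
        }
    }

    /** The generic backend loop stays as it is; only the reader is swapped. */
    static <T> byte[] migrate(byte[] oldBytes, MiniSerializer<T> prior,
            MiniSerializer<T> next, MiniCompatibility<T> compat) throws IOException {
        MiniSerializer<T> reader = compat.readerFor(prior);
        T value = reader.deserialize(new DataInputStream(new ByteArrayInputStream(oldBytes)));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        next.serialize(value, new DataOutputStream(buf));
        return buf.toByteArray();
    }

    /** Writes one old-layout row, migrates it, and reads it back with the new serializer. */
    static Object[] demo(int id) {
        try {
            OldRowSerializer prior = new OldRowSerializer();
            NewRowSerializer next = new NewRowSerializer();
            ByteArrayOutputStream old = new ByteArrayOutputStream();
            prior.serialize(new Object[] {id}, new DataOutputStream(old));
            MiniCompatibility<Object[]> compat =
                    MiniCompatibility.compatibleAfterMigration(new OldToNewReader());
            byte[] migrated = migrate(old.toByteArray(), prior, next, compat);
            return next.deserialize(new DataInputStream(new ByteArrayInputStream(migrated)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the migration loop is still a plain deserialize-then-serialize,
and all of the schema-specific logic lives in the serializer that the
compatibility result hands back.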

2. Considering the semantics of *TypeSerializerSnapshot*, I'd also suggest
changing the parameter to *oldSerializerSnapshot*, which is also consistent
with *resolveSchemaCompatibility*.

3. Do we really need an extra *migrateElement* method? Or, if we go with
the option of defining *SchemaEvolutionSerializer*, could the element schema
evolution serializer just be a special *SchemaEvolutionSerializer*?
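
As a rough illustration of point 3, the standalone Java sketch below
migrates a delimited list by running one element-level reader in a loop, so
no separate per-element hook is needed. Everything in it is hypothetical
(ListMigrationSketch, ElementReader, ElementWriter), and the single ','
delimiter byte is an assumption about the list encoding, not something
taken from the FLIP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: hypothetical names throughout, not Flink classes.
final class ListMigrationSketch {

    // Assumption: list elements are separated by one delimiter byte.
    static final byte DELIMITER = ',';

    /** Element-level "schema evolution" reader: old bytes in, new-shape value out. */
    interface ElementReader<T> {
        T read(DataInputStream in) throws IOException;
    }

    /** Element-level writer for the new layout. */
    interface ElementWriter<T> {
        void write(T value, DataOutputStream out) throws IOException;
    }

    /** Migrates a delimited list by applying the element reader and writer one element at a time. */
    static <T> byte[] migrateList(byte[] oldBytes, ElementReader<T> reader, ElementWriter<T> writer)
            throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        boolean first = true;
        while (in.available() > 0) {
            if (!first) {
                in.readByte();            // consume the delimiter in the old bytes
                out.writeByte(DELIMITER); // write the delimiter in the new bytes
            }
            writer.write(reader.read(in), out);
            first = false;
        }
        return buf.toByteArray();
    }

    /** Encodes ints as the old list, migrates each element to a long, and decodes the result. */
    static List<Long> demo(int... ids) {
        try {
            ByteArrayOutputStream old = new ByteArrayOutputStream();
            DataOutputStream oldOut = new DataOutputStream(old);
            for (int i = 0; i < ids.length; i++) {
                if (i > 0) {
                    oldOut.writeByte(DELIMITER);
                }
                oldOut.writeInt(ids[i]);
            }
            ElementReader<Long> evolvingReader = src -> (long) src.readInt();
            ElementWriter<Long> newWriter = (value, dst) -> dst.writeLong(value);
            byte[] migrated = migrateList(old.toByteArray(), evolvingReader, newWriter);

            List<Long> result = new ArrayList<>();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(migrated));
            while (in.available() > 0) {
                result.add(in.readLong());
                if (in.available() > 0) {
                    in.readByte(); // skip the delimiter
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Here the list-specific part is only the delimiter handling in the loop; the
element reader is the same kind of object that a value state would use.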

On Tue, Apr 29, 2025 at 2:30 PM Weiqing Yang <yangweiqing...@gmail.com>
wrote:

> Thanks for the suggestions, Zakelly!
>
> Regarding *migrateElement* - it is specifically needed for ListState, which
> stores elements individually with delimiters. Its implementation
> deserializes and processes each element one by one during migration, so I
> introduced the *migrateElement* API to handle this per-element processing.
>
> Regarding the *migrateState* signature - I’m open to suggestions. My
> original design aimed to align with the existing implementations in
> RocksDBMapState, RocksDBListState, and AbstractRocksDBState. For example,
> in RocksDBMapState (v1.20), the migrateSerializedValue method
> <https://github.com/apache/flink/blob/release-1.20.1/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L243>
> first deserializes the old value and then serializes it with the new
> serializer:
>
> ...
> if (!isNull) {
>     mapUserValue =
>         priorMapValueSerializer.deserialize(serializedOldValueInput);
> }
> ...
> newMapValueSerializer.serialize(mapUserValue,
>     serializedMigratedValueOutput);
>
> Here, *snapshotConfiguration.migrateState* is called as:
>
> if (!isNull) {
>     priorMapValueSerializer.snapshotConfiguration().migrateState(
>         priorMapValueSerializer, newSerializer,
>         serializedOldValueInput, serializedMigratedValueOutput);
> }
>
> The idea was to mirror this structure - delegate the migration logic
> to *priorSerializer.snapshotConfiguration()*,
> passing both the prior and new serializers.
>
> On Mon, Apr 28, 2025 at 4:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Weiqing,
> >
> > Thanks for your answers!
> >
> > It seems that a simple deserialize-then-serialize round trip lacks
> > flexibility, so I'd agree to introduce new methods.
> > I'd suggest changing the signature to:
> > ```
> >  public void migrateState(
> >                TypeSerializerSnapshot<T> oldSerializerSnapshot,
> >                DataInputDeserializer in,
> >                DataOutputSerializer out) throws IOException
> > ```
> > which is more aligned with other methods under `TypeSerializerSnapshot`.
> > WDYT?
> >
> > And another question: Could you describe in which case we need
> > `migrateElement`?
> >
> >
> > Best,
> > Zakelly
> >
> > On Mon, Apr 28, 2025 at 2:49 AM Weiqing Yang <yangweiqing...@gmail.com>
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for your feedback.
> > >
> > > You're right - *resolveSchemaCompatibility* is critical for identifying
> > > schema compatibility. However, our challenge extends beyond detection to
> > > handling the actual migration process, particularly given RowData’s
> > > complex requirements.
> > >
> > > The standard migration logic in *AbstractRocksDBState* isn't sufficient
> > > for RowData because, during migration, we need to:
> > >
> > >    - Add null values for newly added fields
> > >    - Reorder fields based on field names in the new schema
> > >    - Recursively handle nested structures
> > >    - Apply different migration logic depending on the state type (e.g.,
> > >    *ListState* uses *migrateElement*(), *MapState* uses *migrateState*())
> > >
> > >
> > >
> > > The current approach:
> > >
> > >
> > > V value = priorSerializer.deserialize(serializedOldValueInput);
> > > newSerializer.serialize(value, serializedMigratedValueOutput);
> > >
> > > doesn’t offer enough control for these needs.
> > >
> > > The proposed *migrateState* and *migrateElement* methods maintain backward
> > > compatibility with default implementations, while enabling RowData to
> > > perform specialized migration logic without requiring backend changes.
> > >
> > > I’ve updated the proposal document to include pseudo-code examples of
> > > *migrateState* and *migrateElement* in the *RowDataSerializerSnapshot*
> > > class to illustrate this. Let me know if I missed anything.
> > >
> > > Best regards,
> > > Weiqing
> > >
> > > On Sat, Apr 26, 2025 at 9:39 PM Zakelly Lan <zakelly....@gmail.com>
> > > wrote:
> > >
> > > > Hi, Weiqing
> > > >
> > > > Thanks for the FLIP! In general I'd +1 for schema evolution for RowData
> > > > types, which will enhance the user experience of SQL jobs.
> > > >
> > > > I have one question for now:
> > > >
> > > > You suggested introducing new methods in `TypeSerializerSnapshot`, but is
> > > > it possible to leverage the existing state migration procedure[1], which
> > > > also performs deserialization and serialization with the old and new
> > > > serializers, respectively? IIUC, all we need is to properly implement
> > > > `resolveSchemaCompatibility` for `RowDataSerializerSnapshot`[2], since it
> > > > will be invoked here[3]. No need for new methods, right?
> > > >
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java#L201-L205
> > > > [2]
> > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java#L335
> > > > [3]
> > > > https://github.com/apache/flink/blob/f8b3c4b9a8ce1c6a094fcc0f292faea4bad8806c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java#L312
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Sat, Apr 26, 2025 at 1:47 PM Weiqing Yang <yangweiqing...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I’d like to initiate a discussion about enhancing state schema evolution
> > > > > support for RowData in Flink.
> > > > >
> > > > > *Motivation*
> > > > >
> > > > > Flink applications frequently need to evolve their state schema as business
> > > > > requirements change. Currently, when users update a Table API or SQL job
> > > > > with schema changes involving RowData types (particularly nested
> > > > > structures), they encounter serialization compatibility errors during state
> > > > > restoration, causing job failures. The issue occurs because existing state
> > > > > migration mechanisms don't properly handle RowData types during schema
> > > > > evolution, preventing users from making backward-compatible changes like:
> > > > >
> > > > >    - Adding nullable fields to existing structures
> > > > >    - Reordering fields within a row while preserving field names
> > > > >    - Evolving nested row structures
> > > > >
> > > > > This limitation impacts production applications using Flink's Table API, as
> > > > > the RowData type is central to this interface. Users are forced to choose
> > > > > between maintaining outdated schemas or reprocessing all state data when
> > > > > schema changes are required.
> > > > >
> > > > > Here’s the proposal document:
> > > > > https://docs.google.com/document/d/1WtAxp-jAVTLMOfWNldLCAoK137P0ZCMxR8hOZGcMxuc/edit?tab=t.0
> > > > > Your feedback and ideas are welcome to refine this feature.
> > > > >
> > > > > Thanks,
> > > > > Weiqing
> > > > >
> > > >
> > >
> >
>


-- 
Best,
Hangxiang.
