Hi Shamit,

Adding columns means that you're trying to perform schema evolution, which
isn't yet supported by Flink per the documentation [1] "Savepoints are only
supported if both the query and the Flink version remain constant"

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:

> Hello Experts,
>
>
> I need help to understand whether we can deploy a job from a snapshot
> after changing the code by adding one new column in an existing table.
>
> We are using flink-1.13.2 table API and RocksDB as backend. We have
> changed the code and added one new column in the existing table and later,
> tried to deploy from the snapshot. While deploying, I'm getting the below
> error:-
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
> must not be incompatible with the old state serializer
> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>    at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>    at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>    at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>    at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>    at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>    at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>    ...
>
>
> After troubleshooting, I found we are getting this error while comparing
> the fields of previous and latest table definition.
>
> This comparision is happening from flink-table-runtime library  class:
> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
> resolveSchemaCompatibility()
>
>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>                 return TypeSerializerSchemaCompatibility.incompatible();
>             }
>
>
> Can you please help me to understand if we can add a new column in an
> existing table and deploy from the snapshot?
>
> Regards,
> Shamit Jain
>

Reply via email to