Hi Shamit,

Adding columns means that you're trying to perform schema evolution, which Flink doesn't yet support for Table API and SQL jobs. Per the documentation [1]: "Savepoints are only supported if both the query and the Flink version remain constant".
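
To illustrate why the restore fails, here is a minimal sketch of the kind of check you quoted from resolveSchemaCompatibility(). It is not Flink code: the class name, method name, and type strings are made up for illustration, and the real serializer compares Flink type objects rather than strings. The point is only that an array comparison treats any added field as a mismatch.

```java
import java.util.Arrays;

/** Simplified stand-in for the field-type comparison; hypothetical, not part of Flink. */
final class RowSchemaCheckSketch {

    /** Returns true only if both schemas list the same field types in the same order. */
    static boolean isCompatible(String[] previousTypes, String[] newTypes) {
        // Arrays.equals compares the lengths first, then each element pairwise.
        return Arrays.equals(previousTypes, newTypes);
    }

    public static void main(String[] args) {
        String[] inSnapshot = {"BIGINT", "STRING"};           // schema stored in the snapshot
        String[] unchanged = {"BIGINT", "STRING"};            // same query redeployed
        String[] withNewColumn = {"BIGINT", "STRING", "INT"}; // one column added

        System.out.println(isCompatible(inSnapshot, unchanged));     // prints "true"
        System.out.println(isCompatible(inSnapshot, withNewColumn)); // prints "false"
    }
}
```

With the extra column the arrays differ in length, so the comparison fails, which corresponds to the incompatible() result behind the StateMigrationException in your stack trace.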

Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:

> Hello Experts,
>
> I need help to understand whether we can deploy a job from a snapshot
> after changing the code by adding one new column in an existing table.
>
> We are using flink-1.13.2 table API and RocksDB as backend. We have
> changed the code and added one new column in the existing table and later,
> tried to deploy from the snapshot. While deploying, I'm getting the below
> error:-
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336) must not be incompatible with the old state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>     ...
>
> After troubleshooting, I found we are getting this error while comparing
> the fields of the previous and latest table definitions.
>
> This comparison is happening in the flink-table-runtime library class:
> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
> resolveSchemaCompatibility()
>
>     if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>         return TypeSerializerSchemaCompatibility.incompatible();
>     }
>
> Can you please help me to understand if we can add a new column in an
> existing table and deploy from the snapshot?
>
> Regards,
> Shamit Jain