Thanks Martijn! I will check whether the DataStream API (and the State Processor API) fits our use case. Two rough sketches of what I plan to try are below, ahead of the quoted thread.
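
First, reading the savepoint with the State Processor API from [1]. This is
only a minimal sketch against the Flink 1.13 DataSet-based API: the savepoint
path, the operator uid "my-operator-uid", the state name "count" and the
String/Long types are placeholders, not taken from our real job. Since the
Table API generates operator uids and state descriptors internally, I assume
this is mainly practical once the job is ported to the DataStream API:

// Minimal sketch only: reads keyed state from an existing savepoint with
// the Flink 1.13 State Processor API (DataSet-based). All names and paths
// below are placeholders.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The backend passed here only needs to be able to read the state;
        // it does not have to be the RocksDB backend the job runs with.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "s3://bucket/savepoints/savepoint-xyz", new HashMapStateBackend());

        // Reads one operator's keyed state into a DataSet, which could then
        // be transformed and written back out as a new savepoint.
        DataSet<String> state =
                savepoint.readKeyedState("my-operator-uid", new ReaderFunction());
        state.print();
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, String> {

        ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Must match the descriptor the original job used for this state.
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + count.value());
        }
    }
}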
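
Second, on the schema evolution question I asked below: as I understand the
documentation [2], the automatic migration applies when the state type's
serializer supports evolution, which currently means POJO and Avro types.
A minimal sketch (all class and field names are made up) of keyed state in a
DataStream job where adding a field should be tolerated:

// Minimal sketch only: keyed state in a DataStream job whose type is a
// Flink POJO, so its schema can evolve between savepoints.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OrderStatsFunction extends KeyedProcessFunction<String, Double, String> {

    // A valid Flink POJO: public no-arg constructor and public fields.
    // If the type falls back to Kryo instead, schema evolution fails.
    public static class OrderStats {
        public long orderCount;
        public double totalAmount;
        // public double maxAmount;  // field added in the new job version;
        // on restore it would start out with its Java default value (0.0)

        public OrderStats() {}
    }

    private transient ValueState<OrderStats> stats;

    @Override
    public void open(Configuration parameters) {
        stats = getRuntimeContext().getState(
                new ValueStateDescriptor<>("order-stats", OrderStats.class));
    }

    @Override
    public void processElement(Double amount, Context ctx, Collector<String> out)
            throws Exception {
        OrderStats current = stats.value();
        if (current == null) {
            current = new OrderStats();
        }
        current.orderCount++;
        current.totalAmount += amount;
        stats.update(current);
        out.collect(ctx.getCurrentKey() + " -> " + current.orderCount);
    }
}

If that understanding is wrong, please correct me.

Regards,
Shamit Jain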
On Thu, Dec 30, 2021 at 3:44 AM Martijn Visser <mart...@ververica.com> wrote:

> Hi Shamit,
>
> Yes, there are more possibilities when using the DataStream API, like
> the one in the link you've included. You could also use the State
> Processor API [1] to read, write & modify your savepoint.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> On Wed, 29 Dec 2021 at 22:57, shamit jain <jainsha...@gmail.com> wrote:
>
>> Thanks Martijn!
>>
>> One more question: can we achieve schema evolution using the
>> DataStream API?
>>
>> In the Flink documentation, I found [2]: "The process of migrating
>> state to adapt to changed schemas happens automatically, and
>> independently for each state. This process is performed internally by
>> Flink by first checking if the new serializer for the state has
>> different serialization schema than the previous serializer; if so,
>> the previous serializer is used to read the state to objects, and
>> written back to bytes again with the new serializer."
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>>
>> Regards,
>> Shamit Jain
>>
>>
>> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <mart...@ververica.com>
>> wrote:
>>
>>> Hi Shamit,
>>>
>>> Adding columns means that you're trying to perform schema evolution,
>>> which isn't yet supported by Flink per the documentation [1]:
>>> "Savepoints are only supported if both the query and the Flink
>>> version remain constant"
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>>
>>> On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:
>>>
>>>> Hello Experts,
>>>>
>>>> I need help to understand whether we can deploy a job from a
>>>> snapshot after changing the code to add one new column to an
>>>> existing table.
>>>>
>>>> We are using the Flink 1.13.2 Table API with RocksDB as the state
>>>> backend. We changed the code to add one new column to the existing
>>>> table and later tried to deploy from the snapshot. While deploying,
>>>> I get the error below:
>>>>
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>>>> must not be incompatible with the old state serializer
>>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>     ...
>>>>
>>>> After troubleshooting, I found that this error is raised while the
>>>> fields of the previous and the latest table definition are compared
>>>> (with the added column, the two row types no longer match).
>>>>
>>>> The comparison happens in the flink-table-runtime class
>>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer, in the
>>>> method resolveSchemaCompatibility():
>>>>
>>>> if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>>     return TypeSerializerSchemaCompatibility.incompatible();
>>>> }
>>>>
>>>> Can you please help me understand whether we can add a new column to
>>>> an existing table and deploy from the snapshot?
>>>>
>>>> Regards,
>>>> Shamit Jain
>>>
>>