Thanks Haibo, bummer ;)

On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun <sunhaib...@163.com> wrote:
> Hi, Avi Levi
>
> I don't think there's any way to solve this problem right now, and the Flink
> documentation clearly states that this is not supported:
>
> "Trying to restore state, which was previously configured without TTL,
> using TTL enabled descriptor or vice versa will lead to compatibility
> failure and StateMigrationException."
>
> Flink documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
>
> Best,
> Haibo
>
> At 2019-07-14 16:50:19, "Avi Levi" <avi.l...@bluevoyant.com> wrote:
>
> Hi,
> I added a TTL to my state.
>
> *old version:*
>
> private lazy val stateDescriptor = new ValueStateDescriptor("foo",
>   Types.CASE_CLASS[DomainState])
>
> *vs. the new version:*
>
> @transient
> private lazy val storeTtl = StateTtlConfig.newBuilder(90)
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .cleanupInRocksdbCompactFilter()
>   .build()
>
> private lazy val stateDescriptor = {
>   val des = new ValueStateDescriptor("foo",
>     Types.CASE_CLASS[DomainState])
>   des.enableTimeToLive(storeTtl)
>   des
> }
>
> *BUT when trying to restore from a savepoint I am getting this error:*
>
> java.lang.RuntimeException: Error while getting state
>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>   ...
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>   at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>   at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>   at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>   ... 11 more
>
> Do you have any idea how I can resolve it?
>
> Best wishes
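
A note on why the restore in the quoted message fails: as far as I understand the Flink documentation, when TTL is enabled the state backend stores a last-modification timestamp along with each user value, so the stored layout differs from state written without TTL. The sketch below is only a toy model of that idea in plain Scala, with no Flink dependency. The object name `TtlLayoutSketch`, the one-field `DomainState`, and the byte layout (an 8-byte timestamp before the value) are all assumptions made for illustration, not Flink's actual serializer format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

object TtlLayoutSketch {
  // Hypothetical stand-in for the DomainState case class in the thread.
  final case class DomainState(count: Int)

  // Toy "no TTL" layout: only the value is written.
  def writePlain(state: DomainState): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(state.count)
    out.flush()
    buffer.toByteArray
  }

  // Toy "TTL" layout (assumed): a timestamp is written before the value.
  def writeWithTtl(state: DomainState, lastModifiedMillis: Long): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(lastModifiedMillis)
    out.writeInt(state.count)
    out.flush()
    buffer.toByteArray
  }

  // Reader that expects the toy TTL layout.
  def readWithTtl(bytes: Array[Byte]): (Long, DomainState) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val lastModifiedMillis = in.readLong()
    (lastModifiedMillis, DomainState(in.readInt()))
  }

  def main(args: Array[String]): Unit = {
    val oldBytes = writePlain(DomainState(42))
    val newBytes = writeWithTtl(DomainState(42), lastModifiedMillis = 0L)
    println(s"plain layout: ${oldBytes.length} bytes, TTL layout: ${newBytes.length} bytes")

    // Bytes written without the timestamp cannot be read by the TTL reader.
    val restored = Try(readWithTtl(oldBytes))
    println(s"reading old bytes with the TTL reader failed: ${restored.isFailure}")
  }
}
```

In this toy model the old bytes are simply too short for the new reader. Flink refuses the restore up front with a StateMigrationException rather than misreading the data, which matches the documentation quoted above.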