Thanks Haibo,
bummer ;)

On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun <sunhaib...@163.com> wrote:

> *This Message originated outside your organization.*
> ------------------------------
> Hi,  Avi Levi
>
> I don't think there's any way to solve this problem right now, and Flink
> documentation clearly shows that this is not supported.
>
> “Trying to restore state, which was previously configured without TTL,
> using TTL enabled descriptor or vice versa will lead to compatibility
> failure and StateMigrationException."
>
> Flink Document:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl>
>
> Best,
> Haibo
>
> At 2019-07-14 16:50:19, "Avi Levi" <avi.l...@bluevoyant.com> wrote:
>
> Hi,
> I added a ttl to my state
> *old version :*
>  private lazy val stateDescriptor = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
>
> *vs the new version *
>
> @transient
> private lazy val storeTtl = StateTtlConfig.newBuilder(90)
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .cleanupInRocksdbCompactFilter()
>   .build()
>
>   private lazy val stateDescriptor = {
>     val des = new ValueStateDescriptor("foo",
> Types.CASE_CLASS[DomainState])
>     des.enableTimeToLive(storeTtl)
>     des
>   }
>
> *BUT when trying to restore from savepoint I am getting this error:*
>
> java.lang.RuntimeException: Error while getting state
>       at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>       at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>       ...
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>       at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>       at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
>       at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>       at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
>       at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>       at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>       ... 11 more
>
>
> Do you have any idea how can I resolve it ?
>
>
> Best wishes
>
>

Reply via email to