Hi,

Just to clarify: I quickly went through the README of the project and saw this: "This error is seen after trying to read from a savepoint that was created using the same case class as a key."
So, if I understood correctly, you were attempting to use the State Processor API to access a savepoint that was written with a Scala DataStream job, correct?

If that's the case, I'm afraid this won't work as of now. See [1] for a similar scenario that others have bumped into as well. The TL;DR is: the State Processor API is currently not guaranteed to work for snapshots written by Scala DataStream jobs.

For now, I'll add a big warning about this to the docs. In general, though, it seems we should consider bumping up the priority of enabling this, as quite a few users run their jobs with the Scala DataStream API.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe <mark.ni...@segment.com> wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To demonstrate it,
> I've created a reference repository:
> https://github.com/segmentio/flink-state-management
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we no longer have references to some of the
> old keys. My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using the State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic
> key (a string), but when I switched the key to something more complex
> (a case class of two strings), I started seeing this exception:
>
> Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
>     at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>     ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you
> can give would be greatly appreciated!
>
> Thanks,
> Mark Niehe · Software Engineer, Integrations
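
For reference, step 2 of Mark's plan above would look roughly like the sketch below against the 1.9/1.10-era State Processor API (batch, DataSet-based). The key type MyKey, the operator uid, and the paths are placeholders, not taken from the linked repository; the comment marks the call where the serializer mismatch described in [1] surfaces.

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Placeholder for the "case class of two strings" key from the thread.
case class MyKey(sourceId: String, messageId: String)

// Emits every key it encounters; the state values are ignored.
class KeyReader extends KeyedStateReaderFunction[MyKey, MyKey] {
  override def readKey(key: MyKey,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[MyKey]): Unit =
    out.collect(key)
}

object ReadKeys {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(
      env,
      "file:///tmp/savepoint-dir",                        // placeholder path
      new RocksDBStateBackend("file:///tmp/checkpoints")) // match the backend that wrote the savepoint

    // This call is where the StateMigrationException surfaces for Scala
    // case-class keys: the key serializer inferred here via Java type
    // extraction does not match the Scala case-class serializer that
    // wrote the snapshot.
    val keys = savepoint.readKeyedState("my-operator-uid", new KeyReader)
    keys.print()
  }
}

Note that readKeyedState also has an overload accepting explicit key and output TypeInformation, but per [1] even supplying the Scala case-class type information is not guaranteed to restore snapshots written by the Scala DataStream API.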
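Step 4 of the plan, bootstrapping a new savepoint from the filtered state, would look roughly like the following under the same assumptions; KeptEntry, the Writer function, the uid, the max parallelism of 128, and the paths are again made-up placeholders.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction

// Same placeholder key as in the previous sketch.
case class MyKey(sourceId: String, messageId: String)

// One retained (key, value) pair of the filtered state.
case class KeptEntry(key: MyKey, value: String)

// Writes each retained value back into keyed state under its key.
class Writer extends KeyedStateBootstrapFunction[MyKey, KeptEntry] {
  @transient private var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state", classOf[String]))

  override def processElement(entry: KeptEntry,
                              ctx: KeyedStateBootstrapFunction[MyKey, KeptEntry]#Context): Unit =
    state.update(entry.value)
}

object WriteSavepoint {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the DataSet of entries that survived the filter step.
    val kept = env.fromElements(KeptEntry(MyKey("a", "b"), "some-state"))

    val transformation = OperatorTransformation
      .bootstrapWith(kept)
      .keyBy(new KeySelector[KeptEntry, MyKey] {
        override def getKey(e: KeptEntry): MyKey = e.key
      })
      .transform(new Writer)

    Savepoint
      .create(new RocksDBStateBackend("file:///tmp/checkpoints"), 128)
      .withOperator("my-operator-uid", transformation)
      .write("file:///tmp/new-savepoint")

    // write() only attaches the savepoint sinks; execution is triggered here.
    env.execute("bootstrap filtered savepoint")
  }
}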