Hi, I have a Flink job which uses the RocksDBStateBackend, which has been running on a Flink 1.0 cluster.
The job is written in Scala, and I previously made some changes to the job to ensure that state could be restored. For example, whenever I call `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new MyCustomFlatMapper())` instead of an anonymous function. I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to restore state. I'm seeing exceptions which look like this when trying to restore from a savepoint: java.lang.RuntimeException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148) Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2 at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653) I'm not passing any anonymous functions to `map` or `flatMap` on Flink DataStreams, so it looks like this exception is caused just from using Scala functions like `filter`, `map`, `flatMap` on standard Scala collections, within my class `MyCustomFlatMapper`. Are there any changes to the way Flink state is restored or to RocksDBStateBackend, in the last 2-3 months, which could cause this to happen? If so, any advice on fixing it? I'm hoping there's a better solution to this than rewriting the Flink job in Java. Thanks, Josh