Hi,

I have a Flink job that uses the RocksDBStateBackend and has been
running on a Flink 1.0 cluster.

The job is written in Scala, and I previously made some changes to the job
to ensure that state could be restored. For example, whenever I call `map`
or `flatMap` on a DataStream, I pass an instance of a named class,
`.flatMap(new MyCustomFlatMapper())`, instead of an anonymous function.
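
To make the setup concrete, here is a minimal sketch of the named-class pattern described above. Only `MyCustomFlatMapper` comes from my job; the element types, the `MyJobSketch` object, the sample input and the job name are hypothetical, and the real mapper does more than split strings.

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// A named class, so the operator's function has a stable class name
// (as opposed to passing an anonymous function to flatMap).
class MyCustomFlatMapper extends FlatMapFunction[String, String] {
  override def flatMap(line: String, out: Collector[String]): Unit = {
    // Ordinary Scala collection calls inside the named class.
    line.split(" ").filter(_.nonEmpty).foreach(word => out.collect(word))
  }
}

// Hypothetical driver, for illustration only.
object MyJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.fromElements("a b", "c d")
    lines.flatMap(new MyCustomFlatMapper()).print()
    env.execute("named-flatmap-sketch")
  }
}
```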

I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to
restore state. I'm seeing exceptions which look like this when trying to
restore from a savepoint:

java.lang.RuntimeException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)

I'm not passing any anonymous functions to `map` or `flatMap` on Flink
DataStreams, so it looks like this exception is caused simply by using
Scala functions like `filter`, `map` and `flatMap` on standard Scala
collections within my class `MyCustomFlatMapper`.
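
For anyone trying to trace a name like the one in the exception, the following plain-Scala sketch (no Flink involved) prints the runtime class names of anonymous classes nested inside a named class. `Describer`, `NamedMapper` and `AnonNames` are hypothetical names, and the exact numbering the compiler assigns is an assumption that can differ between Scala versions and with whatever else is in the enclosing class.

```scala
// Hypothetical trait used only to have something to instantiate anonymously.
trait Describer {
  def describe(): String
}

class NamedMapper {
  // An anonymous class inside a named class: the compiler synthesizes its name.
  val outer: Describer = new Describer {
    // A second anonymous class nested inside the first one.
    val inner: Describer = new Describer {
      def describe(): String = getClass.getName
    }
    def describe(): String = getClass.getName + " -> " + inner.describe()
  }
}

object AnonNames {
  def main(args: Array[String]): Unit = {
    // Prints the synthesized names of the outer and the nested anonymous class.
    println(new NamedMapper().outer.describe())
  }
}
```

Printing `getClass.getName` in this way for the objects created inside `MyCustomFlatMapper` may help narrow down which construct the `$$anon$4$$anon$2` suffix corresponds to.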

Have there been any changes in the last 2-3 months, either to the way
Flink state is restored or to RocksDBStateBackend, that could cause this
to happen?

If so, any advice on fixing it?

I'm hoping there's a better solution to this than rewriting the Flink job
in Java.

Thanks,

Josh
