Hi Stefan,

Sorry for the late reply - I was away last week. I've just got round to
retrying my above scenario (run my job, take a savepoint, restore my job)
using the latest Flink 1.2-SNAPSHOT -- and am now seeing a different
exception when restoring the state:

10/03/2016 11:29:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Could not deserialize NFA.
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

Any ideas what's going on here? Is the Kafka consumer state management
broken right now in Flink master?

Thanks,
Josh

On Thu, Sep 22, 2016 at 9:28 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> to me, this looks like you are running into the problem described under
> [FLINK-4603] : KeyedStateBackend cannot restore user code classes. I have
> opened a pull request (PR 2533) this morning that should fix this behavior
> as soon as it is merged into master.
>
> Best,
> Stefan
>
> On 21.09.2016 at 23:49, Josh <jof...@gmail.com> wrote:
>
> Hi Stephan,
>
> Thanks for the reply. I should have been a bit clearer but actually I was
> not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
> scratch (starting with no state), then took a savepoint and tried to
> restart it from the savepoint - and that's when I get this exception.
> If I do this with the same job using an older version of Flink
> (1.1-SNAPSHOT taken in June), the savepoint and restore work fine.
>
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
> (improvements to Kinesis connector + the bucketing sink). Anyway for now I
> have things working with an older version of Flink - but it would be good
> to know what's changed recently that's causing the restore to break and if
> my job is not going to be compatible with future releases of Flink.
>
> Best,
> Josh
>
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Josh!
>>
>> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
>> now, in order to add the elasticity feature (change the parallelism of
>> running jobs while still maintaining exactly-once guarantees).
>> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
>> try and add compatibility towards 1.1 savepoints before the release of
>> version 1.2.
>>
>> I think the exception is probably caused by the fact that the old
>> savepoint stored some serialized user code (the new one is not expected
>> to) which cannot be loaded.
>>
>> Adding Aljoscha and Stefan to this, to see if they can add anything.
>> In any case, this should have a much better error message.
>>
>> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
>> so not really recommended for general use.
>>
>> Does version 1.1 not work for you?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink job which uses the RocksDBStateBackend, which has been
>>> running on a Flink 1.0 cluster.
>>>
>>> The job is written in Scala, and I previously made some changes to the
>>> job to ensure that state could be restored. For example, whenever I call
>>> `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
>>> MyCustomFlatMapper())` instead of an anonymous function.
>>>
>>> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
>>> to restore state. I'm seeing exceptions which look like this when trying
>>> to restore from a savepoint:
>>>
>>> java.lang.RuntimeException: Could not initialize keyed state backend.
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
>>> Caused by: java.lang.ClassNotFoundException: com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
>>>
>>> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
>>> DataStreams, so it looks like this exception is caused just from using
>>> Scala functions like `filter`, `map`, `flatMap` on standard Scala
>>> collections, within my class `MyCustomFlatMapper`.
>>>
>>> Are there any changes to the way Flink state is restored or to
>>> RocksDBStateBackend, in the last 2-3 months, which could cause this to
>>> happen?
>>>
>>> If so, any advice on fixing it?
>>>
>>> I'm hoping there's a better solution to this than rewriting the Flink
>>> job in Java.
>>>
>>> Thanks,
>>>
>>> Josh
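
Both traces in this thread end in a ClassNotFoundException raised while serialized state is being read back. The following is a minimal sketch, in plain Java with no Flink dependency, of how that kind of failure can arise: Java serialization stores class names in the byte stream, and the read side must resolve them through a class loader that can actually see those classes. All names here (ClassLoaderRestoreSketch, PartitionState, ClassLoaderObjectInputStream, snapshot, restore) are hypothetical and chosen for illustration; this is not Flink's implementation, and whether this is the actual cause of the failures above is an assumption, not something the thread confirms.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderRestoreSketch {

    // Hypothetical stand-in for a piece of operator state defined in user or connector code.
    static class PartitionState implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;
        final int partition;

        PartitionState(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // ObjectInputStream that resolves classes through an explicitly supplied class loader.
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // The stream only carries the class name; the loader must be able to find it.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    // "Savepoint": serialize the state object to bytes.
    static byte[] snapshot(Serializable state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    // "Restore": deserialize, resolving classes with the given loader.
    static Object restore(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = snapshot(new PartitionState("events", 3));

        // A loader that can see PartitionState restores the object.
        ClassLoader userCodeLoader = PartitionState.class.getClassLoader();
        System.out.println("restored: " + restore(bytes, userCodeLoader));

        // A loader that only sees JDK classes cannot resolve the stored class name.
        ClassLoader jdkOnlyLoader = new ClassLoader(null) {};
        try {
            restore(bytes, jdkOnlyLoader);
        } catch (ClassNotFoundException e) {
            System.out.println("restore failed: " + e);
        }
    }
}
```

In this sketch the same bytes restore cleanly with one loader and fail with another, so a ClassNotFoundException on restore does not by itself mean the savepoint data is corrupt; it can also mean the class was looked up through a loader that does not include the job's classes.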