Hi Stefan,

Sorry for the late reply - I was away last week.
I've just got round to retrying my above scenario (run my job, take a
savepoint, restore my job) using the latest Flink 1.2-SNAPSHOT  -- and am
now seeing a different exception when restoring the state:

10/03/2016 11:29:02 Job execution switched to status FAILING.
java.lang.RuntimeException: Could not deserialize NFA.
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(
JavaSerializer.java:86)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(
JavaSerializer.java:31)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.
getPartitionableState(DefaultOperatorStateBackend.java:107)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.
initializeState(FlinkKafkaConsumerBase.java:323)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.
openAllOperators(StreamTask.java:396)
at org.apache.flink.streaming.runtime.tasks.StreamTask.
invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.
connectors.kafka.internals.KafkaTopicPartition

Any ideas what's going on here? Is the Kafka consumer state management
broken right now in Flink master?

Thanks,
Josh


On Thu, Sep 22, 2016 at 9:28 AM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> to me, this looks like you are running into the problem described under
> [FLINK-4603] : KeyedStateBackend cannot restore user code classes. I have
> opened a pull request (PR 2533) this morning that should fix this behavior
> as soon as it is merged into master.
>
> Best,
> Stefan
>
> Am 21.09.2016 um 23:49 schrieb Josh <jof...@gmail.com>:
>
> Hi Stephan,
>
> Thanks for the reply. I should have been a bit clearer but actually I was
> not trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from
> scratch (starting with no state), then took a savepoint and tried to
> restart it from the savepoint - and that's when I get this exception. If I
> do this with the same job using an older version of Flink (1.1-SNAPSHOT
> taken in June), the savepoint and restore works fine.
>
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use
> (improvements to Kinesis connector + the bucketing sink). Anyway for now I
> have things working with an older version of Flink - but it would be good
> to know what's changed recently that's causing the restore to break and if
> my job is not going to be compatible with future releases of Flink.
>
> Best,
> Josh
>
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi Josh!
>>
>> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right
>> now, in order to add the elasticity feature (change parallelism or running
>> jobs and still maintaining exactly once guarantees).
>> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will
>> try and add compatibility towards 1.1 savepoints before the release of
>> version 1.2.
>>
>> I think the exception is probably caused by the fact that old savepoint
>> stored some serialized user code (the new one is not expected to) which
>> cannot be loaded.
>>
>> Adding Aljoscha and Stefan to this, to see if they can add anything.
>> In any case, this should have a much better error message.
>>
>> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP,
>> so not really recommended for general use.
>>
>> Does version 1.1 not work for you?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Sep 21, 2016 at 7:44 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink job which uses the RocksDBStateBackend, which has been
>>> running on a Flink 1.0 cluster.
>>>
>>> The job is written in Scala, and I previously made some changes to the
>>> job to ensure that state could be restored. For example, whenever I call
>>> `map` or `flatMap` on a DataStream, I pass a named class: `.flatMap(new
>>> MyCustomFlatMapper())` instead of an anonymous function.
>>>
>>> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able
>>> to restore state. I'm seeing exceptions which look like this when trying to
>>> restore from a savepoint:
>>>
>>> java.lang.RuntimeException: Could not initialize keyed state backend.
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>>> tor.open(AbstractStreamOperator.java:148)
>>> Caused by: java.lang.ClassNotFoundException:
>>> com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBa
>>> ckend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDB
>>> KeyedStateBackend.java:653)
>>>
>>> I'm not passing any anonymous functions to `map` or `flatMap` on Flink
>>> DataStreams, so it looks like this exception is caused just from using
>>> Scala functions like `filter`, `map`, `flatMap` on standard Scala
>>> collections, within my class `MyCustomFlatMapper`.
>>>
>>> Are there any changes to the way Flink state is restored or to
>>> RocksDBStateBackend, in the last 2-3 months, which could cause this to
>>> happen?
>>>
>>> If so, any advice on fixing it?
>>>
>>> I'm hoping there's a better solution to this than rewriting the Flink
>>> job in Java.
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>
>>
>
>

Reply via email to