Thanks a lot for looking into this in so much detail, Juho! It was super 
helpful.

In short: this is indeed a bug in Flink.
The HeapInternalTimerService should perform compatibility checks on the 
restored and provided serializers, and reconfigure the serializers if 
possible, instead of relying on just an equals check.
I think the problem only surfaced now with Flink out-of-the-box because in 
Flink 1.4 we changed how we treat Avro dependencies, which affected the default 
KryoSerializer registrations.
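
To make the difference concrete, here is a rough sketch of a strict equals 
check versus a compatibility check that reconfigures the serializer. It is 
only an illustration: ToySerializer, TimerRestoreSketch and the rule "same 
type means compatible" are hypothetical stand-ins, not Flink's actual 
TypeSerializer API and not the eventual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a serializer. This is NOT Flink's TypeSerializer API.
final class ToySerializer {
    final Class<?> type;
    final List<String> registrations;

    ToySerializer(Class<?> type, String... registrations) {
        this.type = type;
        this.registrations =
            Collections.unmodifiableList(new ArrayList<>(Arrays.asList(registrations)));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ToySerializer)) {
            return false;
        }
        ToySerializer other = (ToySerializer) o;
        return type.equals(other.type) && registrations.equals(other.registrations);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, registrations);
    }
}

final class TimerRestoreSketch {

    // Strict variant: any difference between the serializers fails the restore.
    static ToySerializer restoreWithEqualsCheck(ToySerializer restored, ToySerializer provided) {
        if (!restored.equals(provided)) {
            throw new IllegalStateException("different serializers than those used to snapshot");
        }
        return provided;
    }

    // Lenient variant (assumed rule): the same type is treated as compatible, and
    // the provided serializer is reconfigured to reuse the restored registrations.
    static ToySerializer restoreWithCompatibilityCheck(ToySerializer restored, ToySerializer provided) {
        if (!restored.type.equals(provided.type)) {
            throw new IllegalStateException("incompatible key type");
        }
        return new ToySerializer(provided.type, restored.registrations.toArray(new String[0]));
    }

    public static void main(String[] args) {
        ToySerializer restored = new ToySerializer(Object.class, "DummyAvroRegisteredClass");
        ToySerializer provided = new ToySerializer(Object.class, "GenericData$Array");
        try {
            restoreWithEqualsCheck(restored, provided);
        } catch (IllegalStateException e) {
            System.out.println("equals check failed: " + e.getMessage());
        }
        ToySerializer reconfigured = restoreWithCompatibilityCheck(restored, provided);
        System.out.println("reconfigured equals restored: " + reconfigured.equals(restored));
    }
}
```

With the strict variant, a registration change alone is enough to fail the 
restore even though the key type is the same; the lenient variant accepts it 
and keeps reading state with the restored registrations.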

I’ve filed a JIRA for this issue: 
https://issues.apache.org/jira/browse/FLINK-8421. The issue has been made a 
blocker for 1.4.1, so we should expect it to be fixed in the next bugfix 
release.
Unfortunately, I don’t think there is an easy workaround for the issue at the 
moment.

Best,
Gordon

On 12 January 2018 at 11:07:18 PM, Juho Autio (juho.au...@rovio.com) wrote:

Thanks, the window operator is just:

.timeWindow(Time.seconds(10))

We haven't changed key types.



I tried debugging this issue in an IDE and found the root cause to be this:

!this.keyDeserializer.equals(keySerializer) -> true
=> throw new IllegalStateException("Tried to initialize restored TimerService 
with different serializers than those used to snapshot its state.");

This is in HeapInternalTimerService#startTimerService.



With the debugger I can see this:

keySerializer = 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
keyDeserializer = 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
namespaceDeserializer = 
org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer@69d3cf7e

this.keySerializer = null
this.keyDeserializer = 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@e26116cd
this.namespaceDeserializer = null

Now, as the problematic difference comes from the comparison of 
this.keyDeserializer and keySerializer, here are some further details on those:
this.keyDeserializer.type = java.lang.Object (java.lang.class@325)
keySerializer.type = java.lang.Object (java.lang.class@325)

I dug deeper into KryoSerializer#equals and found this condition to be the 
one that fails:

Objects.equals(this.kryoRegistrations, other.kryoRegistrations) -> false

That takes me down to KryoRegistration#equals:

this.registeredClass = class 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
other.registeredClass = class org.apache.avro.generic.GenericData$Array

this.serializerDefinitionType = INSTANCE
other.serializerDefinitionType = CLASS

this.serializerClass = null
other.serializerClass = class 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass
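
As a rough illustration of why that comparison fails, the sketch below models 
the two registrations with the values seen in the debugger and reports which 
fields differ. ToyRegistration, RegistrationDiffSketch and differingFields are 
hypothetical names made up for this example; this is not Flink's 
KryoRegistration class, only a field-by-field comparison in the same spirit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical model of a registration entry. The field names mirror the
// debugger output above, but this is NOT Flink's KryoRegistration class.
final class ToyRegistration {
    enum DefinitionType { CLASS, INSTANCE }

    final String registeredClass;
    final DefinitionType serializerDefinitionType;
    final String serializerClass; // may be null, as in the debugger output

    ToyRegistration(String registeredClass, DefinitionType type, String serializerClass) {
        this.registeredClass = registeredClass;
        this.serializerDefinitionType = type;
        this.serializerClass = serializerClass;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ToyRegistration)) {
            return false;
        }
        return differingFields((ToyRegistration) o).isEmpty();
    }

    @Override
    public int hashCode() {
        return Objects.hash(registeredClass, serializerDefinitionType, serializerClass);
    }

    // Lists the names of the fields whose values differ between the two entries.
    List<String> differingFields(ToyRegistration other) {
        List<String> diff = new ArrayList<>();
        if (!Objects.equals(registeredClass, other.registeredClass)) {
            diff.add("registeredClass");
        }
        if (serializerDefinitionType != other.serializerDefinitionType) {
            diff.add("serializerDefinitionType");
        }
        if (!Objects.equals(serializerClass, other.serializerClass)) {
            diff.add("serializerClass");
        }
        return diff;
    }
}

final class RegistrationDiffSketch {
    static final String PKG = "org.apache.flink.api.java.typeutils.runtime.kryo.";

    // Values copied from the debugger output for "this".
    static ToyRegistration restoredEntry() {
        return new ToyRegistration(PKG + "Serializers$DummyAvroRegisteredClass",
            ToyRegistration.DefinitionType.INSTANCE, null);
    }

    // Values copied from the debugger output for "other".
    static ToyRegistration providedEntry() {
        return new ToyRegistration("org.apache.avro.generic.GenericData$Array",
            ToyRegistration.DefinitionType.CLASS,
            PKG + "Serializers$DummyAvroKryoSerializerClass");
    }

    public static void main(String[] args) {
        List<ToyRegistration> restored = Collections.singletonList(restoredEntry());
        List<ToyRegistration> provided = Collections.singletonList(providedEntry());
        // List equality delegates to the element equals, so one differing entry is enough.
        System.out.println("lists equal: " + Objects.equals(restored, provided));
        System.out.println("differing fields: " + restoredEntry().differingFields(providedEntry()));
    }
}
```

All three fields differ here, so the entries cannot be equal, and a single 
unequal entry makes the whole registration comparison return false.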

Weird, huh? I can't see how I would've changed anything related to these when 
making the minor code changes required for upgrading to 1.4.

Cheers,
Juho

On Fri, Jan 12, 2018 at 2:58 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Juho,

Could your key type have possibly changed / been modified across the
upgrade?
Also, from the error trace, it seems like the failing restore is of a window
operator. What window type are you using?

That exception is a result of either mismatching key serializers or
namespace serializers (i.e. a window serializer), so the above info should
help us narrow down the issue here.

Cheers,
Gordon



