Thanks a lot for looking into this with so much detail, Juho! It was super helpful.
In short: this is indeed a bug in Flink.
The HeapInternalTimerService should perform compatibility checks on the restored / provided serializers and reconfigure the serializers if possible, instead of doing just an equals check.
I think the problem only surfaced now with out-of-the-box Flink because in Flink 1.4 we changed how we treat Avro dependencies, which affected the default KryoSerializer registrations.

I’ve filed a JIRA for this issue: https://issues.apache.org/jira/browse/FLINK-8421.
The issue has been made a blocker for 1.4.1, so we should expect it to be fixed in the next bugfix release.
Unfortunately, I don’t think there is an easy workaround for the issue at the moment.

Best,
Gordon

On 12 January 2018 at 11:07:18 PM, Juho Autio (juho.au...@rovio.com) wrote:

Thanks, the window operator is just:

    .timeWindow(Time.seconds(10))

We haven't changed key types.

I tried debugging this issue in the IDE and found the root cause to be this:

    !this.keyDeserializer.equals(keySerializer) -> true
    => throw new IllegalStateException("Tried to initialize restored TimerService with different serializers than those used to snapshot its state.");

This is in HeapInternalTimerService#startTimerService.

With the debugger I can see this:

    keySerializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
    keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@45f8043a
    namespaceDeserializer = org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer@69d3cf7e
    this.keySerializer = null
    this.keyDeserializer = org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@e26116cd
    this.namespaceDeserializer = null

Now, as the problematic difference comes from the comparison of this.keyDeserializer & keySerializer, here are some further details on those:

    this.keyDeserializer.type = java.lang.Object (java.lang.class@325)
    keySerializer.type = java.lang.Object (java.lang.class@325)

I dug deeper into KryoSerializer#equals and found this condition to be the one that fails:

    Objects.equals(this.kryoRegistrations, other.kryoRegistrations) -> false

That takes me down to KryoRegistration#equals:

    this.registeredClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass
    other.registeredClass = class org.apache.avro.generic.GenericData$Array
    this.serializerDefinitionType = INSTANCE
    other.serializerDefinitionType = CLASS
    this.serializerClass = null
    other.serializerClass = class org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass

Weird, huh? I can't see how I would've changed anything related to these when making the minor code changes required for upgrading to 1.4.

Cheers,
Juho

On Fri, Jan 12, 2018 at 2:58 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Juho,

Could your key type have possibly changed / been modified across the upgrade?
Also, from the error trace, it seems like the failing restore is of a window operator. What window type are you using?
That exception is a result of either mismatching key serializers or mismatching namespace serializers (i.e. a window serializer), so the above info should help us narrow down the issue here.

Cheers,
Gordon
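
To illustrate the difference discussed in this thread between a plain equals check and a more lenient compatibility check, here is a minimal sketch in plain Java. It does not use any Flink classes: SketchSerializer, Registration, strictCheck and compatibleCheck are hypothetical names, the registration key "avro-array" is made up, and the "same type is enough" policy is only an assumption for illustration, not the actual fix tracked in FLINK-8421. The registration values loosely follow the debugger output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for illustration only; none of these are Flink classes.
final class SerializerCheckSketch {

    /** Simplified registration: which class is registered and how its serializer is defined. */
    static final class Registration {
        final String registeredClass;
        final String definitionType; // "INSTANCE" or "CLASS", as in the debugger output

        Registration(String registeredClass, String definitionType) {
            this.registeredClass = registeredClass;
            this.definitionType = definitionType;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Registration)) return false;
            Registration other = (Registration) o;
            return registeredClass.equals(other.registeredClass)
                    && definitionType.equals(other.definitionType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(registeredClass, definitionType);
        }
    }

    /** Simplified serializer whose equality includes its registrations. */
    static final class SketchSerializer {
        final Class<?> type;
        final Map<String, Registration> registrations;

        SketchSerializer(Class<?> type, Map<String, Registration> registrations) {
            this.type = type;
            this.registrations = registrations;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SketchSerializer)) return false;
            SketchSerializer other = (SketchSerializer) o;
            return type.equals(other.type)
                    && Objects.equals(registrations, other.registrations);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, registrations);
        }
    }

    // Builds a serializer for java.lang.Object with a single registration.
    static SketchSerializer serializerWith(String registeredClass, String definitionType) {
        Map<String, Registration> regs = new LinkedHashMap<>();
        regs.put("avro-array", new Registration(registeredClass, definitionType)); // made-up key
        return new SketchSerializer(Object.class, regs);
    }

    // The serializer restored from the snapshot ("this" side in the debugger output).
    static SketchSerializer restoredSerializer() {
        return serializerWith("Serializers$DummyAvroRegisteredClass", "INSTANCE");
    }

    // The serializer provided by the upgraded job ("other" side in the debugger output).
    static SketchSerializer providedSerializer() {
        return serializerWith("GenericData$Array", "CLASS");
    }

    /** Strict check: any difference in registrations counts as a mismatch. */
    static boolean strictCheck(SketchSerializer restored, SketchSerializer provided) {
        return restored.equals(provided);
    }

    /** Lenient check (assumed policy): the same serialized type is treated as compatible. */
    static boolean compatibleCheck(SketchSerializer restored, SketchSerializer provided) {
        return restored.type.equals(provided.type);
    }

    public static void main(String[] args) {
        SketchSerializer restored = restoredSerializer();
        SketchSerializer provided = providedSerializer();
        System.out.println("strict equals check passes: " + strictCheck(restored, provided));
        System.out.println("lenient compatibility check passes: " + compatibleCheck(restored, provided));
    }
}
```

In this sketch both serializers handle java.lang.Object, so only the strict check rejects the restore. A real compatibility check would also have to decide whether the differing registrations can be reconfigured safely, which this sketch does not attempt.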