I changed my Flink job to use an explicit keyBy instead of reinterpretAsKeyedStream.
The behaviour is unchanged, so the problem is not caused by combining reinterpretAsKeyedStream with elastic scaling in Reactive Mode.
This time I was able to check the TaskManager logs, and it looks like a problem deserializing my state when it is restored from a checkpoint after elastic scaling.
Besides the sequence numbers mentioned earlier, my state also holds values that are nested POJOs containing Lists and HashMaps.
I assume that is where the problem lies.
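A quick way to see which fields are affected is to walk the state class with reflection and list every field declared as a Collection or Map. These are the fields that show up under Kryo's MapSerializer and CollectionSerializer in the trace below, and presumably the ones that would need explicit type information. The following is only a sketch using plain JDK reflection: the nested classes are hypothetical stand-ins that reuse the field names from the serialization trace, and their field types are guesses, not the real model.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CollectionFieldFinder {

    // Hypothetical stand-ins: field names come from the serialization trace,
    // the field types are assumptions made for illustration only.
    public static class ContainerInfo { public String IdString; }
    public static class UsedContainer { public ContainerInfo pDUContainerInformation; }
    public static class UnitUsage { public List<UsedContainer> usedContainer; }
    public static class Accumulator { public Map<String, UnitUsage> usagePerKey; }

    /** Returns dotted paths of all Collection/Map fields reachable from root. */
    public static List<String> findCollectionFields(Class<?> root) {
        List<String> hits = new ArrayList<>();
        walk(root, root.getSimpleName(), new HashSet<>(), hits);
        return hits;
    }

    private static void walk(Class<?> type, String path, Set<Class<?>> seen, List<String> hits) {
        // Skip primitives, arrays, JDK types, and classes already visited (cycle guard).
        if (type.isPrimitive() || type.isArray()
                || type.getName().startsWith("java.") || !seen.add(type)) {
            return;
        }
        // Only fields declared on the class itself are inspected, not inherited ones.
        for (Field f : type.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue;
            }
            String fieldPath = path + "." + f.getName();
            Class<?> raw = f.getType();
            if (Collection.class.isAssignableFrom(raw) || Map.class.isAssignableFrom(raw)) {
                hits.add(fieldPath);
                // Descend into the element/key/value types to find nested containers.
                Type generic = f.getGenericType();
                if (generic instanceof ParameterizedType) {
                    for (Type arg : ((ParameterizedType) generic).getActualTypeArguments()) {
                        if (arg instanceof Class) {
                            walk((Class<?>) arg, fieldPath + "[]", seen, hits);
                        }
                    }
                }
            } else {
                walk(raw, fieldPath, seen, hits);
            }
        }
    }

    public static void main(String[] args) {
        findCollectionFields(Accumulator.class).forEach(System.out::println);
    }
}
```

For the stand-in classes above, this reports Accumulator.usagePerKey and Accumulator.usagePerKey[].usedContainer, which matches the Map and Collection levels visible in the trace.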
From the docs [1] and David A.'s answer on SO [2], my understanding is that I have to provide TypeInfos for my POJOs that contain Lists and Maps.
Is this correct?
P.S.: It is odd that this only happens on scale-in/failure; I have not retried it with scale-out yet.
BR
Martin
The exceptions are attached below:
2022-01-07 14:24:56,845 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_8b4e92de8fd229ebde272da0b8bf387e_(1/8) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream Closed
Serialization trace:
IdString (de.martin.model.ContainerInfo)
pDUContainerInformation (de.martin.model.UsedContainer)
usedContainer (de.martin.model.UnitUsage)
usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:148) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 20 more
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.readBytes(Native Method) ~[?:?]
at java.io.FileInputStream.read(Unknown Source) ~[?:?]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:73) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:60) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:52) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.io.DataInputStream.read(Unknown Source) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.fill(Input.java:146) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 20 more
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
[2] https://stackoverflow.com/a/64721838/17274259