Hello David,
> That is just to avoid falling back to Kryo serialization, which is less effective, but this IMO shouldn't break your application.
Thanks for that hint. I would have spent some hours working on that without fixing my issue.
> Maybe you can try to check these for possible null values / cyclic references?
I will have a look into that.
The data model is quite complex and automatically derived from an OpenAPI spec of a 3GPP specification. I am not completely aware of all its details, so null values or cyclic references could well be present.
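To look for such cases on a hand-built object before going through Flink at all, one option is a small reflective walker. This is only a sketch: the class name NullCycleInspector and its predicate-based filter are hypothetical, not part of Flink or Kryo. It reports null fields, null list elements and map values, and references that point back to an object on the current path (cycles). Null values are not necessarily a problem for Kryo, so the output is a list of candidates to look at, not a diagnosis.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical debugging helper: reports null values and cyclic references in an object graph. */
public final class NullCycleInspector {

    private final Predicate<Class<?>> inspectFields; // which classes to open up via reflection
    private final List<String> findings = new ArrayList<>();
    // Objects on the current depth-first path, compared by identity (not equals()).
    private final Map<Object, Boolean> onPath = new IdentityHashMap<>();

    private NullCycleInspector(Predicate<Class<?>> inspectFields) {
        this.inspectFields = inspectFields;
    }

    public static List<String> inspect(Object root, Predicate<Class<?>> inspectFields) {
        NullCycleInspector inspector = new NullCycleInspector(inspectFields);
        inspector.visit(root, "root");
        return inspector.findings;
    }

    private void visit(Object value, String path) {
        if (value == null) {
            findings.add("null: " + path);
            return;
        }
        if (onPath.containsKey(value)) {
            // The value is one of its own ancestors: a cycle.
            findings.add("cycle: " + path);
            return;
        }
        onPath.put(value, Boolean.TRUE);
        try {
            if (value instanceof Collection) {
                int i = 0;
                for (Object element : (Collection<?>) value) {
                    visit(element, path + "[" + i + "]");
                    i++;
                }
            } else if (value instanceof Map) {
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                    visit(entry.getValue(), path + "[" + entry.getKey() + "]");
                }
            } else {
                // Walk instance fields of matching classes (and matching superclasses); others are leaves.
                for (Class<?> c = value.getClass(); c != null && inspectFields.test(c); c = c.getSuperclass()) {
                    for (Field field : c.getDeclaredFields()) {
                        if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                            continue;
                        }
                        field.setAccessible(true);
                        visit(field.get(value), path + "." + field.getName());
                    }
                }
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        } finally {
            onPath.remove(value);
        }
    }
}
```

For the model in the trace, the filter could be something like `c -> c.getName().startsWith("de.martin.model")`, so that JDK classes such as String are treated as leaves and never opened via reflection.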
> I think if I were to debug this, I'd start with writing a unit test for the state serialization (manually instantiating the serializer and running hand-crafted values through the ser/de cycle to see if it fails).
I think that approach is reasonable and I will go for that.
Can you point me to a starting point (docs, existing unit tests, code fragments) for manually running a Flink state serialization cycle?
Say I already have some of my complex objects; how do I go on from there?
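A minimal sketch of such a ser/de round trip, assuming Flink 1.14 with flink-core on the classpath and a state type obtained via TypeInformation.of(...). The class name StateSerializerRoundTrip is hypothetical. It writes several values back to back into one buffer (roughly how many state entries share one checkpoint stream), reads them back with a serializer restored from the writer's snapshot, and fails if bytes are left unread. Note that `new ExecutionConfig()` does not carry any Kryo registrations the real job might make on its own ExecutionConfig.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Hypothetical test helper: pushes hand-crafted values through a Flink ser/de cycle. */
public final class StateSerializerRoundTrip {

    private StateSerializerRoundTrip() {}

    public static <T> List<T> roundTripAll(TypeInformation<T> type, List<T> values) throws IOException {
        // The serializer Flink derives for this type (PojoSerializer, KryoSerializer, ...).
        TypeSerializer<T> writer = type.createSerializer(new ExecutionConfig());
        DataOutputSerializer out = new DataOutputSerializer(256);
        for (T value : values) {
            writer.serialize(value, out); // entries written back to back, like in a state stream
        }

        // Mimic a restore: a fresh serializer instance created from the writer's snapshot.
        TypeSerializer<T> reader = writer.snapshotConfiguration().restoreSerializer();
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        List<T> result = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            // Reading more than was written surfaces here as an EOFException.
            result.add(reader.deserialize(in));
        }
        if (in.available() != 0) {
            throw new IllegalStateException(in.available() + " bytes left unread: ser/de is not symmetric");
        }
        return result;
    }
}
```

With the real model this would be called as, e.g., `roundTripAll(TypeInformation.of(Accumulator.class), Arrays.asList(acc1, acc2))` on hand-built instances, then comparing the results field by field.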
> I'd probably need a minimal case to reproduce this locally to get more insight.
Martin
> What I understand from the docs [1] and David A.'s answer on SO [2] is that I have to create TypeInfos for my POJOs with Lists and Maps.
That is just to avoid falling back to Kryo serialization, which is less effective, but this IMO shouldn't break your application. There might, of course, be some edge case that Kryo can't handle; in that case, switching to the POJO serializer might actually help.
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream Closed
> Serialization trace:
> IdString (de.martin.model.ContainerInfo)
> pDUContainerInformation (de.martin.model.UsedContainer)
> usedContainer (de.martin.model.UnitUsage)
> usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)
Maybe you can try to check these for possible null values / cyclic references? These are just guesses. I think if I were to debug this, I'd start with writing a unit test for the state serialization (manually instantiating the serializer and running hand-crafted values through the ser/de cycle to see if it fails).
Other than that, I can't really think of anything right now; I'd probably need a minimal case to reproduce this locally to get more insight.
D.
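To see which fields actually fall back to Kryo, one can ask Flink's type extraction directly. The sketch below (hypothetical class KryoFallbackReport with a hypothetical demo type Container, assuming Flink 1.14) walks the TypeInformation returned by TypeInformation.of(...) and lists every nested POJO field typed as GenericTypeInfo, i.e. handled by the KryoSerializer. It only descends into POJO fields, not into list or map type infos. As a blunter alternative, `env.getConfig().disableGenericTypes()` makes Flink throw instead of creating a Kryo serializer for a generic type.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

/** Hypothetical helper: lists the (nested) fields that Flink would serialize with Kryo. */
public final class KryoFallbackReport {

    /** Hypothetical demo type: a POJO whose List field is extracted as a generic (Kryo) type. */
    public static class Container {
        public String idString;
        public List<String> tags;
    }

    private KryoFallbackReport() {}

    public static List<String> kryoFields(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        collect(TypeInformation.of(clazz), clazz.getSimpleName(), result);
        return result;
    }

    private static void collect(TypeInformation<?> type, String path, List<String> result) {
        if (type instanceof GenericTypeInfo) {
            // Not analyzable as a Flink type: falls back to the KryoSerializer.
            result.add(path + " -> Kryo (" + type.getTypeClass().getName() + ")");
        } else if (type instanceof PojoTypeInfo) {
            PojoTypeInfo<?> pojo = (PojoTypeInfo<?>) type;
            for (int i = 0; i < pojo.getArity(); i++) {
                PojoField field = pojo.getPojoFieldAt(i);
                collect(field.getTypeInformation(), path + "." + field.getField().getName(), result);
            }
        }
        // Other type infos (basic types, tuples, list/map type infos, ...) are not descended into.
    }
}
```

Running this on the top-level state class should show whether the whole class or only some of its List/Map fields end up with Kryo.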
On Fri, Jan 7, 2022 at 9:40 PM Martin <mar...@sonicdev.de> wrote:
I changed my Flink job to use an explicit keyBy instead of reinterpretAsKeyedStream.
The situation is still the same, so it is not a problem with the combination of reinterpretAsKeyedStream and Elastic Scaling in Reactive Mode.
This time I was able to check the logs of the task managers, and it seems to be a serialization problem with my state when it is restored from a checkpoint after elastic scaling.
Besides the sequence numbers mentioned before, my state also holds some values that are nested POJOs containing lists and hashmaps.
I assume the problem is related to that.
What I understand from the docs [1] and David A.'s answer on SO [2] is that I have to create TypeInfos for my POJOs with Lists and Maps. Is this correct?
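One way to declare such TypeInfos explicitly, assuming Flink 1.14, is `Types.POJO(Class, Map<String, TypeInformation<?>>)` combined with `Types.LIST` and `Types.MAP`, passing the result to the state descriptor. The classes below are hypothetical stand-ins for the generated model; the real ContainerInfo/UnitUsage fields are not known here. As far as I understand, every field of the class has to be listed in the map, because fields missing from it are not serialized. Switching an existing state from Kryo to the POJO serializer is also likely not compatible with checkpoints written before the change.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

/** Hypothetical example: explicit type information for nested POJOs with List and Map fields. */
public final class ExplicitStateTypes {

    /** Hypothetical stand-in for a generated model class. */
    public static class ContainerInfo {
        public String idString;
        public List<String> tags;
    }

    /** Hypothetical stand-in for a generated model class holding a map of nested POJOs. */
    public static class UnitUsage {
        public Map<String, ContainerInfo> containers;
    }

    private ExplicitStateTypes() {}

    public static TypeInformation<ContainerInfo> containerInfoType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("idString", Types.STRING);
        fields.put("tags", Types.LIST(Types.STRING)); // ListSerializer instead of Kryo
        return Types.POJO(ContainerInfo.class, fields);
    }

    public static TypeInformation<UnitUsage> unitUsageType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("containers", Types.MAP(Types.STRING, containerInfoType())); // MapSerializer
        return Types.POJO(UnitUsage.class, fields);
    }

    /** State descriptor using the explicit type information instead of reflection-based extraction. */
    public static ValueStateDescriptor<UnitUsage> unitUsageDescriptor() {
        return new ValueStateDescriptor<>("unitUsage", unitUsageType());
    }
}
```

In the KeyedProcessFunction, `getRuntimeContext().getState(ExplicitStateTypes.unitUsageDescriptor())` in `open()` would then pick up these serializers.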
P.S.: Oddly, this only happens on scale-in/failure, but I have not retried it with scale-out yet.
BR
Martin
Attached are the exceptions that occurred:
2022-01-07 14:24:56,845 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_8b4e92de8fd229ebde272da0b8bf387e_(1/8) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream Closed
Serialization trace:
IdString (de.martin.model.ContainerInfo)
pDUContainerInformation (de.martin.model.UsedContainer)
usedContainer (de.martin.model.UnitUsage)
usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:148) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 20 more
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.readBytes(Native Method) ~[?:?]
at java.io.FileInputStream.read(Unknown Source) ~[?:?]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:73) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:60) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:52) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.io.DataInputStream.read(Unknown Source) ~[?:?]
at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.fill(Input.java:146) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 20 more