>
> What I understand from the docs [1] and David A.'s answer on SO [2] is that
> I have to create TypeInfos for my POJOs with Lists and Maps.
>

That is just to avoid falling back to Kryo serialization, which is less
efficient, but IMO the fallback shouldn't break your application. Of course,
there might be some edge case that Kryo can't handle; in that case switching
to the POJO serializer might actually help.

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
Stream Closed
Serialization trace:
IdString (de.martin.model.ContainerInfo)
pDUContainerInformation (de.martin.model.UsedContainer)
usedContainer (de.martin.model.UnitUsage)
usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)

Could you check these fields for possible null values / cyclic
references? These are just guesses. If I were to debug this, I'd
start by writing a unit test for the state serialization (manually
instantiating the serializer and running a hand-crafted value through the
ser/de cycle to see if it fails).
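
For the null / cyclic reference check, something along these lines might
work as a first pass. It is a plain-JDK sketch, not Flink or Kryo API: the
helper ObjectGraphCheck is made up for illustration, and the nested model
classes are only stand-ins that reuse the field names from the serialization
trace (their field types are guesses), so you would pass in your real
Accumulator instead. It skips static, transient and synthetic fields and
treats JDK value types as leaves.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: walks an object graph and reports null fields and cyclic references. */
final class ObjectGraphCheck {

    // Stand-ins for the real model classes. Field names follow the serialization trace;
    // the field types are assumptions.
    static class ContainerInfo { String IdString; }
    static class UsedContainer { ContainerInfo pDUContainerInformation = new ContainerInfo(); }
    static class UnitUsage { List<UsedContainer> usedContainer = new ArrayList<>(); }
    static class Accumulator { Map<String, UnitUsage> usagePerKey = new HashMap<>(); }

    /** Returns findings such as "null at root.a.b" or "cycle at root.a[0]". */
    static List<String> inspect(Object root) {
        List<String> findings = new ArrayList<>();
        // Identity-based set of the objects on the current path (no hashCode/equals calls).
        Set<Object> onPath = Collections.newSetFromMap(new IdentityHashMap<>());
        walk(root, "root", onPath, findings);
        return findings;
    }

    private static void walk(Object value, String path, Set<Object> onPath, List<String> findings) {
        if (value == null) {
            findings.add("null at " + path);
            return;
        }
        Class<?> type = value.getClass();
        boolean container =
                value instanceof Collection || value instanceof Map || value instanceof Object[];
        // JDK value types (String, Integer, ...), enums and primitive arrays are leaves.
        if (!container && (type.isArray() || type.isEnum() || type.getName().startsWith("java."))) {
            return;
        }
        if (!onPath.add(value)) {
            // The same instance is already on the path from the root: a cyclic reference.
            findings.add("cycle at " + path);
            return;
        }
        try {
            if (value instanceof Collection) {
                int i = 0;
                for (Object element : (Collection<?>) value) {
                    walk(element, path + "[" + i++ + "]", onPath, findings);
                }
            } else if (value instanceof Map) {
                for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                    walk(e.getValue(), path + "[" + e.getKey() + "]", onPath, findings);
                }
            } else if (value instanceof Object[]) {
                Object[] array = (Object[]) value;
                for (int i = 0; i < array.length; i++) {
                    walk(array[i], path + "[" + i + "]", onPath, findings);
                }
            } else {
                // Reflect over the fields of user classes only; stop at JDK superclasses.
                for (Class<?> c = type; c != null && !c.getName().startsWith("java."); c = c.getSuperclass()) {
                    for (Field f : c.getDeclaredFields()) {
                        int m = f.getModifiers();
                        if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) {
                            continue;
                        }
                        f.setAccessible(true);
                        walk(f.get(value), path + "." + f.getName(), onPath, findings);
                    }
                }
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("cannot read a field under " + path, e);
        } finally {
            onPath.remove(value);
        }
    }
}
```

Calling ObjectGraphCheck.inspect(accumulator) on a value taken from your
state (e.g. in a unit test or a debugger) returns one line per finding. An
empty list only means this walk found nothing; it does not prove the value
serializes cleanly.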

Other than that, I can't really think of anything right now; I'd probably
need a minimal case to reproduce this locally to get more insight.

D.



On Fri, Jan 7, 2022 at 9:40 PM Martin <mar...@sonicdev.de> wrote:

> I changed my Flink job to use an explicit keyBy instead of
> reinterpretAsKeyedStream.
> The situation is still the same, so it's not a problem with the combination
> of reinterpretAsKeyedStream and elastic scaling in Reactive Mode.
>
> This time I was able to check the logs of the task managers, and it seems
> to be a serialization problem with my state when recovering from a
> checkpoint after elastic scaling.
> Besides the mentioned sequence numbers, my state also has some values which
> are nested POJOs containing lists and hash maps.
> I assume it's a problem with that.
> What I understand from the docs [1] and David A.'s answer on SO [2] is that
> I have to create TypeInfos for my POJOs with Lists and Maps.
>
> Is this correct?
>
> P.S.: Awkward that this is only happening on scale-in/failure, but I have
> not retried it with scale-out yet.
>
> BR
> Martin
>
> Attached are the exceptions that occur:
>
> 2022-01-07 14:24:56,845 WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_8b4e92de8fd229ebde272da0b8bf387e_(1/8) from
> alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Failed when
> trying to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> Stream Closed
> Serialization trace:
> IdString (de.martin.model.ContainerInfo)
> pDUContainerInformation (de.martin.model.UsedContainer)
> usedContainer (de.martin.model.UnitUsage)
> usagePerKey (de.telekom.mschalln.processing.logic.Accumulator)
> at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> ... 20 more
> Caused by: java.io.IOException: Stream Closed
> at java.io.FileInputStream.readBytes(Native Method) ~[?:?]
> at java.io.FileInputStream.read(Unknown Source) ~[?:?]
> at
> org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:73)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:60)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:52)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at java.io.DataInputStream.read(Unknown Source) ~[?:?]
> at
> org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:779)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:79)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:258)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:220)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:166)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:62)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> ... 20 more
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> [2] https://stackoverflow.com/a/64721838/17274259
>
