Hi,

We have the following Flink job that processes records from Kafka based on rules that we load from S3 files into broadcast state. Earlier we were able to run the job with any parallelism without issues. Recently we changed the structure of the broadcast state, and now the job works only up to a parallelism of 3. With a parallelism of 4 or more, we get serialization exceptions that make the job fail (block 4 in the attached image). Also, if the leader JobManager dies and a new one comes up, the other jobs are restarted automatically, but this job fails with serialization errors. When we start it manually with a parallelism of 3 or less, it works.
Functionally, the code works: it passes all the test cases we have. How do we debug the serialization issues we are facing? I have attached the exception logs and the related code. Let me know if any more details are required.

*KRYO SERIALIZER INITIALISATION*

    Class<?> unmodifiableCollectionsSerializer =
            Class.forName("java.util.Collections$UnmodifiableCollection");
    env.getConfig().addDefaultKryoSerializer(
            unmodifiableCollectionsSerializer,
            UnmodifiableCollectionsSerializer.class
    );

*CONFIG STATE INTERFACE (USED IN BROADCAST STATE)*

    public interface EventConfigState {
        void createOrUpdateState(String key, DataPair dataPair);
        List<OutputMessage> executeRule(InputMessage inputMessage);
        Map<String, Set<DataPair>> getCurrentState();
    }

*DERIVED EVENT CONFIG STATE IMPLEMENTATION*

    public class DerivedEventConfigState implements EventConfigState {

        Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);

        private Map<String, Set<DataPair>> derivedConfigMap;

        public DerivedEventConfigState() {
            derivedConfigMap = new HashMap<>();
        }

        public void createOrUpdateState(String key, DataPair dataPair) {
            derivedConfigMap.putIfAbsent(key, new HashSet<>());
            if (derivedConfigMap.get(key).contains(dataPair)) {
                derivedConfigMap.get(key).remove(dataPair);
            }
            derivedConfigMap.get(key).add(dataPair);
        }

        @Override
        public List<OutputMessage> executeRule(InputMessage inputMessage) {
            String key = inputMessage.getKey();
            List<OutputMessage> outputMessageList = new ArrayList<>();
            if (derivedConfigMap.size() == 0) {
                logger.error("DerivedEventConfigMap is empty");
                return outputMessageList;
            }
            if (derivedConfigMap.get(key) == null) {
                return outputMessageList;
            }
            for (DataPair dataPair : derivedConfigMap.get(key)) {
                IRule rule = dataPair.getRule();
                if (rule.isSatisfied(inputMessage)) {
                    IMessageBuilder messageBuilder = MessageBuilderFactory.getMessageBuilder("OutputMessage");
                    OutputMessage outputMessage = messageBuilder.build(
                            inputMessage,
                            dataPair.getEventMessageDefinition()
                    );
                    outputMessageList.add(outputMessage);
                }
            }
            return outputMessageList;
        }

        @Override
        public Map<String, Set<DataPair>> getCurrentState() {
            return Collections.unmodifiableMap(derivedConfigMap);
        }

        @Override
        public String toString() {
            return "DerivedEventConfigState{" +
                    "derivedConfigMap=" + derivedConfigMap +
                    '}';
        }
    }

Below are the three exceptions, which are thrown randomly:

    Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
        at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Unknown Source) ~[?:?]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 14 more

    Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
    Serialization trace:
    derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 14 more

    Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
    Serialization trace:
    logger (com.org.app.model.producer.AppEventConfigState)
        at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
        ... 24 more

Thanks,
Prasanna