Any thoughts on these?

Thanks,
Prasanna.

On Sat, Oct 30, 2021 at 7:25 PM Prasanna kumar <prasannakumarram...@gmail.com> wrote:

> Hi,
>
> We have the following Flink job that processes records from Kafka based on
> the rules we load from S3 files into broadcast state.
> Earlier we were able to start the job with any task parallelism
> without any issues.
> Recently we made changes to the broadcast state structure, and it is
> working well up to a parallelism of 3.
> If we give a parallelism of 4 or more, we end up getting
> serialization exceptions which result in job failure (Block 4 as in the
> image).
> Also, if the leader job manager dies and a new one comes up, the other
> jobs are restarted automatically, but this job dies of serialization issues.
> But when we start it manually with a parallelism <= 3, it works.
>
> Programmatically, this code works when we test it with all possible
> test cases.
> How do we debug the serialization issues that we face?
> I have attached the exception logs and the code related to them.
>
> Let me know if any more details are required.
>
> *KRYO SERIALIZER INITIALISATION*
>
> Class<?> unmodifiableCollectionsSerializer =
>     Class.forName("java.util.Collections$UnmodifiableCollection");
> env.getConfig().addDefaultKryoSerializer(
>     unmodifiableCollectionsSerializer,
>     UnmodifiableCollectionsSerializer.class
> );
>
>
> *CONFIGSTATE INTERFACE (USED IN BROADCAST STATE)*
>
> public interface EventConfigState {
>
>     void createOrUpdateState(String key, DataPair dataPair);
>
>     List<OutputMessage> executeRule(InputMessage inputMessage);
>
>     Map<String, Set<DataPair>> getCurrentState();
> }
>
>
> *DERIVED EVENT CONFIG STATE IMPLEMENTATION*
>
> public class DerivedEventConfigState implements EventConfigState {
>
>     Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
>     private Map<String, Set<DataPair>> derivedConfigMap;
>
>     public DerivedEventConfigState() {
>         derivedConfigMap = new HashMap<>();
>     }
>
>     public void createOrUpdateState(String key, DataPair dataPair) {
>
>         derivedConfigMap.putIfAbsent(key, new HashSet<>());
>         if (derivedConfigMap.get(key).contains(dataPair)) {
>             derivedConfigMap.get(key).remove(dataPair);
>         }
>         derivedConfigMap.get(key).add(dataPair);
>     }
>
>     @Override
>     public List<OutputMessage> executeRule(InputMessage inputMessage) {
>
>         String key = inputMessage.getKey();
>         List<OutputMessage> outputMessageList = new ArrayList<>();
>
>         if (derivedConfigMap.size() == 0) {
>             logger.error("DerivedEventConfigMap is empty");
>             return outputMessageList;
>         }
>
>         if (derivedConfigMap.get(key) == null) {
>             return outputMessageList;
>         }
>
>         for (DataPair dataPair : derivedConfigMap.get(key)) {
>             IRule rule = dataPair.getRule();
>             if (rule.isSatisfied(inputMessage)) {
>
>                 IMessageBuilder messageBuilder =
>                     MessageBuilderFactory.getMessageBuilder("OutputMessage");
>                 OutputMessage outputMessage = messageBuilder.build(
>                     inputMessage,
>                     dataPair.getEventMessageDefinition()
>                 );
>                 outputMessageList.add(outputMessage);
>             }
>         }
>         return outputMessageList;
>     }
>
>     @Override
>     public Map<String, Set<DataPair>> getCurrentState() {
>         return Collections.unmodifiableMap(derivedConfigMap);
>     }
>
>     @Override
>     public String toString() {
>         return "DerivedEventConfigState{"
>             + "derivedConfigMap=" + derivedConfigMap
>             + '}';
>     }
> }
>
>
> Attached are three exceptions thrown randomly:
>
> *Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
>     at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at java.lang.Class.forName0(Native Method) ~[?:?]
>     at java.lang.Class.forName(Unknown Source) ~[?:?]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 14 more*
>
>
> *Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
> Serialization trace:
> derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 14 more*
>
> *Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
> Serialization trace:
> logger (com.org.app.model.producer.AppEventConfigState)
>     at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     ... 24 more*
>
> Thanks,
> Prasanna
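
One thing that may be worth checking while debugging: the third trace reports "Serialization trace: logger", which suggests the logger instance field is being pulled into the Kryo-serialized state. As far as I understand, Kryo's FieldSerializer skips static and transient fields by default, so a quick audit of which fields are neither can help narrow things down. The following is only a minimal sketch under those assumptions, not a confirmed fix. The class names FieldAudit, StateWithInstanceLogger and StateWithStaticLogger are hypothetical, java.util.logging.Logger stands in for the SLF4J logger, and the filter only approximates what a field-by-field serializer does.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class FieldAudit {

    // Hypothetical stand-in for the state class: same field layout as in the
    // quoted code, with java.util.logging.Logger replacing the SLF4J logger.
    public static class StateWithInstanceLogger {
        Logger logger = Logger.getLogger("state");
        private Map<String, Set<String>> derivedConfigMap = new HashMap<>();
    }

    // Variant where the logger is static, so it is not part of the instance state.
    public static class StateWithStaticLogger {
        private static final Logger LOGGER = Logger.getLogger("state");
        private Map<String, Set<String>> derivedConfigMap = new HashMap<>();
    }

    // Lists the declared fields that are neither static nor transient, i.e. the
    // ones a field-by-field serializer would normally try to write.
    // Assumption: this approximates Kryo's default FieldSerializer behaviour.
    public static List<String> serializedFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            names.add(field.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // Print the fields each variant would contribute to the serialized state.
        System.out.println(serializedFields(StateWithInstanceLogger.class));
        System.out.println(serializedFields(StateWithStaticLogger.class));
    }
}
```

Running the same check against the real state classes (and against DataPair and the rule classes it references) should show whether any non-data field such as a logger ends up in the serialized form.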