Hi,

We have a Flink job that processes records from Kafka according to rules
that we load from S3 files into broadcast state.
Earlier we were able to start the job with any parallelism without any
issues.
Recently we changed the structure of the broadcast state, and the job works
well up to a parallelism of 3.
With a parallelism of 4 or more, we get serialization exceptions that cause
the job to fail (Block 4 in the image).
Also, if the leader JobManager dies and a new one comes up, the other jobs
are restarted automatically, but this job dies with serialization errors.
When we start it manually with a parallelism <= 3, it works.

Functionally, the code works in all the test cases we have tried.
How can we debug the serialization issues we are facing?
I have attached the exception logs and the related code.

Let me know if any more details are required.





*KRYO SERIALIZER INITIALISATION*

// Register a Kryo serializer for the JDK's private unmodifiable collection wrapper.
Class<?> unmodifiableCollectionClass =
    Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(
    unmodifiableCollectionClass,
    UnmodifiableCollectionsSerializer.class
);


*CONFIGSTATE INTERFACE (USED IN BROADCAST STATE)*

public interface EventConfigState {

    void createOrUpdateState(String key, DataPair dataPair);

    List<OutputMessage> executeRule(InputMessage inputMessage);

    Map<String, Set<DataPair>> getCurrentState();
}


*DERIVED EVENT CONFIG STATE IMPLEMENTATION*

public class DerivedEventConfigState implements EventConfigState {

    Logger logger = LoggerFactory.getLogger(DerivedEventConfigState.class);
    private Map<String, Set<DataPair>> derivedConfigMap;

    public DerivedEventConfigState() {
        derivedConfigMap = new HashMap<>();
    }

    @Override
    public void createOrUpdateState(String key, DataPair dataPair) {
        // Replace any existing equal DataPair so that the newest version wins.
        Set<DataPair> dataPairs =
            derivedConfigMap.computeIfAbsent(key, k -> new HashSet<>());
        dataPairs.remove(dataPair);
        dataPairs.add(dataPair);
    }

    @Override
    public List<OutputMessage> executeRule(InputMessage inputMessage) {

        String key = inputMessage.getKey();
        List<OutputMessage> outputMessageList = new ArrayList<>();

        if (derivedConfigMap.isEmpty()) {
            logger.error("DerivedEventConfigMap is empty");
            return outputMessageList;
        }

        if (derivedConfigMap.get(key) == null) {
            return outputMessageList;
        }

        for (DataPair dataPair : derivedConfigMap.get(key)) {
            IRule rule = dataPair.getRule();
            if (rule.isSatisfied(inputMessage)) {

                IMessageBuilder messageBuilder =
                    MessageBuilderFactory.getMessageBuilder("OutputMessage");
                OutputMessage outputMessage = messageBuilder.build(
                    inputMessage,
                    dataPair.getEventMessageDefinition()
                );
                outputMessageList.add(outputMessage);
            }
        }
        return outputMessageList;
    }

    @Override
    public Map<String, Set<DataPair>> getCurrentState() {
        return Collections.unmodifiableMap(derivedConfigMap);
    }

    @Override
    public String toString() {
        return "DerivedEventConfigState{"
            + "derivedConfigMap=" + derivedConfigMap
            + '}';
    }
}
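
A note on createOrUpdateState: the remove-then-add step only replaces an existing entry if DataPair implements equals and hashCode consistently, because HashSet.add keeps the element that is already present when an equal one is added. The following is a minimal sketch of that behaviour, not the actual job code. Pair, ruleId, and definition are hypothetical stand-ins for DataPair and its fields, which are not shown in this mail.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class ReplaceOnUpdateSketch {

    // Hypothetical stand-in for DataPair: equality is based on ruleId only,
    // so two pairs with the same ruleId but different definitions are "equal".
    static final class Pair {
        final String ruleId;
        final String definition;

        Pair(String ruleId, String definition) {
            this.ruleId = ruleId;
            this.definition = definition;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Pair && ruleId.equals(((Pair) o).ruleId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(ruleId);
        }
    }

    // Same shape as createOrUpdateState: drop the old equal element, then add the new one.
    static void createOrUpdate(Map<String, Set<Pair>> state, String key, Pair pair) {
        Set<Pair> pairs = state.computeIfAbsent(key, k -> new HashSet<>());
        pairs.remove(pair); // without this, add() would keep the old element
        pairs.add(pair);
    }

    public static void main(String[] args) {
        Map<String, Set<Pair>> state = new HashMap<>();
        createOrUpdate(state, "order", new Pair("rule-1", "v1"));
        createOrUpdate(state, "order", new Pair("rule-1", "v2"));
        Set<Pair> pairs = state.get("order");
        System.out.println(pairs.size() + " " + pairs.iterator().next().definition);
    }
}
```

If DataPair did not override equals and hashCode, every update would add a new element instead of replacing the old one, and the set would grow with each rule refresh.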


Attached are three exceptions that are thrown at random:

Caused by: org.apache.flink.util.SerializedThrowable: TABLE-OP
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Unknown Source) ~[?:?]
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 14 more


Caused by: org.apache.flink.util.SerializedThrowable: Encountered unregistered class ID: 97
Serialization trace:
derivedConfigMap (com.org.app.model.producer.DerivedEventConfigState)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 14 more

Caused by: org.apache.flink.util.SerializedThrowable: Buffer underflow.
Serialization trace:
logger (com.org.app.model.producer.AppEventConfigState)
    at com.esotericsoftware.kryo.io.Input.require(Input.java:181) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    ... 24 more
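
The last trace names the logger field of AppEventConfigState in its serialization trace, which suggests that the logger instance is being copied as part of the state object. This may not be the root cause, but a field declared static (or transient) is normally left out of the serialized object graph. The following is a minimal sketch of that idea only. It uses java.io serialization and java.util.logging as stand-ins for Kryo and SLF4J, and the class names are hypothetical; none of this is the job's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

class LoggerFieldSketch {

    // Same layout as the state classes in this mail: the logger is an
    // instance field, so a serializer sees it as part of the object.
    static class StateWithInstanceLogger implements Serializable {
        Logger logger = Logger.getLogger("sketch");
        Map<String, String> config = new HashMap<>();
    }

    // The logger is static, so it belongs to the class, not to the state object.
    static class StateWithStaticLogger implements Serializable {
        private static final Logger LOGGER = Logger.getLogger("sketch");
        Map<String, String> config = new HashMap<>();
    }

    // Returns false when the object graph contains a non-serializable field.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("instance logger: " + canSerialize(new StateWithInstanceLogger()));
        System.out.println("static logger:   " + canSerialize(new StateWithStaticLogger()));
    }
}
```

The same check could be repeated against the real state classes with the serializer the job actually uses, to see which fields end up in the copied object.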

Thanks,
Prasanna
