From what I can tell this method does exist in 1.12.2 .

https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/ExecutionConfig.html#addDefaultKryoSerializer-java.lang.Class-java.lang.Class-

https://github.com/apache/flink/blob/release-1.12.2/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java#L730

On 5/10/2021 11:11 AM, sudhansu jena wrote:
Hi,

Thanks for the prompt response. I have already visited that page but in the current flink version i.e 1.12.2, the method addDefaultKryoSerializer is not available in the config object.

Thanks,
Sudhansu

On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    Please have a look at
    
https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
    
<https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink>

    On 5/10/2021 10:48 AM, sudhansu jena wrote:
    Hi Team,

    We have recently enabled check pointing in our flink job using S3
    as the state backend, but while submitting the Job, it fails with
    the below error.Can you please let us know what is going wrong here.


    Below is the code snippet for enabling check pointing.

           env.setStateBackend(new
    FsStateBackend("s3://bucket-name/job-name/",true));
            env.enableCheckpointing(1000);
            CheckpointConfig config = env.getCheckpointConfig();
    
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

    flink-conf.yaml

    state.backend: filesystem
    state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/


    Error Logs :

    2021-05-10 13:57:20 java.io.IOException: Could not perform
    checkpoint 1 for operator Co-Process-Broadcast -> Sink:
    SinkToOutputKafka (1/1)#0. at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
    at
    
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    at
    
org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
    at
    
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at
    
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
    at
    
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    at
    
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at
    
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at
    
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source) Caused by:
    org.apache.flink.runtime.checkpoint.CheckpointException: Could
    not complete snapshot 1 for operator Co-Process-Broadcast ->
    Sink: SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was
    declined. at
    
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at
    
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at
    
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at
    
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
    at
    
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
    at
    
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
    at
    
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
    at
    
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
    ... 14 more Caused by: com.esotericsoftware.kryo.KryoException:
    java.lang.UnsupportedOperationException Serialization trace:
    rules (com.project.eventing.rule.AndCompositeAbstractRule) rule
    (com.project.eventing.model.producer.DataPair) ruleMap
    (com.project.eventing.model.producer.EventConfigState) at
    com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at
    
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
    com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at
    
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at
    com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at
    
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at
    
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
    com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at
    
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at
    
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
    at
    
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
    at
    
org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
    at
    
org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
    at
    
org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
    at
    
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
    at
    
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
    at
    
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
    ... 24 more Caused by: java.lang.UnsupportedOperationException at
    java.base/java.util.Collections$UnmodifiableCollection.add(Unknown
    Source) at
    
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at
    
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
    com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 43 more
    Thanks,
    Sudhansu



Reply via email to