Hi,

Thanks for the prompt response. I have already looked at that page, but in the Flink version we are currently using (1.12.2), the method addDefaultKryoSerializer is not available on the config object.

Thanks,
Sudhansu

On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler <ches...@apache.org> wrote:

> Please have a look at
> https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>
> On 5/10/2021 10:48 AM, sudhansu jena wrote:
>
> Hi Team,
>
> We have recently enabled check pointing in our flink job using S3 as the
> state backend, but while submitting the Job, it fails with the below
> error. Can you please let us know what is going wrong here.
>
> Below is the code snippet for enabling check pointing.
>
> env.setStateBackend(new FsStateBackend("s3://bucket-name/job-name/",true));
> env.enableCheckpointing(1000);
> CheckpointConfig config = env.getCheckpointConfig();
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>
> flink-conf.yaml
>
> state.backend: filesystem
> state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/
>
> Error Logs :
>
> 2021-05-10 13:57:20 java.io.IOException: Could not perform checkpoint 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
>     ... 14 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> rules (com.project.eventing.rule.AndCompositeAbstractRule)
> rule (com.project.eventing.model.producer.DataPair)
> ruleMap (com.project.eventing.model.producer.EventConfigState)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
>     ... 24 more
> Caused by: java.lang.UnsupportedOperationException
>     at java.base/java.util.Collections$UnmodifiableCollection.add(Unknown Source)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     ... 43 more
>
> Thanks,
> Sudhansu
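
For reference, the innermost cause in the trace (Collections$UnmodifiableCollection.add called from Kryo's CollectionSerializer.read) can be reproduced with the JDK alone. The sketch below is only a simplified stand-in, not Kryo's or Flink's actual code: the class and method names are hypothetical, and it assumes the `rules` field of AndCompositeAbstractRule holds a collection wrapped with Collections.unmodifiableCollection, which the trace suggests but does not prove.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class UnmodifiableCollectionDemo {

    // Simplified stand-in for a copy routine that rebuilds a collection by
    // calling add() on an instance of the field's runtime class.
    static boolean canRebuildWithAdd(Collection<String> target) {
        try {
            target.add("probe");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // Hypothetical field value: an unmodifiable wrapper stored in state.
    static Collection<String> wrappedRules(List<String> rules) {
        return Collections.unmodifiableCollection(new ArrayList<>(rules));
    }

    // Hypothetical alternative: store a plain ArrayList in the field and
    // expose a read-only view only through a getter.
    static Collection<String> plainRules(List<String> rules) {
        return new ArrayList<>(rules);
    }

    public static void main(String[] args) {
        List<String> rules = Arrays.asList("ruleA", "ruleB");
        // prints "unmodifiable wrapper accepts add(): false"
        System.out.println("unmodifiable wrapper accepts add(): "
                + canRebuildWithAdd(wrappedRules(rules)));
        // prints "plain ArrayList accepts add(): true"
        System.out.println("plain ArrayList accepts add(): "
                + canRebuildWithAdd(plainRules(rules)));
    }
}
```

If the assumption about the field holds, one option might be to keep a plain mutable collection in the state object. The other is to register a dedicated serializer for the wrapper class, as described in the linked Stack Overflow answer. Which of the two fits depends on how the rule classes are used elsewhere.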