Hi Chesnay, Thank you so much for the help. The fix is working now.
Thanks,
Sudhansu

On Mon, May 10, 2021 at 2:48 PM Chesnay Schepler <ches...@apache.org> wrote:

> From what I can tell this method does exist in 1.12.2.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/ExecutionConfig.html#addDefaultKryoSerializer-java.lang.Class-java.lang.Class-
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java#L730
>
> On 5/10/2021 11:11 AM, sudhansu jena wrote:
>
> Hi,
>
> Thanks for the prompt response. I have already visited that page but in
> the current flink version i.e 1.12.2, the method addDefaultKryoSerializer
> is not available in the config object.
>
> Thanks,
> Sudhansu
>
> On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Please have a look at
>> https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>
>> On 5/10/2021 10:48 AM, sudhansu jena wrote:
>>
>> Hi Team,
>>
>> We have recently enabled check pointing in our flink job using S3 as the
>> state backend, but while submitting the Job, it fails with the below
>> error. Can you please let us know what is going wrong here.
>>
>> Below is the code snippet for enabling check pointing.
>>
>> env.setStateBackend(new FsStateBackend("s3://bucket-name/job-name/",true));
>> env.enableCheckpointing(1000);
>> CheckpointConfig config = env.getCheckpointConfig();
>> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>>
>> flink-conf.yaml
>>
>> state.backend: filesystem
>> state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/
>>
>> Error Logs :
>>
>> 2021-05-10 13:57:20 java.io.IOException: Could not perform checkpoint 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0.
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
>>     at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>>     at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was declined.
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
>>     ... 14 more
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> rules (com.project.eventing.rule.AndCompositeAbstractRule)
>> rule (com.project.eventing.model.producer.DataPair)
>> ruleMap (com.project.eventing.model.producer.EventConfigState)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>     at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
>>     at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
>>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
>>     at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
>>     ... 24 more
>> Caused by: java.lang.UnsupportedOperationException
>>     at java.base/java.util.Collections$UnmodifiableCollection.add(Unknown Source)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     ... 43 more
>>
>> Thanks,
>> Sudhansu
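
For readers landing on this thread: the innermost cause in the trace above is Collections$UnmodifiableCollection.add being called from Kryo's CollectionSerializer.read, which means a generic collection serializer tried to refill the collection through add() and the unmodifiable wrapper rejected it. The sketch below reproduces only that mechanism with plain JDK classes (no Flink or Kryo involved) and shows one possible workaround, assuming the state classes can be changed: copy the collection into a plain ArrayList before it goes into state. The names UnmodifiableRefillDemo, canRefill and toPlainList are hypothetical and do not come from the thread. The other route, the one discussed above, is registering a dedicated serializer through ExecutionConfig.addDefaultKryoSerializer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableRefillDemo {

    // Mimics what a generic collection deserializer does: it is handed a
    // target collection and refills it element by element through add().
    // Returns false if the target rejects add().
    static boolean canRefill(Collection<String> target, List<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // Hypothetical workaround: take a defensive copy into a plain ArrayList
    // before the value is stored in state, so no unmodifiable wrapper
    // ever reaches the serializer.
    static List<String> toPlainList(Collection<String> rules) {
        return new ArrayList<>(rules);
    }

    public static void main(String[] args) {
        List<String> rules = List.of("ruleA", "ruleB");

        // An unmodifiable wrapper rejects add(), as in the stack trace.
        Collection<String> wrapped =
                Collections.unmodifiableCollection(new ArrayList<String>());
        System.out.println("unmodifiable target refillable: " + canRefill(wrapped, rules));

        // A plain ArrayList copy accepts add().
        List<String> copy = toPlainList(Collections.unmodifiableList(rules));
        System.out.println("ArrayList copy refillable: " + canRefill(copy, rules));
    }
}
```

Whether a defensive copy or a registered serializer is the better fit depends on whether the rule classes are under your control; the copy avoids an extra dependency but gives up the immutability guarantee on the stored field.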