Hi Team,

We have recently enabled checkpointing in our Flink job, using S3 as the
state backend, but when we submit the job it fails with the error below.
Can you please let us know what is going wrong here?


Below is the code snippet we use to enable checkpointing:

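        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Filesystem state backend on S3; the second argument enables asynchronous snapshots
        env.setStateBackend(new FsStateBackend("s3://bucket-name/job-name/", true));
        // Trigger a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);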

flink-conf.yaml:

state.backend: filesystem
state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/


Error logs:

2021-05-10 13:57:20
java.io.IOException: Could not perform checkpoint 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
    ... 14 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
rules (com.project.eventing.rule.AndCompositeAbstractRule)
rule (com.project.eventing.model.producer.DataPair)
ruleMap (com.project.eventing.model.producer.EventConfigState)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
    at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
    ... 24 more
Caused by: java.lang.UnsupportedOperationException
    at java.base/java.util.Collections$UnmodifiableCollection.add(Unknown Source)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 43 more
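
Looking at the innermost cause, the failure seems to happen while Flink deep-copies our broadcast state with Kryo: Kryo's CollectionSerializer rebuilds a collection by calling add() for each element, and here the target is a java.util.Collections$UnmodifiableCollection, whose add() always throws UnsupportedOperationException. Our guess (an assumption, since the rule classes are not shown here) is that the `rules` field of AndCompositeAbstractRule is built with something like Collections.unmodifiableList(...). The JDK-only sketch below, using a hypothetical UnmodifiableAddDemo class, reproduces the same exception without Flink or Kryo:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Loosely mimics what Kryo's CollectionSerializer.read does when it
    // refills a collection: call add() for every element.
    static boolean tryAdd(Collection<String> target, String element) {
        try {
            target.add(element);
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the last "Caused by" in our trace.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical: how a field like AndCompositeAbstractRule.rules might be built.
        List<String> unmodifiable = Collections.unmodifiableList(new ArrayList<>());
        // A plain mutable list, for comparison.
        List<String> mutable = new ArrayList<>();

        System.out.println("unmodifiable add succeeded: " + tryAdd(unmodifiable, "rule-1"));
        System.out.println("mutable add succeeded: " + tryAdd(mutable, "rule-1"));
    }
}
```

Running main prints "unmodifiable add succeeded: false" followed by "mutable add succeeded: true", which matches the bottom of our trace if the field really is an unmodifiable wrapper.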

Thanks,
Sudhansu
