Hi Team,
We have recently enabled check pointing in our flink job using S3 as the
state backend, but while submitting the Job, it fails with the below
error.Can you please let us know what is going wrong here.
Below is the code snippet for enabling check pointing.
env.setStateBackend(new
FsStateBackend("s3://bucket-name/job-name/",true));
env.enableCheckpointing(1000);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/
Error Logs :
2021-05-10 13:57:20
java.io.IOException: Could not perform checkpoint 1 for operator
Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
not complete snapshot 1 for operator Co-Process-Broadcast -> Sink:
SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
... 14 more
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
rules (com.project.eventing.rule.AndCompositeAbstractRule)
rule (com.project.eventing.model.producer.DataPair)
ruleMap (com.project.eventing.model.producer.EventConfigState)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
at
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
at
org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
at
org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
at
org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
... 24 more
Caused by: java.lang.UnsupportedOperationException
at java.base/java.util.Collections$UnmodifiableCollection.add(Unknown
Source)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 43 more
Thanks,
Sudhansu