Hi Chesnay,

Thank you so much for the help. The fix is working now.

Thanks,
Sudhansu

On Mon, May 10, 2021 at 2:48 PM Chesnay Schepler <ches...@apache.org> wrote:

> From what I can tell this method does exist in 1.12.2 .
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/ExecutionConfig.html#addDefaultKryoSerializer-java.lang.Class-java.lang.Class
> -
>
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java#L730
>
> On 5/10/2021 11:11 AM, sudhansu jena wrote:
>
> Hi,
>
> Thanks for the prompt response. I have already visited that page but in
> the current flink version i.e 1.12.2, the method addDefaultKryoSerializer
> is not available in the config object.
>
> Thanks,
> Sudhansu
>
> On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Please have a look at
>> https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>
>> On 5/10/2021 10:48 AM, sudhansu jena wrote:
>>
>> Hi Team,
>>
>> We have recently enabled check pointing in our flink job using S3 as the
>> state backend, but while submitting the Job, it fails with the below
>> error.Can you please let us know what is going wrong here.
>>
>>
>> Below is the code snippet for enabling check pointing.
>>
>>        env.setStateBackend(new
>> FsStateBackend("s3://bucket-name/job-name/",true));
>>         env.enableCheckpointing(1000);
>>         CheckpointConfig config = env.getCheckpointConfig();
>>
>> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>         config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>>
>> flink-conf.yaml
>>
>> state.backend: filesystem
>> state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/
>>
>>
>> Error Logs :
>>
>> 2021-05-10 13:57:20 java.io.IOException: Could not perform checkpoint 1
>> for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at
>> java.base/java.lang.Thread.run(Unknown Source) Caused by:
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
>> snapshot 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka
>> (1/1)#0. Failure reason: Checkpoint was declined. at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947)
>> ... 14 more Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException Serialization trace: rules
>> (com.project.eventing.rule.AndCompositeAbstractRule) rule
>> (com.project.eventing.model.producer.DataPair) ruleMap
>> (com.project.eventing.model.producer.EventConfigState) at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>> at
>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>> at
>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
>> at
>> org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
>> at
>> org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
>> at
>> org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234)
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213)
>> ... 24 more Caused by: java.lang.UnsupportedOperationException at
>> java.base/java.util.Collections$UnmodifiableCollection.add(Unknown Source)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>> at
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>> ... 43 more
>> Thanks,
>> Sudhansu
>>
>>
>>
>

Reply via email to