Hi, Tejas

This exception is caused by the Rule adding fields that cannot be recovered 
from the historical Checkpoint. You can try to start job without recover from 
checkpoint/savepoint.

And I double-checked that Rule as you write is recognized as a Pojo type


Best,
Weihua

> 2022年5月13日 上午7:31,Tejas B <tejasub1...@gmail.com> 写道:
> 
> Hi Weihua,
> This is the error I am getting :
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for 
> CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from 
> any of the 1 provided restore options. at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
>  ... 11 more Caused by: 
> org.apache.flink.runtime.state.BackendBuildingException: Failed when trying 
> to restore operator state backend at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>  at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to 
> find class: 11 at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>  at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> 
> From the error it looks like it's falling back to Kryo serializer instead of 
> POJO serializer.
> 
> Thanks,
> Tejas
> 
> 
> On Thu, May 12, 2022 at 7:33 AM Weihua Hu <huweihua....@gmail.com 
> <mailto:huweihua....@gmail.com>> wrote:
> Hi, Tejas
> 
> These code is works in my idea environment.
> Could you provide more error info or log?
> 
> 
> Best,
> Weihua
> 
>> 2022年5月10日 下午1:22,Tejas B <tejasub1...@gmail.com 
>> <mailto:tejasub1...@gmail.com>> 写道:
>> 
>> Hi,
>> I am trying to get flink schema evolution to work for me using POJO 
>> serializer. But I found out that if an enum is present in the POJO then the 
>> POJO serializer is not used. Example of my POJO is as follows :
>> public class Rule {
>> String id;
>> int val;
>> RuleType ruleType;
>> //Newly added field
>> //int val2 = 0;
>> 
>> public Rule() {}
>> 
>> public Rule(String id, int val, RuleType ruleType) {
>>     this.id = id;
>>     this.val = val;
>>     this.ruleType = ruleType;
>>     //this.val2 = val2;
>> }
>> 
>> public String getId() {
>>     return id;
>> }
>> 
>> public void setId(String id) {
>>     this.id = id;
>> }
>> 
>> public int getVal() {
>>     return val;
>> }
>> 
>> public void setVal(int val) {
>>     this.val = val;
>> }
>> 
>> public RuleType getRuleType() {
>>     return ruleType;
>> }
>> 
>> public void setRuleType(RuleType ruleType) {
>>     this.ruleType = ruleType;
>> }
>> 
>> //public int getVal2() {
>> //    return val2;
>> //}
>> 
>> //public void setVal2(int val2) {
>> //    this.val2 = val2;
>> //}
>> 
>> @Override
>> public boolean equals(Object o) {
>>     if (this == o) return true;
>>     if (o == null || getClass() != o.getClass()) return false;
>>     Rule rule = (Rule) o;
>>     return val == rule.val && id.equals(rule.id <http://rule.id/>) && 
>> ruleType == rule.ruleType;
>> }
>> 
>> @Override
>> public int hashCode() {
>>     return Objects.hash(id, val, ruleType);
>> }
>> 
>> @Override
>> public String toString() {
>>     return "Rule{" +
>>             "name='" + id + '\'' +
>>             ", val=" + val +
>>             ", ruleType=" + ruleType +
>>             '}';
>> }
>> }
>> 
>> RuleType is an enum class as follows :
>> 
>> public enum RuleType {
>>     X,
>>     Y,
>>     Z
>> }
>> Now for the Rule class the schema evolution (Adding a new field called 
>> val2), works only if I write a custom typeFactory for this class.
>> 
>> Is there a way that I can write typeFactory for the enum class ? Why does 
>> the flink not recognize enum in a POJO class ?
>> 
> 

Reply via email to