Hi Tejas,

This exception occurs because Rule now has an added field, and state written with the old layout cannot be restored from the historical checkpoint. You can try starting the job without restoring from a checkpoint/savepoint.
I also double-checked that Rule, as you wrote it, is recognized as a POJO type.

Best,
Weihua

> On May 13, 2022, at 7:31 AM, Tejas B <tejasub1...@gmail.com> wrote:
>
> Hi Weihua,
> This is the error I am getting:
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
>     ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>     at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     ... 13 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>
> From the error, it looks like it is falling back to the Kryo serializer instead of the POJO serializer.
>
> Thanks,
> Tejas
>
>
> On Thu, May 12, 2022 at 7:33 AM Weihua Hu <huweihua....@gmail.com> wrote:
> Hi Tejas,
>
> This code works in my IDEA environment.
> Could you provide more error information or logs?
>
>
> Best,
> Weihua
>
>> On May 10, 2022, at 1:22 PM, Tejas B <tejasub1...@gmail.com> wrote:
>>
>> Hi,
>> I am trying to get Flink schema evolution to work for me using the POJO
>> serializer. But I found that if an enum is present in the POJO, the
>> POJO serializer is not used. An example of my POJO is as follows:
>>
>> public class Rule {
>>     String id;
>>     int val;
>>     RuleType ruleType;
>>     //Newly added field
>>     //int val2 = 0;
>>
>>     public Rule() {}
>>
>>     public Rule(String id, int val, RuleType ruleType) {
>>         this.id = id;
>>         this.val = val;
>>         this.ruleType = ruleType;
>>         //this.val2 = val2;
>>     }
>>
>>     public String getId() {
>>         return id;
>>     }
>>
>>     public void setId(String id) {
>>         this.id = id;
>>     }
>>
>>     public int getVal() {
>>         return val;
>>     }
>>
>>     public void setVal(int val) {
>>         this.val = val;
>>     }
>>
>>     public RuleType getRuleType() {
>>         return ruleType;
>>     }
>>
>>     public void setRuleType(RuleType ruleType) {
>>         this.ruleType = ruleType;
>>     }
>>
>>     //public int getVal2() {
>>     //    return val2;
>>     //}
>>
>>     //public void setVal2(int val2) {
>>     //    this.val2 = val2;
>>     //}
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>         Rule rule = (Rule) o;
>>         return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         return Objects.hash(id, val, ruleType);
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "Rule{" +
>>                 "name='" + id + '\'' +
>>                 ", val=" + val +
>>                 ", ruleType=" + ruleType +
>>                 '}';
>>     }
>> }
>>
>> RuleType is an enum class as follows:
>>
>> public enum RuleType {
>>     X,
>>     Y,
>>     Z
>> }
>>
>> Now, for the Rule class, schema evolution (adding a new field called
>> val2) works only if I write a custom typeFactory for this class.
>>
>> Is there a way to write a typeFactory for the enum class? Why does
>> Flink not recognize an enum in a POJO class?
>>
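
On the closing question of why a class might not be treated as a POJO: the Flink documentation lists structural conditions for POJO types (the class is public, it has a public no-argument constructor, and every field is either public or reachable through getFoo()/setFoo() accessors). The following is a minimal sketch that checks those conditions with plain Java reflection. It is not Flink's own type extraction logic, and it ignores the further requirement that each field type be supported by a registered serializer. The names PojoRuleCheck, violations, and Broken are hypothetical, and Rule here is a trimmed copy of the class quoted above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    public enum RuleType { X, Y, Z }

    // Trimmed copy of the Rule class from the thread (accessors only).
    public static class Rule {
        String id;
        int val;
        RuleType ruleType;

        public Rule() {}

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
        public RuleType getRuleType() { return ruleType; }
        public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }
    }

    // Hypothetical counter-example: a private field with no accessors.
    public static class Broken {
        private int secret;

        public Broken() {}
    }

    /** Returns the violated conditions; an empty list means this simplified check passes. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the serialized state in this sketch
            }
            if (Modifier.isPublic(mods)) {
                continue; // public fields need no accessors
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                clazz.getMethod("get" + suffix);                  // public getter
                clazz.getMethod("set" + suffix, field.getType()); // public setter
            } catch (NoSuchMethodException e) {
                problems.add("field '" + name + "' is not public and lacks getter/setter");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println("Rule:   " + violations(Rule.class));
        System.out.println("Broken: " + violations(Broken.class));
    }
}
```

Run against Rule, the check reports no violations, which is consistent with Weihua's observation that Rule is recognized as a POJO type. That points toward the restored checkpoint, rather than the enum field, as the place to look for the Kryo fallback.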