Hi Weihua, This is the error I am getting : Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ... 11 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11 at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>From the error it looks like it's falling back to Kryo serializer instead of POJO serializer. Thanks, Tejas On Thu, May 12, 2022 at 7:33 AM Weihua Hu <huweihua....@gmail.com> wrote: > Hi, Tejas > > These code is works in my idea environment. > Could you provide more error info or log? > > > Best, > Weihua > > 2022年5月10日 下午1:22,Tejas B <tejasub1...@gmail.com> 写道: > > Hi, > I am trying to get flink schema evolution to work for me using POJO > serializer. But I found out that if an enum is present in the POJO then the > POJO serializer is not used. Example of my POJO is as follows : > > public class Rule { > String id;int val; > RuleType ruleType;//Newly added field//int val2 = 0; > public Rule() {} > public Rule(String id, int val, RuleType ruleType) { > this.id = id; > this.val = val; > this.ruleType = ruleType; > //this.val2 = val2; > } > public String getId() { > return id; > } > public void setId(String id) { > this.id = id; > } > public int getVal() { > return val; > } > public void setVal(int val) { > this.val = val; > } > public RuleType getRuleType() { > return ruleType; > } > public void setRuleType(RuleType ruleType) { > this.ruleType = ruleType; > } > //public int getVal2() {// return val2;//} > //public void setVal2(int val2) {// this.val2 = val2;//} > @Overridepublic boolean equals(Object o) { > if (this == o) return true; > if (o == null || getClass() != o.getClass()) return false; > Rule rule = (Rule) o; > return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType; > } > @Overridepublic int hashCode() { > return Objects.hash(id, val, ruleType); > } > @Overridepublic String toString() { > return "Rule{" + > "name='" + id + '\'' + > ", val=" + val + > ", ruleType=" + ruleType + > '}'; > } > > } > > RuleType is an enum class as follows : > > public enum RuleType { > X, > Y, > Z > > } > > Now for the Rule class the schema evolution (Adding a new field called > val2), works only if I write a custom typeFactory for this class. > > Is there a way that I can write typeFactory for the enum class ? Why does > the flink not recognize enum in a POJO class ? > > >