Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for
CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from
any of the 1 provided restore options. at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
... 11 more Caused by:
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying
to restore operator state backend at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to
find class: 11 at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

>From the error it looks like it's falling back to Kryo serializer instead
of POJO serializer.

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu <huweihua....@gmail.com> wrote:

> Hi, Tejas
>
> These code is works in my idea environment.
> Could you provide more error info or log?
>
>
> Best,
> Weihua
>
> 2022年5月10日 下午1:22,Tejas B <tejasub1...@gmail.com> 写道:
>
> Hi,
> I am trying to get flink schema evolution to work for me using POJO
> serializer. But I found out that if an enum is present in the POJO then the
> POJO serializer is not used. Example of my POJO is as follows :
>
> public class Rule {
> String id;int val;
> RuleType ruleType;//Newly added field//int val2 = 0;
> public Rule() {}
> public Rule(String id, int val, RuleType ruleType) {
>     this.id = id;
>     this.val = val;
>     this.ruleType = ruleType;
>     //this.val2 = val2;
> }
> public String getId() {
>     return id;
> }
> public void setId(String id) {
>     this.id = id;
> }
> public int getVal() {
>     return val;
> }
> public void setVal(int val) {
>     this.val = val;
> }
> public RuleType getRuleType() {
>     return ruleType;
> }
> public void setRuleType(RuleType ruleType) {
>     this.ruleType = ruleType;
> }
> //public int getVal2() {//    return val2;//}
> //public void setVal2(int val2) {//    this.val2 = val2;//}
> @Overridepublic boolean equals(Object o) {
>     if (this == o) return true;
>     if (o == null || getClass() != o.getClass()) return false;
>     Rule rule = (Rule) o;
>     return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
> }
> @Overridepublic int hashCode() {
>     return Objects.hash(id, val, ruleType);
> }
> @Overridepublic String toString() {
>     return "Rule{" +
>             "name='" + id + '\'' +
>             ", val=" + val +
>             ", ruleType=" + ruleType +
>             '}';
> }
>
> }
>
> RuleType is an enum class as follows :
>
> public enum RuleType {
>     X,
>     Y,
>     Z
>
> }
>
> Now for the Rule class the schema evolution (Adding a new field called
> val2), works only if I write a custom typeFactory for this class.
>
> Is there a way that I can write typeFactory for the enum class ? Why does
> the flink not recognize enum in a POJO class ?
>
>
>

Reply via email to