I updated the FieldDefinition class inline to avoid confusion. I am listing 
only a few of the fields in each class, not all of them. Everything follows the 
suggested POJO approach.

From: Fuyao Li <fuyao...@oracle.com>
Date: Thursday, May 12, 2022 at 09:46
To: Weihua Hu <huweihua....@gmail.com>
Cc: User <user@flink.apache.org>
Subject: Re: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi Weihua,

I am following all the standards mentioned here. The code structure is listed 
in the previous email.

@Data
public class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    private Map<String, FieldDefinition> fields;

    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

@Data
public class FieldDefinition {
    // Causes recursive resolution when type info is added.
    private Metadata parentMetadata;
}

public class StringFieldDefinitionMapTypeInfoFactory
        extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<String, FieldDefinition>(
                TypeInformation.of(String.class),
                TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok's @Data annotation to generate the getters and setters. For 
List<>, Map<>, and Set<> fields, because of type erasure, I have to provide the 
type information through a type factory 
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory> 
and type hints.
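
To illustrate the type erasure point, here is a minimal plain-Java sketch with no Flink dependency. TypeToken and ErasureDemo are hypothetical names made up for this illustration. The sketch shows that two differently parameterized sets share one runtime class, and that an anonymous subclass (the same trick behind the trailing {} in new TypeHint<Set<String>>() {}) keeps the full generic type available through reflection.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a type hint: anonymous subclasses of this class
// record their type argument in the generic superclass signature.
abstract class TypeToken<T> {
    // Reads the type argument back from the generic superclass.
    Type captured() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

final class ErasureDemo {
    // True when two differently parameterized sets share one runtime class,
    // which is why the element type cannot be read from an instance.
    static boolean sameRuntimeClass() {
        Set<String> strings = new HashSet<>();
        Set<Integer> numbers = new HashSet<>();
        return strings.getClass() == numbers.getClass();
    }

    // Returns the name of the generic type captured by an anonymous subclass.
    static String capturedTypeName() {
        return new TypeToken<Set<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(capturedTypeName());
    }
}
```

Recovering the generic type this way only tells a framework what the declared type is. Whether a dedicated serializer exists for that type is a separate question.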

I have two problems:

  1.  The main blocker is the recursive type resolution mentioned below.
  2.  The Set<String> type hint does not work.

How can I define the TypeInformation for a Set? It does not seem to be supported 
as a Java type.

I provided something like this and then annotated the field in the POJO with it.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following error:
Generic types have been disabled in the ExecutionConfig and type java.util.Set is treated as a generic type.


Thanks,
Fuyao


From: Weihua Hu <huweihua....@gmail.com>
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li <fuyao...@oracle.com>
Cc: user <user@flink.apache.org>
Subject: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi, Fuyao

How did you define these classes? There are some requirements for POJOs, as the 
Flink docs [1] state:

  *   The class must be public.
  *   It must have a public constructor without arguments (default constructor).
  *   All fields are either public or must be accessible through getter and 
setter functions. For a field called foo the getter and setter methods must be 
named getFoo() and setFoo().
  *   The type of a field must be supported by a registered serializer.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua



On May 12, 2022, at 3:03 AM, Fuyao Li <fuyao...@oracle.com> wrote:

Hi Community,

I have a POJO with a nested, recursively resolved structure. How should I 
define the @TypeInfo annotation correctly to avoid a stack overflow exception 
when starting the application?

Basically:

class Metadata {
    Map<String, FieldDefinition> fields;
}

class FieldDefinition {
    Metadata parentMetadata;
}

The Metadata class is resolved recursively, which causes the stack overflow. I 
had to design it this way because that is how the metadata model is structured.

Is there any way to fix this issue? Or must I treat this as a generic type and 
leave out the @TypeInfo annotation so that it falls back to Kryo? If I let it 
fall back to Kryo and remove the 
streamEnvironment.getConfig().disableGenericTypes(); statement, there is no 
problem during program startup.
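
For readers who want to see why a Metadata -> FieldDefinition -> Metadata reference loop can make a naive reflective walk recurse without end, here is a toy plain-Java sketch. It is not Flink's type extraction code, and CycleAwareWalker is a hypothetical name. The Metadata and FieldDefinition classes are simplified copies of the ones above. The walker keeps a set of classes currently being resolved and reports a cycle instead of descending again.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified copies of the two classes from this thread.
class Metadata {
    Map<String, FieldDefinition> fields;
}

class FieldDefinition {
    Metadata parentMetadata;
}

// Hypothetical toy walker; it only models the recursion, not serialization.
final class CycleAwareWalker {
    // Returns the simple names of classes that are reached again while they
    // are still being resolved, starting from the given root class.
    static List<String> findCycles(Class<?> root) {
        List<String> cycles = new ArrayList<>();
        walk(root, new HashSet<>(), cycles);
        return cycles;
    }

    private static void walk(Type type, Set<Class<?>> inProgress, List<String> cycles) {
        if (type instanceof ParameterizedType) {
            // Descend into type arguments, e.g. FieldDefinition in Map<String, FieldDefinition>.
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                walk(arg, inProgress, cycles);
            }
            return;
        }
        if (!(type instanceof Class)) {
            return; // type variables, wildcards, and arrays are ignored in this toy
        }
        Class<?> clazz = (Class<?>) type;
        if (clazz.isPrimitive() || clazz.getName().startsWith("java.")) {
            return; // treat primitives and JDK types as leaves
        }
        if (!inProgress.add(clazz)) {
            // Already resolving this class further up the call stack: without
            // this check the walk would recurse until the stack overflows.
            cycles.add(clazz.getSimpleName());
            return;
        }
        for (Field field : clazz.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                walk(field.getGenericType(), inProgress, cycles);
            }
        }
        inProgress.remove(clazz);
    }

    public static void main(String[] args) {
        System.out.println(findCycles(Metadata.class));
    }
}
```

Removing the inProgress check from this sketch reproduces the unbounded recursion described above, which is the behavior a factory that eagerly resolves the nested type runs into.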

Thanks,
Fuyao
